Merge branch 'apache:trunk' into YARN-11225
slfan1989 authored Dec 18, 2022
2 parents d03e5ae + 6a07b5d commit ff8c2bf
Showing 18 changed files with 205 additions and 28 deletions.
2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -251,7 +251,7 @@ commons-collections:commons-collections:3.2.2
commons-daemon:commons-daemon:1.0.13
commons-io:commons-io:2.8.0
commons-logging:commons-logging:1.1.3
- commons-net:commons-net:3.8.0
+ commons-net:commons-net:3.9.0
de.ruedigermoeller:fst:2.50
io.grpc:grpc-api:1.26.0
io.grpc:grpc-context:1.26.0
@@ -190,7 +190,7 @@ public long incrementCounter(final String key, final long value) {
return counter.get();
} else {
long l = incAtomicLong(counter, value);
- LOG.debug("Incrementing counter {} by {} with final value {}",
+ LOG.trace("Incrementing counter {} by {} with final value {}",
key, value, l);
return l;
}
@@ -262,9 +262,15 @@ public LogAction record(String recorderName, long currentTimeMs,
if (primaryRecorderName.equals(recorderName) &&
currentTimeMs - minLogPeriodMs >= lastLogTimestampMs) {
lastLogTimestampMs = currentTimeMs;
- for (LoggingAction log : currentLogs.values()) {
- log.setShouldLog();
- }
+ currentLogs.replaceAll((key, log) -> {
+ LoggingAction newLog = log;
+ if (log.hasLogged()) {
+ // create a fresh log since the old one has already been logged
+ newLog = new LoggingAction(log.getValueCount());
+ }
+ newLog.setShouldLog();
+ return newLog;
+ });
}
if (currentLog.shouldLog()) {
currentLog.setHasLogged();
@@ -357,6 +363,10 @@ private void setHasLogged() {
hasLogged = true;
}

private int getValueCount() {
return stats.length;
}

private void recordValues(double... values) {
if (values.length != stats.length) {
throw new IllegalArgumentException("received " + values.length +
@@ -38,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.util.Shell.bashQuote;

/**
* A simple shell-based implementation of {@link IdMappingServiceProvider}
* Map id to user name or group name. It does update every 15 minutes. Only a
@@ -472,26 +474,27 @@ synchronized private void updateMapIncr(final String name,

boolean updated = false;
updateStaticMapping();
String name2 = bashQuote(name);

if (OS.startsWith("Linux") || OS.equals("SunOS") || OS.contains("BSD")) {
if (isGrp) {
updated = updateMapInternal(gidNameMap, "group",
- getName2IdCmdNIX(name, true), ":",
+ getName2IdCmdNIX(name2, true), ":",
staticMapping.gidMapping);
} else {
updated = updateMapInternal(uidNameMap, "user",
- getName2IdCmdNIX(name, false), ":",
+ getName2IdCmdNIX(name2, false), ":",
staticMapping.uidMapping);
}
} else {
// Mac
if (isGrp) {
updated = updateMapInternal(gidNameMap, "group",
- getName2IdCmdMac(name, true), "\\s+",
+ getName2IdCmdMac(name2, true), "\\s+",
staticMapping.gidMapping);
} else {
updated = updateMapInternal(uidNameMap, "user",
- getName2IdCmdMac(name, false), "\\s+",
+ getName2IdCmdMac(name2, false), "\\s+",
staticMapping.uidMapping);
}
}
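
The hunk above quotes the user or group name with bashQuote before it is embedded in the shell command built by getName2IdCmdNIX/getName2IdCmdMac, so a name containing shell metacharacters is passed to the shell as a single literal word. A minimal sketch of the quoting behaviour, not part of this commit; it relies on bashQuote being made public in the next hunk, and the sample name and printed result are illustrative:

import org.apache.hadoop.util.Shell;

public class BashQuoteExample {
  public static void main(String[] args) {
    // bashQuote wraps the argument in single quotes and rewrites each
    // embedded single quote as '\'' so the shell sees one literal word.
    String quoted = Shell.bashQuote("o'brien");
    System.out.println(quoted);  // prints: 'o'\''brien'
  }
}
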
@@ -146,7 +146,8 @@ public static void checkWindowsCommandLineLength(String...commands)
* @param arg the argument to quote
* @return the quoted string
*/
- static String bashQuote(String arg) {
+ @InterfaceAudience.Private
+ public static String bashQuote(String arg) {
StringBuilder buffer = new StringBuilder(arg.length() + 2);
buffer.append('\'')
.append(arg.replace("'", "'\\''"))
@@ -29,6 +29,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

import java.io.*;
@@ -41,6 +43,9 @@
@InterfaceStability.Unstable
public class XMLUtils {

private static final Logger LOG =
LoggerFactory.getLogger(XMLUtils.class);

public static final String DISALLOW_DOCTYPE_DECL =
"http://apache.org/xml/features/disallow-doctype-decl";
public static final String LOAD_EXTERNAL_DECL =
@@ -138,8 +143,8 @@ public static TransformerFactory newSecureTransformerFactory()
throws TransformerConfigurationException {
TransformerFactory trfactory = TransformerFactory.newInstance();
trfactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
- trfactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
- trfactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
+ bestEffortSetAttribute(trfactory, XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ bestEffortSetAttribute(trfactory, XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
return trfactory;
}

@@ -156,8 +161,29 @@ public static SAXTransformerFactory newSecureSAXTransformerFactory()
throws TransformerConfigurationException {
SAXTransformerFactory trfactory = (SAXTransformerFactory) SAXTransformerFactory.newInstance();
trfactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
- trfactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
- trfactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
+ bestEffortSetAttribute(trfactory, XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ bestEffortSetAttribute(trfactory, XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
return trfactory;
}

/**
* Set an attribute value on a {@link TransformerFactory}. If the TransformerFactory
* does not support the attribute, the method just returns <code>false</code> and
* logs the issue at debug level.
*
* @param transformerFactory to update
* @param name of the attribute to set
* @param value to set on the attribute
* @return whether the attribute was successfully set
*/
static boolean bestEffortSetAttribute(TransformerFactory transformerFactory,
String name, Object value) {
try {
transformerFactory.setAttribute(name, value);
return true;
} catch (Throwable t) {
LOG.debug("Issue setting TransformerFactory attribute {}: {}", name, t.toString());
}
return false;
}
}
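
With the helper in place, the two factory methods degrade gracefully on XML implementations whose TransformerFactory rejects the ACCESS_EXTERNAL_* attributes: secure processing stays mandatory, while unsupported attributes are only logged at debug level. A small usage sketch, not part of the commit; the identity transform and sample document are illustrative:

import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.apache.hadoop.util.XMLUtils;

public class SecureTransformExample {
  public static void main(String[] args) throws Exception {
    // Hardened factory: FEATURE_SECURE_PROCESSING is always enabled; external
    // DTD and stylesheet access are disabled only where the underlying
    // implementation supports those attributes.
    TransformerFactory factory = XMLUtils.newSecureTransformerFactory();
    Transformer transformer = factory.newTransformer();
    StringWriter out = new StringWriter();
    transformer.transform(new StreamSource(new StringReader("<doc/>")),
        new StreamResult(out));
    System.out.println(out);
  }
}
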
@@ -142,6 +142,18 @@ public void testPrimaryAndDependentLoggers() {
assertTrue(helper.record("bar", 0).shouldLog());
}

@Test
public void testInfrequentPrimaryAndDependentLoggers() {
helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer);

assertTrue(helper.record("foo", 0).shouldLog());
assertTrue(helper.record("bar", 0).shouldLog());

// Both should log once the period has elapsed
assertTrue(helper.record("foo", LOG_PERIOD).shouldLog());
assertTrue(helper.record("bar", LOG_PERIOD).shouldLog());
}
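
The new test pins down the case where the primary recorder fires only once per LOG_PERIOD. For orientation, a minimal sketch of how callers typically consume the helper, assuming a single unnamed recorder; the class, method, and log message below are illustrative, not taken from this commit:

import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrottledEditLogger {
  private static final Logger LOG =
      LoggerFactory.getLogger(ThrottledEditLogger.class);

  // At most one INFO line per 5-second window, however often logEdit is called.
  private final LogThrottlingHelper throttler = new LogThrottlingHelper(5000);

  public void logEdit(long durationMs) {
    LogAction action = throttler.record(durationMs);
    if (action.shouldLog()) {
      LOG.info("Processed edits (throttled): count={}, duration stats={}",
          action.getCount(), action.getStats(0));
    }
  }
}
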

@Test
public void testMultipleLoggersWithValues() {
helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer);
@@ -20,17 +20,20 @@
import java.io.InputStream;
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.SAXParser;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

import org.apache.hadoop.test.AbstractHadoopTestBase;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
@@ -128,6 +131,15 @@ public void testExternalDtdWithSecureSAXTransformerFactory() throws Exception {
}
}

@Test
public void testBestEffortSetAttribute() throws Exception {
TransformerFactory factory = TransformerFactory.newInstance();
Assert.assertFalse("unexpected attribute results in return of false",
XMLUtils.bestEffortSetAttribute(factory, "unsupportedAttribute false", "abc"));
Assert.assertTrue("expected attribute results in return of true",
XMLUtils.bestEffortSetAttribute(factory, XMLConstants.ACCESS_EXTERNAL_DTD, ""));
}

private static InputStream getResourceStream(final String filename) {
return TestXMLUtils.class.getResourceAsStream(filename);
}
@@ -68,8 +68,11 @@ public void onRemoval(
})
.build();

- ShutdownHookManager.get().addShutdownHook(new KeyProviderCacheFinalizer(),
- SHUTDOWN_HOOK_PRIORITY);
+ // Register the shutdown hook when not in shutdown
+ if (!ShutdownHookManager.get().isShutdownInProgress()) {
+ ShutdownHookManager.get().addShutdownHook(
+ new KeyProviderCacheFinalizer(), SHUTDOWN_HOOK_PRIORITY);
+ }
}
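
The guard matters because ShutdownHookManager rejects new hooks once JVM shutdown has started, so constructing a KeyProviderCache that late would otherwise fail. A minimal sketch of the same pattern, not part of this commit; the cleanup action and the priority value are illustrative:

import org.apache.hadoop.util.ShutdownHookManager;

public class GuardedShutdownHook {
  public static void main(String[] args) {
    ShutdownHookManager hooks = ShutdownHookManager.get();
    if (!hooks.isShutdownInProgress()) {
      // Register only while the JVM is still running normally; once shutdown
      // has begun, addShutdownHook rejects new hooks with IllegalStateException.
      hooks.addShutdownHook(
          () -> System.out.println("cleaning up cached resources"), 10);
    }
  }
}
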

public KeyProvider get(final Configuration conf,
@@ -708,12 +708,12 @@ public String toString() {

Result newResult(ExitStatus exitStatus, long bytesLeftToMove, long bytesBeingMoved) {
return new Result(exitStatus, bytesLeftToMove, bytesBeingMoved,
- dispatcher.getBytesMoved(), dispatcher.getBblocksMoved());
+ dispatcher.getBytesMoved(), dispatcher.getBlocksMoved());
}

Result newResult(ExitStatus exitStatus) {
return new Result(exitStatus, -1, -1, dispatcher.getBytesMoved(),
- dispatcher.getBblocksMoved());
+ dispatcher.getBlocksMoved());
}

/** Run an iteration for all datanodes. */
@@ -164,7 +164,7 @@ synchronized int allocate(int n) {
}
}

- /** Aloocate a single lot of items */
+ /** Allocate a single lot of items. */
int allocate() {
return allocate(lotSize);
}
@@ -1127,7 +1127,7 @@ long getBytesMoved() {
return nnc.getBytesMoved().get();
}

- long getBblocksMoved() {
+ long getBlocksMoved() {
return nnc.getBlocksMoved().get();
}

@@ -1234,7 +1234,7 @@ public boolean dispatchAndCheckContinue() throws InterruptedException {
*/
private long dispatchBlockMoves() throws InterruptedException {
final long bytesLastMoved = getBytesMoved();
- final long blocksLastMoved = getBblocksMoved();
+ final long blocksLastMoved = getBlocksMoved();
final Future<?>[] futures = new Future<?>[sources.size()];

int concurrentThreads = Math.min(sources.size(),
@@ -1284,7 +1284,7 @@ public void run() {
waitForMoveCompletion(targets);
LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
- (getBblocksMoved() - blocksLastMoved));
+ (getBlocksMoved() - blocksLastMoved));

return getBytesMoved() - bytesLastMoved;
}
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
@@ -123,7 +123,7 @@
<commons-logging.version>1.1.3</commons-logging.version>
<commons-logging-api.version>1.1</commons-logging-api.version>
<commons-math3.version>3.6.1</commons-math3.version>
- <commons-net.version>3.8.0</commons-net.version>
+ <commons-net.version>3.9.0</commons-net.version>
<commons-text.version>1.10.0</commons-text.version>

<kerby.version>2.0.2</kerby.version>
@@ -117,6 +117,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -235,6 +236,7 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
sb.append('}');
return sb.toString();
}
@@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));

// probe for presence of the HADOOP-18546 readahead fix.
case CAPABILITY_SAFE_READAHEAD:
return true;

default:
return super.hasPathCapability(p, capability);
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.constants;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Constants which are used internally and which don't fit into the other
* classes.
* For use within the {@code hadoop-azure} module only.
*/
@InterfaceAudience.Private
public final class InternalConstants {

private InternalConstants() {
}

/**
* Does this version of the store have safe readahead?
* Possible combinations of this and the probe
* {@code "fs.capability.etags.available"}.
* <ol>
* <li>{@value}: store is safe</li>
* <li>no etags: store is safe</li>
* <li>etags and not {@value}: store is <i>UNSAFE</i></li>
* </ol>
*/
public static final String CAPABILITY_SAFE_READAHEAD =
"fs.azure.capability.readahead.safe";
}
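
The constant is surfaced through hasPathCapability (see the hasPathCapability hunk earlier in this diff), so clients can detect whether the connector carries the HADOOP-18546 readahead fix. A small sketch of such a probe, not part of the commit; the account URI and configuration are illustrative:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadaheadProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf);
    // True only on releases that include the HADOOP-18546 readahead fix.
    boolean safe = fs.hasPathCapability(
        new Path("/"), "fs.azure.capability.readahead.safe");
    System.out.println("safe readahead: " + safe);
  }
}
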
@@ -50,6 +50,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
@@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
+ sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+ sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
if (streamStatistics != null) {
- sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
- sb.append(streamStatistics.toString());
- sb.append("}");
+ sb.append(", ").append(streamStatistics);
}
+ sb.append("}");
return sb.toString();
}

@@ -101,6 +101,7 @@ private void init() {

// hide instance constructor
private ReadBufferManager() {
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}

