Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
HADOOP-16080. hadoop-aws does not work with hadoop-client-api (apache…
Browse files Browse the repository at this point in the history
…#2510). Contributed by Chao Sun
  • Loading branch information
sunchao authored and Hexiaoqiao committed Dec 4, 2020
1 parent 7f286dd commit b1294e4
Show file tree
Hide file tree
Showing 18 changed files with 59 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.MoreExecutors;

import org.apache.hadoop.classification.InterfaceAudience;

/**
Expand Down Expand Up @@ -105,8 +103,7 @@ public Thread newThread(Runnable r) {

private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
permitCount, false);
super(eventProcessingExecutor, permitCount, false);
this.eventProcessingExecutor = eventProcessingExecutor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.hadoop.util;

import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ForwardingExecutorService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -49,10 +48,10 @@
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
ForwardingExecutorService {

private final Semaphore queueingPermits;
private final ListeningExecutorService executorDelegatee;
private final ExecutorService executorDelegatee;
private final int permitCount;

/**
Expand All @@ -62,7 +61,7 @@ public class SemaphoredDelegatingExecutor extends
* @param fair should the semaphore be "fair"
*/
public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee,
ExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this.permitCount = permitCount;
Expand All @@ -71,7 +70,7 @@ public SemaphoredDelegatingExecutor(
}

@Override
protected ListeningExecutorService delegate() {
protected ExecutorService delegate() {
return executorDelegatee;
}

Expand Down Expand Up @@ -102,7 +101,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
}

@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
public <T> Future<T> submit(Callable<T> task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand All @@ -113,7 +112,7 @@ public <T> ListenableFuture<T> submit(Callable<T> task) {
}

@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
public <T> Future<T> submit(Runnable task, T result) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand All @@ -124,7 +123,7 @@ public <T> ListenableFuture<T> submit(Runnable task, T result) {
}

@Override
public ListenableFuture<?> submit(Runnable task) {
public Future<?> submit(Runnable task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService boundedCopyThreadPool;

private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public String toString() {
*/
private static float validProbability(float p) {
Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
"Probability out of range 0 to 1 %s", p);
String.format("Probability out of range 0 to 1 %s", p));
return p;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ private void addPrefixIfNotPresent(List<String> prefixes, String ancestor,
String child) {
Path prefixCandidate = new Path(child).getParent();
Path ancestorPath = new Path(ancestor);
Preconditions.checkArgument(child.startsWith(ancestor), "%s does not " +
"start with %s", child, ancestor);
Preconditions.checkArgument(child.startsWith(ancestor),
String.format("%s does not start with %s", child, ancestor));
while (!prefixCandidate.isRoot()) {
Path nextParent = prefixCandidate.getParent();
if (nextParent.equals(ancestorPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.writeOperationHelper = writeOperationHelper;
this.putTracker = putTracker;
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
"Block size is too small: %d", blockSize);
String.format("Block size is too small: %d", blockSize));
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.multiPartUpload = null;
this.progressListener = (progress instanceof ProgressListener) ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import com.amazonaws.event.ProgressListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -181,7 +180,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private ListeningExecutorService boundedThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
Expand Down Expand Up @@ -2254,9 +2253,9 @@ S3AFileStatus s3GetFileStatus(final Path path,
final boolean needEmptyDirectoryFlag) throws IOException {
LOG.debug("S3GetFileStatus {}", path);
Preconditions.checkArgument(!needEmptyDirectoryFlag
|| probes.contains(StatusProbeEnum.List),
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path);
|| probes.contains(StatusProbeEnum.List), String.format(
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path));

if (!key.isEmpty() && !key.endsWith("/")
&& probes.contains(StatusProbeEnum.Head)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class S3AMultipartUploader extends MultipartUploader {

public S3AMultipartUploader(FileSystem fs, Configuration conf) {
Preconditions.checkArgument(fs instanceof S3AFileSystem,
"Wrong filesystem: expected S3A but got %s", fs);
String.format("Wrong filesystem: expected S3A but got %s", fs));
s3a = (S3AFileSystem) fs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public S3AReadOpContext(
dstFileStatus);
this.path = checkNotNull(path);
Preconditions.checkArgument(readahead >= 0,
"invalid readahead %d", readahead);
String.format("invalid readahead %d", readahead));
this.inputPolicy = checkNotNull(inputPolicy);
this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
this.readahead = readahead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ public static String lookupPassword(
throws IOException {
String initialVal;
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
String.format("%s does not start with $%s", baseKey, FS_S3A_PREFIX));
// if there's a bucket, work with it
if (StringUtils.isNotEmpty(bucket)) {
String subkey = baseKey.substring(FS_S3A_PREFIX.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ public UploadPartRequest newUploadPartRequest(
// exactly one source must be set; xor verifies this
checkArgument((uploadStream != null) ^ (sourceFile != null),
"Data source");
checkArgument(size >= 0, "Invalid partition size %s", size);
checkArgument(size >= 0, String.format("Invalid partition size %s", size));
checkArgument(partNumber > 0 && partNumber <= 10000,
"partNumber must be between 1 and 10000 inclusive, but is %s",
partNumber);
String.format("partNumber must be between 1 and 10000 inclusive,"
+ " but is %s", partNumber));

LOG.debug("Creating part upload request for {} #{} size {}",
uploadId, partNumber, size);
Expand All @@ -391,11 +391,11 @@ public UploadPartRequest newUploadPartRequest(
request.setInputStream(uploadStream);
} else {
checkArgument(sourceFile.exists(),
"Source file does not exist: %s", sourceFile);
checkArgument(offset >= 0, "Invalid offset %s", offset);
String.format("Source file does not exist: %s", sourceFile));
checkArgument(offset >= 0, String.format("Invalid offset %s", offset));
long length = sourceFile.length();
checkArgument(offset == 0 || offset < length,
"Offset %s beyond length of file %s", offset, length);
String.format("Offset %s beyond length of file %s", offset, length));
request.setFile(sourceFile);
request.setFileOffset(offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ public Policy(RoleModel.Statement... statements) {
@Override
public void validate() {
checkNotNull(statement, "Statement");
checkState(VERSION.equals(version), "Invalid Version: %s", version);
checkState(VERSION.equals(version),
String.format("Invalid Version: %s", version));
statement.stream().forEach((a) -> a.validate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
// get files on the local FS in the attempt path
Path attemptPath = getTaskAttemptPath(context);
Preconditions.checkNotNull(attemptPath,
"No attemptPath path in {}", this);
"No attemptPath path in " + this);

LOG.debug("Scanning {} for files to commit", attemptPath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ private void checkChildPath(Path childPath) {
URI parentUri = path.toUri();
if (parentUri.getHost() != null) {
URI childUri = childPath.toUri();
Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
"host: %s", childUri);
Preconditions.checkNotNull(childUri.getHost(),
String.format("Expected non-null URI host: %s", childUri));
Preconditions.checkArgument(
childUri.getHost().equals(parentUri.getHost()),
"childUri %s and parentUri %s must have the same host",
childUri, parentUri);
Preconditions.checkNotNull(childUri.getScheme(), "No scheme in path %s",
childUri);
String.format("childUri %s and parentUri %s must have the same host",
childUri, parentUri));
Preconditions.checkNotNull(childUri.getScheme(),
String.format("No scheme in path %s", childUri));
}
Preconditions.checkArgument(!childPath.isRoot(),
"childPath cannot be the root path: %s", childPath);
String.format("childPath cannot be the root path: %s", childPath));
Preconditions.checkArgument(childPath.getParent().equals(path),
"childPath %s must be a child of %s", childPath, path);
String.format("childPath %s must be a child of %s", childPath, path));
}

/**
Expand All @@ -296,9 +296,9 @@ private Path childStatusToPathKey(FileStatus status) {
Path p = status.getPath();
Preconditions.checkNotNull(p, "Child status' path cannot be null");
Preconditions.checkArgument(!p.isRoot(),
"childPath cannot be the root path: %s", p);
String.format("childPath cannot be the root path: %s", p));
Preconditions.checkArgument(p.getParent().equals(path),
"childPath %s must be a child of %s", p, path);
String.format("childPath %s must be a child of %s", p, path));
URI uri = p.toUri();
URI parentUri = path.toUri();
// If FileStatus' path is missing host, but should have one, add it.
Expand All @@ -317,6 +317,7 @@ private Path childStatusToPathKey(FileStatus status) {

private void checkPathAbsolute(Path p) {
Preconditions.checkNotNull(p, "path must be non-null");
Preconditions.checkArgument(p.isAbsolute(), "path must be absolute: %s", p);
Preconditions.checkArgument(p.isAbsolute(),
String.format("path must be absolute: %s", p));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1420,14 +1420,15 @@ DynamoDB getDynamoDB() {
*/
private Path checkPath(Path path) {
Preconditions.checkNotNull(path);
Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute",
path);
Preconditions.checkArgument(path.isAbsolute(),
String.format("Path %s is not absolute", path));
URI uri = path.toUri();
Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path);
Preconditions.checkNotNull(uri.getScheme(),
String.format("Path %s missing scheme", path));
Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A),
"Path %s scheme must be %s", path, Constants.FS_S3A);
Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" +
" is missing bucket.", path);
String.format("Path %s scheme must be %s", path, Constants.FS_S3A));
Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()),
String.format("Path %s is missing bucket.", path));
return path;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username,
}

String parentStr = item.getString(PARENT);
Preconditions.checkNotNull(parentStr, "No parent entry in item %s", item);
Preconditions.checkNotNull(parentStr,
String.format("No parent entry in item %s", item));
String childStr = item.getString(CHILD);
Preconditions.checkNotNull(childStr, "No child entry in item %s", item);
Preconditions.checkNotNull(childStr,
String.format("No child entry in item %s", item));

// Skip table version markers, which are only non-absolute paths stored.
Path rawPath = new Path(parentStr, childStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ void initS3AFileSystem(String path) throws IOException {
S3_METADATA_STORE_IMPL);
LOG.debug("updated bucket store option {}", updatedBucketOption);
Preconditions.checkState(S3GUARD_METASTORE_NULL.equals(updatedBucketOption),
"Expected bucket option to be %s but was %s",
S3GUARD_METASTORE_NULL, updatedBucketOption);
String.format("Expected bucket option to be %s but was %s",
S3GUARD_METASTORE_NULL, updatedBucketOption));

FileSystem fs = FileSystem.newInstance(uri, conf);
if (!(fs instanceof S3AFileSystem)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hadoop.fs.s3a;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
Expand All @@ -33,6 +32,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static void afterClass() throws Exception {
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
}
Expand Down

0 comments on commit b1294e4

Please sign in to comment.