diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java index ac4e3a32567b..c7e08ee2eaa7 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java @@ -90,6 +90,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicBoolean; import static com.amazonaws.regions.Regions.US_EAST_1; @@ -275,38 +276,53 @@ public FileStatus[] listStatus(Path path) return toArray(list, LocatedFileStatus.class); } + @Override + public RemoteIterator listFiles(Path path, boolean recursive) + { + // Either a single level or full listing, depending on the recursive flag, no "directories" are included + return new S3ObjectsRemoteIterator(listPrefix(path, OptionalInt.empty(), recursive ? ListingMode.RECURSIVE_FILES_ONLY : ListingMode.SHALLOW_FILES_ONLY)); + } + @Override public RemoteIterator listLocatedStatus(Path path) { STATS.newListLocatedStatusCall(); - return new RemoteIterator() + return new S3ObjectsRemoteIterator(listPrefix(path, OptionalInt.empty(), ListingMode.SHALLOW_ALL)); + } + + private static final class S3ObjectsRemoteIterator + implements RemoteIterator + { + private final Iterator iterator; + + public S3ObjectsRemoteIterator(Iterator iterator) { - private final Iterator iterator = listPrefix(path); + this.iterator = requireNonNull(iterator, "iterator is null"); + } - @Override - public boolean hasNext() - throws IOException - { - try { - return iterator.hasNext(); - } - catch (AmazonClientException e) { - throw new IOException(e); - } + @Override + public boolean hasNext() + throws IOException + { + try { + return iterator.hasNext(); } + catch (AmazonClientException e) { + throw new IOException(e); + } + } - @Override - public LocatedFileStatus next() - throws IOException - { - try { - return iterator.next(); - } - catch (AmazonClientException e) { - throw new IOException(e); - } + @Override + public LocatedFileStatus next() + throws IOException + { + try { + return iterator.next(); } - }; + catch (AmazonClientException e) { + throw new IOException(e); + } + } } @Override @@ -325,7 +341,7 @@ public FileStatus getFileStatus(Path path) if (metadata == null) { // check if this path is a directory - Iterator iterator = listPrefix(path); + Iterator iterator = listPrefix(path, OptionalInt.of(1), ListingMode.SHALLOW_ALL); if (iterator.hasNext()) { return new FileStatus(0, true, 1, 0, 0, qualifiedPath(path)); } @@ -480,7 +496,18 @@ public boolean mkdirs(Path f, FsPermission permission) return true; } - private Iterator listPrefix(Path path) + private enum ListingMode { + SHALLOW_ALL, // Shallow listing of files AND directories + SHALLOW_FILES_ONLY, + RECURSIVE_FILES_ONLY; + + public boolean isFilesOnly() + { + return (this == SHALLOW_FILES_ONLY || this == RECURSIVE_FILES_ONLY); + } + } + + private Iterator listPrefix(Path path, OptionalInt initialMaxKeys, ListingMode mode) { String key = keyFromPath(path); if (!key.isEmpty()) { @@ -490,7 +517,8 @@ private Iterator listPrefix(Path path) ListObjectsRequest request = new ListObjectsRequest() .withBucketName(getBucketName(uri)) .withPrefix(key) - .withDelimiter(PATH_SEPARATOR); + .withDelimiter(mode == ListingMode.RECURSIVE_FILES_ONLY ? null : PATH_SEPARATOR) + .withMaxKeys(initialMaxKeys.isPresent() ? initialMaxKeys.getAsInt() : null); STATS.newListObjectsCall(); Iterator listings = new AbstractSequentialIterator(s3.listObjects(request)) @@ -501,23 +529,39 @@ protected ObjectListing computeNext(ObjectListing previous) if (!previous.isTruncated()) { return null; } + // Clear any max keys set for the initial request before submitting subsequent requests. Values < 0 + // are not sent in the request and the default limit is used + previous.setMaxKeys(-1); return s3.listNextBatchOfObjects(previous); } }; - return Iterators.concat(Iterators.transform(listings, this::statusFromListing)); + Iterator result = Iterators.concat(Iterators.transform(listings, this::statusFromListing)); + if (mode.isFilesOnly()) { + // Even recursive listing can still contain empty "directory" objects, must filter them out + result = Iterators.filter(result, LocatedFileStatus::isFile); + } + return result; } private Iterator statusFromListing(ObjectListing listing) { + List prefixes = listing.getCommonPrefixes(); + List objects = listing.getObjectSummaries(); + if (prefixes.isEmpty()) { + return statusFromObjects(objects); + } + if (objects.isEmpty()) { + return statusFromPrefixes(prefixes); + } return Iterators.concat( - statusFromPrefixes(listing.getCommonPrefixes()), - statusFromObjects(listing.getObjectSummaries())); + statusFromPrefixes(prefixes), + statusFromObjects(objects)); } private Iterator statusFromPrefixes(List prefixes) { - List list = new ArrayList<>(); + List list = new ArrayList<>(prefixes.size()); for (String prefix : prefixes) { Path path = qualifiedPath(new Path(PATH_SEPARATOR + prefix)); FileStatus status = new FileStatus(0, true, 1, 0, 0, path); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/s3/TestPrestoS3FileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/s3/TestPrestoS3FileSystem.java index 4d8a17bbc8b5..fc7d7df3f06a 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/s3/TestPrestoS3FileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/s3/TestPrestoS3FileSystem.java @@ -29,16 +29,22 @@ import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.StorageClass; import com.facebook.presto.hive.s3.PrestoS3FileSystem.UnrecoverableS3OperationException; import com.google.common.base.VerifyException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -51,6 +57,10 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -83,6 +93,7 @@ import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempFile; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; public class TestPrestoS3FileSystem @@ -664,4 +675,71 @@ public S3ObjectInputStream getObjectContent() } } } + + @Test + public void testListPrefixModes() + throws Exception + { + S3ObjectSummary rootObject = new S3ObjectSummary(); + rootObject.setStorageClass(StorageClass.Standard.toString()); + rootObject.setKey("standard-object-at-root.txt"); + rootObject.setLastModified(new Date()); + + S3ObjectSummary childObject = new S3ObjectSummary(); + childObject.setStorageClass(StorageClass.Standard.toString()); + childObject.setKey("prefix/child-object.txt"); + childObject.setLastModified(new Date()); + + try (PrestoS3FileSystem fs = new PrestoS3FileSystem()) { + MockAmazonS3 s3 = new MockAmazonS3() + { + @Override + public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) + { + ObjectListing listing = new ObjectListing(); + // Shallow listing + if ("/".equals(listObjectsRequest.getDelimiter())) { + listing.getCommonPrefixes().add("prefix"); + listing.getObjectSummaries().add(rootObject); + return listing; + } + // Recursive listing of object keys only + listing.getObjectSummaries().addAll(Arrays.asList(childObject, rootObject)); + return listing; + } + }; + Path rootPath = new Path("s3n://test-bucket/"); + fs.initialize(rootPath.toUri(), new Configuration()); + fs.setS3Client(s3); + + List shallowAll = remoteIteratorToList(fs.listLocatedStatus(rootPath)); + assertEquals(shallowAll.size(), 2); + assertTrue(shallowAll.get(0).isDirectory()); + assertFalse(shallowAll.get(1).isDirectory()); + assertEquals(shallowAll.get(0).getPath(), new Path(rootPath, "prefix")); + assertEquals(shallowAll.get(1).getPath(), new Path(rootPath, rootObject.getKey())); + + List shallowFiles = remoteIteratorToList(fs.listFiles(rootPath, false)); + assertEquals(shallowFiles.size(), 1); + assertFalse(shallowFiles.get(0).isDirectory()); + assertEquals(shallowFiles.get(0).getPath(), new Path(rootPath, rootObject.getKey())); + + List recursiveFiles = remoteIteratorToList(fs.listFiles(rootPath, true)); + assertEquals(recursiveFiles.size(), 2); + assertFalse(recursiveFiles.get(0).isDirectory()); + assertFalse(recursiveFiles.get(1).isDirectory()); + assertEquals(recursiveFiles.get(0).getPath(), new Path(rootPath, childObject.getKey())); + assertEquals(recursiveFiles.get(1).getPath(), new Path(rootPath, rootObject.getKey())); + } + } + + private static List remoteIteratorToList(RemoteIterator statuses) + throws IOException + { + List result = new ArrayList<>(); + while (statuses.hasNext()) { + result.add(statuses.next()); + } + return result; + } }