Skip to content

Commit

Permalink
Implement PrestoS3FileSystem#listFiles for direct recursive listings
Browse files Browse the repository at this point in the history
Implements FileSystem#listFiles(Path, boolean recursive) for
PrestoS3FileSystem which in theory adds support for directly listing
S3 files underneath a prefix without recursive calls through each
"directory". This direct traversal requires much fewer requests
when dealing with nested directories but may violate some PathFilter
implementation's expectation of being called at each directory level,
and may perform worse when a large number objects are contained within
hidden paths (since filtering would be performed after the fact). I'm
open to suggestions about how to balance this trade-off and integrate
this with the DirectoryLister.

Incidentally, a straightforward improvement to getFileStatus fell out
of the implementation allowing the isDir check for a path prefix (with
no object present) to be done by limiting the listing result size to 1
instead of listing the full default 1000.
  • Loading branch information
pettyjamesm authored and zhenxiao committed Aug 21, 2020
1 parent e7af71b commit 95725f6
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.amazonaws.regions.Regions.US_EAST_1;
Expand Down Expand Up @@ -275,38 +276,53 @@ public FileStatus[] listStatus(Path path)
return toArray(list, LocatedFileStatus.class);
}

@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path path, boolean recursive)
{
// Either a single level or full listing, depending on the recursive flag, no "directories" are included
return new S3ObjectsRemoteIterator(listPrefix(path, OptionalInt.empty(), recursive ? ListingMode.RECURSIVE_FILES_ONLY : ListingMode.SHALLOW_FILES_ONLY));
}

@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path path)
{
STATS.newListLocatedStatusCall();
return new RemoteIterator<LocatedFileStatus>()
return new S3ObjectsRemoteIterator(listPrefix(path, OptionalInt.empty(), ListingMode.SHALLOW_ALL));
}

private static final class S3ObjectsRemoteIterator
implements RemoteIterator<LocatedFileStatus>
{
private final Iterator<LocatedFileStatus> iterator;

public S3ObjectsRemoteIterator(Iterator<LocatedFileStatus> iterator)
{
private final Iterator<LocatedFileStatus> iterator = listPrefix(path);
this.iterator = requireNonNull(iterator, "iterator is null");
}

@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
}

@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
};
catch (AmazonClientException e) {
throw new IOException(e);
}
}
}

@Override
Expand All @@ -325,7 +341,7 @@ public FileStatus getFileStatus(Path path)

if (metadata == null) {
// check if this path is a directory
Iterator<LocatedFileStatus> iterator = listPrefix(path);
Iterator<LocatedFileStatus> iterator = listPrefix(path, OptionalInt.of(1), ListingMode.SHALLOW_ALL);
if (iterator.hasNext()) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath(path));
}
Expand Down Expand Up @@ -480,7 +496,18 @@ public boolean mkdirs(Path f, FsPermission permission)
return true;
}

private Iterator<LocatedFileStatus> listPrefix(Path path)
private enum ListingMode {
SHALLOW_ALL, // Shallow listing of files AND directories
SHALLOW_FILES_ONLY,
RECURSIVE_FILES_ONLY;

public boolean isFilesOnly()
{
return (this == SHALLOW_FILES_ONLY || this == RECURSIVE_FILES_ONLY);
}
}

private Iterator<LocatedFileStatus> listPrefix(Path path, OptionalInt initialMaxKeys, ListingMode mode)
{
String key = keyFromPath(path);
if (!key.isEmpty()) {
Expand All @@ -490,7 +517,8 @@ private Iterator<LocatedFileStatus> listPrefix(Path path)
ListObjectsRequest request = new ListObjectsRequest()
.withBucketName(getBucketName(uri))
.withPrefix(key)
.withDelimiter(PATH_SEPARATOR);
.withDelimiter(mode == ListingMode.RECURSIVE_FILES_ONLY ? null : PATH_SEPARATOR)
.withMaxKeys(initialMaxKeys.isPresent() ? initialMaxKeys.getAsInt() : null);

STATS.newListObjectsCall();
Iterator<ObjectListing> listings = new AbstractSequentialIterator<ObjectListing>(s3.listObjects(request))
Expand All @@ -501,23 +529,39 @@ protected ObjectListing computeNext(ObjectListing previous)
if (!previous.isTruncated()) {
return null;
}
// Clear any max keys set for the initial request before submitting subsequent requests. Values < 0
// are not sent in the request and the default limit is used
previous.setMaxKeys(-1);
return s3.listNextBatchOfObjects(previous);
}
};

return Iterators.concat(Iterators.transform(listings, this::statusFromListing));
Iterator<LocatedFileStatus> result = Iterators.concat(Iterators.transform(listings, this::statusFromListing));
if (mode.isFilesOnly()) {
// Even recursive listing can still contain empty "directory" objects, must filter them out
result = Iterators.filter(result, LocatedFileStatus::isFile);
}
return result;
}

private Iterator<LocatedFileStatus> statusFromListing(ObjectListing listing)
{
List<String> prefixes = listing.getCommonPrefixes();
List<S3ObjectSummary> objects = listing.getObjectSummaries();
if (prefixes.isEmpty()) {
return statusFromObjects(objects);
}
if (objects.isEmpty()) {
return statusFromPrefixes(prefixes);
}
return Iterators.concat(
statusFromPrefixes(listing.getCommonPrefixes()),
statusFromObjects(listing.getObjectSummaries()));
statusFromPrefixes(prefixes),
statusFromObjects(objects));
}

private Iterator<LocatedFileStatus> statusFromPrefixes(List<String> prefixes)
{
List<LocatedFileStatus> list = new ArrayList<>();
List<LocatedFileStatus> list = new ArrayList<>(prefixes.size());
for (String prefix : prefixes) {
Path path = qualifiedPath(new Path(PATH_SEPARATOR + prefix));
FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.facebook.presto.hive.s3.PrestoS3FileSystem.UnrecoverableS3OperationException;
import com.google.common.base.VerifyException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -51,6 +57,10 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -83,6 +93,7 @@
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.createTempFile;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestPrestoS3FileSystem
Expand Down Expand Up @@ -664,4 +675,71 @@ public S3ObjectInputStream getObjectContent()
}
}
}

@Test
public void testListPrefixModes()
throws Exception
{
S3ObjectSummary rootObject = new S3ObjectSummary();
rootObject.setStorageClass(StorageClass.Standard.toString());
rootObject.setKey("standard-object-at-root.txt");
rootObject.setLastModified(new Date());

S3ObjectSummary childObject = new S3ObjectSummary();
childObject.setStorageClass(StorageClass.Standard.toString());
childObject.setKey("prefix/child-object.txt");
childObject.setLastModified(new Date());

try (PrestoS3FileSystem fs = new PrestoS3FileSystem()) {
MockAmazonS3 s3 = new MockAmazonS3()
{
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
{
ObjectListing listing = new ObjectListing();
// Shallow listing
if ("/".equals(listObjectsRequest.getDelimiter())) {
listing.getCommonPrefixes().add("prefix");
listing.getObjectSummaries().add(rootObject);
return listing;
}
// Recursive listing of object keys only
listing.getObjectSummaries().addAll(Arrays.asList(childObject, rootObject));
return listing;
}
};
Path rootPath = new Path("s3n://test-bucket/");
fs.initialize(rootPath.toUri(), new Configuration());
fs.setS3Client(s3);

List<LocatedFileStatus> shallowAll = remoteIteratorToList(fs.listLocatedStatus(rootPath));
assertEquals(shallowAll.size(), 2);
assertTrue(shallowAll.get(0).isDirectory());
assertFalse(shallowAll.get(1).isDirectory());
assertEquals(shallowAll.get(0).getPath(), new Path(rootPath, "prefix"));
assertEquals(shallowAll.get(1).getPath(), new Path(rootPath, rootObject.getKey()));

List<LocatedFileStatus> shallowFiles = remoteIteratorToList(fs.listFiles(rootPath, false));
assertEquals(shallowFiles.size(), 1);
assertFalse(shallowFiles.get(0).isDirectory());
assertEquals(shallowFiles.get(0).getPath(), new Path(rootPath, rootObject.getKey()));

List<LocatedFileStatus> recursiveFiles = remoteIteratorToList(fs.listFiles(rootPath, true));
assertEquals(recursiveFiles.size(), 2);
assertFalse(recursiveFiles.get(0).isDirectory());
assertFalse(recursiveFiles.get(1).isDirectory());
assertEquals(recursiveFiles.get(0).getPath(), new Path(rootPath, childObject.getKey()));
assertEquals(recursiveFiles.get(1).getPath(), new Path(rootPath, rootObject.getKey()));
}
}

private static List<LocatedFileStatus> remoteIteratorToList(RemoteIterator<LocatedFileStatus> statuses)
throws IOException
{
List<LocatedFileStatus> result = new ArrayList<>();
while (statuses.hasNext()) {
result.add(statuses.next());
}
return result;
}
}

0 comments on commit 95725f6

Please sign in to comment.