-
Notifications
You must be signed in to change notification settings - Fork 241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move list object API to java storage. #1083
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,11 @@ | |
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException; | ||
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata; | ||
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata; | ||
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo.createInferredDirectory; | ||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkNotNull; | ||
import static com.google.common.base.Preconditions.checkState; | ||
import static com.google.common.base.Strings.emptyToNull; | ||
import static com.google.common.base.Strings.isNullOrEmpty; | ||
import static java.lang.Math.toIntExact; | ||
|
||
|
@@ -54,6 +56,7 @@ | |
import com.google.cloud.storage.Storage; | ||
import com.google.cloud.storage.Storage.BlobField; | ||
import com.google.cloud.storage.Storage.BlobGetOption; | ||
import com.google.cloud.storage.Storage.BlobListOption; | ||
import com.google.cloud.storage.Storage.BlobSourceOption; | ||
import com.google.cloud.storage.Storage.BlobTargetOption; | ||
import com.google.cloud.storage.Storage.BucketField; | ||
|
@@ -77,9 +80,13 @@ | |
import java.nio.file.FileAlreadyExistsException; | ||
import java.nio.file.Paths; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentHashMap.KeySetView; | ||
import java.util.concurrent.ExecutorService; | ||
|
@@ -95,7 +102,6 @@ | |
*/ | ||
@VisibleForTesting | ||
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { | ||
|
||
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); | ||
|
||
// Maximum number of times to retry deletes in the case of precondition failures. | ||
|
@@ -442,6 +448,235 @@ public void deleteBuckets(List<String> bucketNames) throws IOException { | |
} | ||
} | ||
|
||
/** | ||
* @see GoogleCloudStorage#listObjectInfo(String, String, ListObjectOptions) | ||
*/ | ||
@Override | ||
public List<GoogleCloudStorageItemInfo> listObjectInfo( | ||
String bucketName, String objectNamePrefix, ListObjectOptions listOptions) | ||
throws IOException { | ||
logger.atFiner().log("listObjectInfo(%s, %s, %s)", bucketName, objectNamePrefix, listOptions); | ||
List<GoogleCloudStorageItemInfo> objectInfos = new ArrayList<>(); | ||
|
||
String pageToken = null; | ||
do { | ||
pageToken = | ||
listObjectInfoPageInternal( | ||
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken); | ||
} while (pageToken != null | ||
&& getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0); | ||
|
||
objectInfos.sort(Comparator.comparing(GoogleCloudStorageItemInfo::getObjectName)); | ||
|
||
return objectInfos; | ||
} | ||
|
||
/** | ||
* @see GoogleCloudStorage#listObjectInfoPage(String, String, ListObjectOptions, String) | ||
*/ | ||
@Override | ||
public ListPage<GoogleCloudStorageItemInfo> listObjectInfoPage( | ||
String bucketName, String objectNamePrefix, ListObjectOptions listOptions, String pageToken) | ||
throws IOException { | ||
logger.atFiner().log( | ||
"listObjectInfoPage(%s, %s, %s, %s)", bucketName, objectNamePrefix, listOptions, pageToken); | ||
|
||
checkArgument( | ||
listOptions.getMaxResults() == MAX_RESULTS_UNLIMITED, | ||
"maxResults should be unlimited for 'listObjectInfoPage' call, but was %s", | ||
listOptions.getMaxResults()); | ||
|
||
List<GoogleCloudStorageItemInfo> objectInfos = new ArrayList<>(); | ||
|
||
String nextPageToken = | ||
listObjectInfoPageInternal( | ||
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken); | ||
return new ListPage<>(objectInfos, nextPageToken); | ||
} | ||
|
||
private String listObjectInfoPageInternal( | ||
String bucketName, | ||
String objectNamePrefix, | ||
List<GoogleCloudStorageItemInfo> objectInfos, | ||
ListObjectOptions listOptions, | ||
String pageToken) | ||
throws IOException { | ||
// TODO: Check if we need to set maxResults + 1 in case of listing prefixes. | ||
logger.atFiner().log("listObjectInfoPage(%s, %s)", objectInfos, listOptions); | ||
|
||
// Although GCS does not implement a file system, it treats objects that end | ||
// in delimiter as different from other objects when listing objects. | ||
// | ||
// If caller sends foo/ as the prefix, foo/ is returned as an object name. | ||
// That is inconsistent with listing items in a directory. | ||
// Not sure if that is a bug in GCS or the intended behavior. | ||
// | ||
// In this case, we do not want foo/ in the returned list because we want to | ||
// keep the behavior more like a file system without calling it as such. | ||
// Therefore, we filter out such entry. | ||
// Determine if the caller sent a directory name as a prefix. | ||
boolean objectPrefixEndsWithDelimiter = | ||
!isNullOrEmpty(objectNamePrefix) && objectNamePrefix.endsWith(PATH_DELIMITER); | ||
|
||
boolean objectPrefixIncluded = false; | ||
|
||
Iterator<Blob> blobIterator; | ||
String nextPageToken = null; | ||
try { | ||
Page<Blob> blobListPage = | ||
storage.list( | ||
bucketName, | ||
prepareBlobListOptions(listOptions, objectNamePrefix, pageToken) | ||
.toArray(new BlobListOption[0])); | ||
nextPageToken = emptyToNull(blobListPage.getNextPageToken()); | ||
blobIterator = blobListPage.getValues().iterator(); | ||
} catch (StorageException e) { | ||
String resource = StringPaths.fromComponents(bucketName, objectNamePrefix); | ||
if (errorExtractor.getErrorType(e) == ErrorType.NOT_FOUND) { | ||
logger.atFiner().withCause(e).log( | ||
"listStorageObjectsInternal(%s, %s): item not found", resource, listOptions); | ||
return null; | ||
} | ||
throw new IOException("Error listing " + resource, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Veneer would have retried, correct? |
||
} | ||
|
||
while (blobIterator.hasNext() | ||
&& getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0) { | ||
Blob blob = blobIterator.next(); | ||
if (blob.isDirectory() && listOptions.isIncludePrefix()) { | ||
// Handle prefixes. | ||
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, blob.getName()))); | ||
} else if (!objectPrefixEndsWithDelimiter || !blob.getName().equals(objectNamePrefix)) { | ||
// Handle objects. | ||
objectInfos.add(createItemInfoForBlob(blob)); | ||
} else if (blob.getName().equals(objectNamePrefix) && listOptions.isIncludePrefix()) { | ||
// Handle object prefixes. Object prefixes show up as non-directory. | ||
objectInfos.add(createItemInfoForBlob(blob)); | ||
objectPrefixIncluded = true; | ||
} | ||
} | ||
|
||
// Create inferred directory for the prefix object if necessary. | ||
if (listOptions.isIncludePrefix() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code duplication. Is there a way to avoid this? |
||
// Only add an inferred directory for non-null prefix name | ||
&& objectNamePrefix != null | ||
// Only add an inferred directory if listing in directory mode (non-flat listing) | ||
&& listOptions.getDelimiter() != null | ||
// Only add an inferred directory if listed any prefixes or objects, i.e. prefix "exists" | ||
&& !objectInfos.isEmpty() | ||
// Only add an inferred directory if prefix object is not listed already | ||
&& !objectPrefixIncluded) { | ||
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, objectNamePrefix))); | ||
} | ||
return nextPageToken; | ||
} | ||
|
||
private static long getMaxRemainingResults( | ||
long maxResults, List<GoogleCloudStorageItemInfo> blobs) { | ||
if (maxResults <= 0) { | ||
return Long.MAX_VALUE; | ||
} | ||
return maxResults - blobs.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is different from Apiary implementation. Why? |
||
} | ||
|
||
private static BlobField getBlobField(String field) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this getting used? |
||
Optional<BlobField> resultField = | ||
Arrays.stream(BlobField.values()) | ||
.filter(blobField -> blobField.getApiaryName().equals(field)) | ||
.findFirst(); | ||
return resultField.orElse(null); | ||
} | ||
|
||
private List<BlobListOption> prepareBlobListOptions( | ||
ListObjectOptions listOptions, | ||
@Nullable String objectNamePrefix, | ||
@Nullable String pageToken) { | ||
long maxResults = listOptions.getMaxResults(); | ||
|
||
List<BlobListOption> blobListOptions = new ArrayList<>(); | ||
|
||
blobListOptions.add( | ||
BlobListOption.pageSize( | ||
maxResults <= 0 || maxResults >= storageOptions.getMaxListItemsPerCall() | ||
? storageOptions.getMaxListItemsPerCall() | ||
: maxResults)); | ||
|
||
if (listOptions.getDelimiter() != null) { | ||
blobListOptions.add(BlobListOption.delimiter(listOptions.getDelimiter())); | ||
} | ||
if (objectNamePrefix != null) { | ||
blobListOptions.add(BlobListOption.prefix(objectNamePrefix)); | ||
} | ||
if (listOptions.getFields() != null) { | ||
blobListOptions.add( | ||
BlobListOption.fields( | ||
Arrays.stream(listOptions.getFields().split(",")) | ||
.map(GoogleCloudStorageClientImpl::getBlobField) | ||
.toArray(BlobField[]::new))); | ||
} | ||
if (pageToken != null) { | ||
blobListOptions.add(BlobListOption.pageToken(pageToken)); | ||
} | ||
return blobListOptions; | ||
} | ||
|
||
private static GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) { | ||
checkNotNull(blob, "object must not be null"); | ||
checkArgument(!isNullOrEmpty(blob.getBucket()), "object must have a bucket: %s", blob); | ||
checkArgument(!isNullOrEmpty(blob.getName()), "object must have a name: %s", blob); | ||
return createItemInfoForBlob(new StorageResourceId(blob.getBucket(), blob.getName()), blob); | ||
} | ||
|
||
private static GoogleCloudStorageItemInfo createItemInfoForBlob( | ||
StorageResourceId resourceId, Blob blob) { | ||
checkArgument(resourceId != null, "resourceId must not be null"); | ||
checkArgument(blob != null, "object must not be null"); | ||
checkArgument( | ||
resourceId.isStorageObject(), | ||
"resourceId must be a StorageObject. resourceId: %s", | ||
resourceId); | ||
checkArgument( | ||
resourceId.getBucketName().equals(blob.getBucket()), | ||
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'", | ||
resourceId.getBucketName(), | ||
blob.getBucket()); | ||
checkArgument( | ||
resourceId.getObjectName().equals(blob.getName()), | ||
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'", | ||
resourceId.getObjectName(), | ||
blob.getName()); | ||
|
||
Map<String, byte[]> decodedMetadata = | ||
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata()); | ||
|
||
byte[] md5Hash = null; | ||
byte[] crc32c = null; | ||
|
||
if (!isNullOrEmpty(blob.getCrc32c())) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please try to avoid code duplication. |
||
crc32c = BaseEncoding.base64().decode(blob.getCrc32c()); | ||
} | ||
|
||
if (!isNullOrEmpty(blob.getMd5())) { | ||
md5Hash = BaseEncoding.base64().decode(blob.getMd5()); | ||
} | ||
|
||
return GoogleCloudStorageItemInfo.createObject( | ||
resourceId, | ||
blob.getCreateTimeOffsetDateTime() == null | ||
? 0 | ||
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(), | ||
blob.getUpdateTimeOffsetDateTime() == null | ||
? 0 | ||
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(), | ||
blob.getSize() == null ? 0 : blob.getSize(), | ||
blob.getContentType(), | ||
blob.getContentEncoding(), | ||
decodedMetadata, | ||
blob.getGeneration() == null ? 0 : blob.getGeneration(), | ||
blob.getMetageneration() == null ? 0 : blob.getMetageneration(), | ||
new VerificationAttributes(md5Hash, crc32c)); | ||
} | ||
|
||
@Override | ||
public SeekableByteChannel open( | ||
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) throws IOException { | ||
|
@@ -661,57 +896,6 @@ private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecut | |
: ExecutorSupplier.useExecutor(pCUExecutorService); | ||
} | ||
|
||
/** Helper for converting a StorageResourceId + Blob into a GoogleCloudStorageItemInfo. */ | ||
private static GoogleCloudStorageItemInfo createItemInfoForBlob( | ||
StorageResourceId resourceId, Blob blob) { | ||
checkArgument(resourceId != null, "resourceId must not be null"); | ||
checkArgument(blob != null, "object must not be null"); | ||
checkArgument( | ||
resourceId.isStorageObject(), | ||
"resourceId must be a StorageObject. resourceId: %s", | ||
resourceId); | ||
checkArgument( | ||
resourceId.getBucketName().equals(blob.getBucket()), | ||
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'", | ||
resourceId.getBucketName(), | ||
blob.getBucket()); | ||
checkArgument( | ||
resourceId.getObjectName().equals(blob.getName()), | ||
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'", | ||
resourceId.getObjectName(), | ||
blob.getName()); | ||
|
||
Map<String, byte[]> decodedMetadata = | ||
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata()); | ||
|
||
byte[] md5Hash = null; | ||
byte[] crc32c = null; | ||
|
||
if (!isNullOrEmpty(blob.getCrc32c())) { | ||
crc32c = BaseEncoding.base64().decode(blob.getCrc32c()); | ||
} | ||
|
||
if (!isNullOrEmpty(blob.getMd5())) { | ||
md5Hash = BaseEncoding.base64().decode(blob.getMd5()); | ||
} | ||
|
||
return GoogleCloudStorageItemInfo.createObject( | ||
resourceId, | ||
blob.getCreateTimeOffsetDateTime() == null | ||
? 0 | ||
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(), | ||
blob.getUpdateTimeOffsetDateTime() == null | ||
? 0 | ||
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(), | ||
blob.getSize() == null ? 0 : blob.getSize(), | ||
blob.getContentType(), | ||
blob.getContentEncoding(), | ||
decodedMetadata, | ||
blob.getGeneration() == null ? 0 : blob.getGeneration(), | ||
blob.getMetageneration() == null ? 0 : blob.getMetageneration(), | ||
new VerificationAttributes(md5Hash, crc32c)); | ||
} | ||
|
||
public static Builder builder() { | ||
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we include the e.message as well to the IOException message. Sometime some of the tools just log the message and skip the inner exception details.