Skip to content

Commit

Permalink
fix: update request handling of gRPC based CopyWriter (#2858)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenWhitehead authored Jan 6, 2025
1 parent 46e964f commit 093cb87
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ final class GapicCopyWriter extends CopyWriter {
private final GrpcStorageOptions options;
private final UnaryCallable<RewriteObjectRequest, RewriteResponse> callable;
private final ResultRetryAlgorithm<?> alg;
private final RewriteObjectRequest originalRequest;
private final RewriteResponse initialResponse;

private RewriteResponse mostRecentResponse;
Expand All @@ -39,13 +40,15 @@ final class GapicCopyWriter extends CopyWriter {
GrpcStorageImpl storage,
UnaryCallable<RewriteObjectRequest, RewriteResponse> callable,
ResultRetryAlgorithm<?> alg,
RewriteObjectRequest originalRequest,
RewriteResponse initialResponse) {
this.storage = storage;
this.options = storage.getOptions();
this.callable = callable;
this.alg = alg;
this.initialResponse = initialResponse;
this.mostRecentResponse = initialResponse;
this.originalRequest = originalRequest;
}

@Override
Expand Down Expand Up @@ -76,9 +79,7 @@ public long getTotalBytesCopied() {
public void copyChunk() {
if (!isDone()) {
RewriteObjectRequest req =
RewriteObjectRequest.newBuilder()
.setRewriteToken(mostRecentResponse.getRewriteToken())
.build();
originalRequest.toBuilder().setRewriteToken(mostRecentResponse.getRewriteToken()).build();
GrpcCallContext retryContext = Retrying.newCallContext();
mostRecentResponse =
Retrying.run(options, alg, () -> callable.call(req, retryContext), Decoder.identity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,17 +656,24 @@ public CopyWriter copy(CopyRequest copyRequest) {
RewriteObjectRequest.newBuilder()
.setDestinationName(dstProto.getName())
.setDestinationBucket(dstProto.getBucket())
// destination_kms_key comes from dstOpts
// according to the docs in the protos, it is illegal to populate the following fields,
// clear them out if they are set
// destination_predefined_acl comes from dstOpts
// if_*_match come from srcOpts and dstOpts
// copy_source_encryption_* come from srcOpts
// common_object_request_params come from dstOpts
.setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build())
.setSourceBucket(srcProto.getBucket())
.setSourceObject(srcProto.getName());

// according to the docs in the protos, it is illegal to populate the following fields,
// clear them out if they are set
// * destination_kms_key comes from dstOpts
// * destination_predefined_acl comes from dstOpts
// * if_*_match come from srcOpts and dstOpts
// * copy_source_encryption_* come from srcOpts
// * common_object_request_params come from dstOpts
Object cleanedDst = dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build();
// only set the destination if it is not equal to the default instance
// otherwise we will clobber default values populated in the gcs server side for the object
// metadata
if (!cleanedDst.equals(Object.getDefaultInstance())) {
b.setDestination(cleanedDst);
}

if (src.getGeneration() != null) {
b.setSourceGeneration(src.getGeneration());
}
Expand All @@ -685,7 +692,8 @@ public CopyWriter copy(CopyRequest copyRequest) {
getOptions(),
retryAlgorithmManager.getFor(req),
() -> callable.call(req, retryContext),
(resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp));
(resp) ->
new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), req, resp));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.PackagePrivateMethodWorkarounds;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
Expand Down Expand Up @@ -562,7 +563,6 @@ public void testListBlobsCurrentDirectoryIncludesBothObjectsAndSyntheticDirector
}

@Test
// When gRPC support is added for matchGlob, enable this test for gRPC.
public void testListBlobsWithMatchGlob() throws Exception {
BucketInfo bucketInfo = BucketInfo.newBuilder(generator.randomBucketName()).build();
try (TemporaryBucket tempBucket =
Expand Down Expand Up @@ -848,8 +848,6 @@ public void testComposeBlobFail() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlob() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -872,8 +870,35 @@ public void testCopyBlob() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void copyBlob_classChange_multipleChunks() {

String sourceBlobName = generator.randomObjectName() + "-source";
BlobId source = BlobId.of(bucket.getName(), sourceBlobName);
ImmutableMap<String, String> metadata = ImmutableMap.of("k", "v");
BlobInfo blob = BlobInfo.newBuilder(source).setMetadata(metadata).build();
int _5MiB = 5 * 1024 * 1024;
byte[] bytes = DataGenerator.base64Characters().genBytes(_5MiB);
Blob remoteBlob = storage.create(blob, bytes);
assertThat(remoteBlob).isNotNull();
String targetBlobName = generator.randomObjectName() + "-target";
CopyRequest req =
CopyRequest.newBuilder()
.setSource(source)
.setTarget(
BlobInfo.newBuilder(bucket, targetBlobName)
// change the storage class to force GCS to copy bytes
.setStorageClass(StorageClass.NEARLINE)
.build(),
BlobTargetOption.doesNotExist())
.setMegabytesCopiedPerChunk(2L)
.build();
CopyWriter copyWriter = storage.copy(req);
BlobInfo remoteBlob2 = copyWriter.getResult();
assertThat(copyWriter.isDone()).isTrue();
assertThat(remoteBlob2).isNotNull();
}

@Test
public void testCopyBlobWithPredefinedAcl() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand Down Expand Up @@ -903,8 +928,6 @@ public void testCopyBlobWithPredefinedAcl() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobWithEncryptionKeys() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand Down Expand Up @@ -955,8 +978,6 @@ public void testCopyBlobWithEncryptionKeys() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobUpdateMetadata() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -981,9 +1002,7 @@ public void testCopyBlobUpdateMetadata() {
assertTrue(storage.delete(bucket.getName(), targetBlobName));
}

// Re-enable this test when it stops failing
// @Test
@Exclude(transports = Transport.GRPC)
@Test
public void testCopyBlobUpdateStorageClass() {
String sourceBlobName = generator.randomObjectName() + "-source";
BlobId source = BlobId.of(bucket.getName(), sourceBlobName);
Expand All @@ -1007,8 +1026,6 @@ public void testCopyBlobUpdateStorageClass() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobNoContentType() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -1022,17 +1039,16 @@ public void testCopyBlobNoContentType() {
CopyWriter copyWriter = storage.copy(req);
assertEquals(bucket.getName(), copyWriter.getResult().getBucket());
assertEquals(targetBlobName, copyWriter.getResult().getName());
assertNull(copyWriter.getResult().getContentType());
assertTrue(
copyWriter.getResult().getContentType() == null
|| copyWriter.getResult().getContentType().isEmpty());
assertEquals(metadata, copyWriter.getResult().getMetadata());
assertTrue(copyWriter.isDone());
assertTrue(remoteSourceBlob.delete());
assertTrue(storage.delete(bucket.getName(), targetBlobName));
}

@Test
// Verified against testbench
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobFail() {

String sourceBlobName = "test-copy-blob-source-fail";
Expand Down

0 comments on commit 093cb87

Please sign in to comment.