From d44ccf8be83821aaefb45dbbae6cd3895ab3c9a7 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Mon, 25 Jun 2018 15:32:10 -0700 Subject: [PATCH 1/2] Change stub, databaseStub and instanceStub to spannerStub, databaseAdminStub and instanceAdminStub Return the original operation if updatabaseDdl fails with ALREADY_EXISTS --- .../com/google/cloud/spanner/SpannerImpl.java | 4 - .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 92 ++++++++++--------- .../spanner/DatabaseAdminClientImplTest.java | 26 +++--- .../cloud/spanner/it/ITDatabaseAdminTest.java | 7 +- 4 files changed, 68 insertions(+), 61 deletions(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index e9245f2db3f2..94273b99b770 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -489,10 +489,6 @@ public OperationFuture updateDatabaseDdl( throws SpannerException { final String dbName = getDatabaseName(instanceId, databaseId); final String opId = operationId != null ? operationId : randomOperationId(); - // TODO(hzyi) - // Spanner checks the exception and if the error code is ALREADY_EXISTS - // it creates a new Operation instead of throwing the exception. This - // feature is not implemented in this PR but will come later OperationFuture rawOperationFuture = rpc.updateDatabaseDdl(dbName, statements, opId); return new OperationFutureImpl( diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index ad8f654b249c..dbf2ada3e006 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -18,7 +18,6 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; -import com.google.common.base.Preconditions; import com.google.api.core.ApiFunction; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.GaxProperties; @@ -27,18 +26,19 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiClientHeaderProvider; -import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.HeaderProvider; -import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.OperationCallable; +import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.StreamController; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; @@ -53,6 +53,7 @@ import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.longrunning.GetOperationRequest; import com.google.longrunning.Operation; @@ -110,10 +111,9 @@ public class GapicSpannerRpc implements SpannerRpc { PathTemplate.create("projects/{project}"); private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; - // TODO(hzyi): change the stub names to be more intuitive - private final SpannerStub stub; - private final InstanceAdminStub instanceStub; - private final DatabaseAdminStub databaseStub; + private final SpannerStub spannerStub; + private final InstanceAdminStub instanceAdminStub; + private final DatabaseAdminStub databaseAdminStub; private final String projectId; private final String projectName; private final SpannerMetadataProvider metadataProvider; @@ -126,9 +126,6 @@ public GapicSpannerRpc(SpannerOptions options) { this.projectId = options.getProjectId(); this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId); - // TODO(hzyi): inject userAgent to headerProvider so that it - // can be picked up by ChannelProvider - // create a metadataProvider which combines both internal headers and // per-method-call extra headers for channelProvider to inject the headers // for rpc calls @@ -177,7 +174,7 @@ public GapicSpannerRpc(SpannerOptions options) { try { // TODO: bump the version of gax and remove this try-catch block // applyToAllUnaryMethods does not throw exception in the latest version - this.stub = + this.spannerStub = GrpcSpannerStub.create( SpannerStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) @@ -192,7 +189,7 @@ public Void apply(UnaryCallSettings.Builder builder) { }) .build()); - this.instanceStub = + this.instanceAdminStub = GrpcInstanceAdminStub.create( InstanceAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) @@ -206,7 +203,7 @@ public Void apply(UnaryCallSettings.Builder builder) { } }) .build()); - this.databaseStub = + this.databaseAdminStub = GrpcDatabaseAdminStub.create( DatabaseAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) @@ -237,7 +234,7 @@ public Paginated listInstanceConfigs(int pageSize, @Nullable Str GrpcCallContext context = newCallContext(null, projectName); ListInstanceConfigsResponse response = - get(instanceStub.listInstanceConfigsCallable().futureCall(request, context)); + get(instanceAdminStub.listInstanceConfigsCallable().futureCall(request, context)); return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken()); } @@ -247,7 +244,7 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build(); GrpcCallContext context = newCallContext(null, projectName); - return get(instanceStub.getInstanceConfigCallable().futureCall(request, context)); + return get(instanceAdminStub.getInstanceConfigCallable().futureCall(request, context)); } @Override @@ -265,7 +262,7 @@ public Paginated listInstances( GrpcCallContext context = newCallContext(null, projectName); ListInstancesResponse response = - get(instanceStub.listInstancesCallable().futureCall(request, context)); + get(instanceAdminStub.listInstancesCallable().futureCall(request, context)); return new Paginated<>(response.getInstancesList(), response.getNextPageToken()); } @@ -279,7 +276,7 @@ public OperationFuture createInstance( .setInstance(instance) .build(); GrpcCallContext context = newCallContext(null, parent); - return instanceStub.createInstanceOperationCallable().futureCall(request, context); + return instanceAdminStub.createInstanceOperationCallable().futureCall(request, context); } @Override @@ -288,7 +285,7 @@ public OperationFuture updateInstance( UpdateInstanceRequest request = UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); GrpcCallContext context = newCallContext(null, instance.getName()); - return instanceStub.updateInstanceOperationCallable().futureCall(request, context); + return instanceAdminStub.updateInstanceOperationCallable().futureCall(request, context); } @Override @@ -297,7 +294,7 @@ public Instance getInstance(String instanceName) throws SpannerException { GetInstanceRequest.newBuilder().setName(instanceName).build(); GrpcCallContext context = newCallContext(null, instanceName); - return get(instanceStub.getInstanceCallable().futureCall(request, context)); + return get(instanceAdminStub.getInstanceCallable().futureCall(request, context)); } @Override @@ -306,7 +303,7 @@ public void deleteInstance(String instanceName) throws SpannerException { DeleteInstanceRequest.newBuilder().setName(instanceName).build(); GrpcCallContext context = newCallContext(null, instanceName); - get(instanceStub.deleteInstanceCallable().futureCall(request, context)); + get(instanceAdminStub.deleteInstanceCallable().futureCall(request, context)); } @Override @@ -320,7 +317,7 @@ public Paginated listDatabases( ListDatabasesRequest request = requestBuilder.build(); GrpcCallContext context = newCallContext(null, instanceName); - ListDatabasesResponse response = get(databaseStub.listDatabasesCallable() + ListDatabasesResponse response = get(databaseAdminStub.listDatabasesCallable() .futureCall(request, context)); return new Paginated<>(response.getDatabasesList(), response.getNextPageToken()); } @@ -335,7 +332,7 @@ public OperationFuture createDatabase( .addAllExtraStatements(additionalStatements) .build(); GrpcCallContext context = newCallContext(null, instanceName); - return databaseStub.createDatabaseOperationCallable().futureCall(request, context); + return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context); } @Override @@ -348,7 +345,20 @@ public OperationFuture updateDatabaseDdl( .setOperationId(MoreObjects.firstNonNull(updateId, "")) .build(); GrpcCallContext context = newCallContext(null, databaseName); - return databaseStub.updateDatabaseDdlOperationCallable().futureCall(request, context); + OperationCallable callable = databaseAdminStub.updateDatabaseDdlOperationCallable(); + OperationFuture operationFuture = callable.futureCall(request, context); + try { + operationFuture.getInitialFuture().get(); + } catch (InterruptedException e) { + // The error would be handled in SpannerImpl, swallow it here + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof AlreadyExistsException) { + String operationName = String.format("%s/operations/%s", databaseName, updateId); + return callable.resumeFutureCall(operationName, context); + } + } + return operationFuture; } @Override @@ -357,7 +367,7 @@ public void dropDatabase(String databaseName) throws SpannerException { DropDatabaseRequest.newBuilder().setDatabase(databaseName).build(); GrpcCallContext context = newCallContext(null, databaseName); - get(databaseStub.dropDatabaseCallable().futureCall(request, context)); + get(databaseAdminStub.dropDatabaseCallable().futureCall(request, context)); } @Override @@ -368,7 +378,7 @@ public Database getDatabase(String databaseName) throws SpannerException { .build(); GrpcCallContext context = newCallContext(null, databaseName); - return get(databaseStub.getDatabaseCallable().futureCall(request, context)); + return get(databaseAdminStub.getDatabaseCallable().futureCall(request, context)); } @Override @@ -377,7 +387,7 @@ public List getDatabaseDdl(String databaseName) throws SpannerException GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build(); GrpcCallContext context = newCallContext(null, databaseName); - return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context)) + return get(databaseAdminStub.getDatabaseDdlCallable().futureCall(request, context)) .getStatementsList(); } @@ -385,7 +395,7 @@ public List getDatabaseDdl(String databaseName) throws SpannerException public Operation getOperation(String name) throws SpannerException { GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); GrpcCallContext context = newCallContext(null, name); - return get(databaseStub.getOperationsStub().getOperationCallable() + return get(databaseAdminStub.getOperationsStub().getOperationCallable() .futureCall(request, context)); } @@ -400,7 +410,7 @@ public Session createSession(String databaseName, @Nullable Map } CreateSessionRequest request = requestBuilder.build(); GrpcCallContext context = newCallContext(options, databaseName); - return get(stub.createSessionCallable().futureCall(request, context)); + return get(spannerStub.createSessionCallable().futureCall(request, context)); } @Override @@ -409,7 +419,7 @@ public void deleteSession(String sessionName, @Nullable Map options) DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); GrpcCallContext context = newCallContext(options, sessionName); - get(stub.deleteSessionCallable().futureCall(request, context)); + get(spannerStub.deleteSessionCallable().futureCall(request, context)); } @Override @@ -417,7 +427,7 @@ public StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { GrpcCallContext context = newCallContext(options, request.getSession()); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); - stub.streamingReadCallable().call(request, responseObserver, context); + spannerStub.streamingReadCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); return new StreamingCall() { @Override @@ -439,7 +449,7 @@ public StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { GrpcCallContext context = newCallContext(options, request.getSession()); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); - stub.executeStreamingSqlCallable().call(request, responseObserver, context); + spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); return new StreamingCall() { @Override @@ -460,35 +470,35 @@ public void cancel(String message) { public Transaction beginTransaction( BeginTransactionRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext(options, request.getSession()); - return get(stub.beginTransactionCallable().futureCall(request, context)); + return get(spannerStub.beginTransactionCallable().futureCall(request, context)); } @Override public CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext(options, commitRequest.getSession()); - return get(stub.commitCallable().futureCall(commitRequest, context)); + return get(spannerStub.commitCallable().futureCall(commitRequest, context)); } @Override public void rollback(RollbackRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext(options, request.getSession()); - get(stub.rollbackCallable().futureCall(request, context)); + get(spannerStub.rollbackCallable().futureCall(request, context)); } @Override public PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext(options, request.getSession()); - return get(stub.partitionQueryCallable().futureCall(request, context)); + return get(spannerStub.partitionQueryCallable().futureCall(request, context)); } @Override public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext(options, request.getSession()); - return get(stub.partitionReadCallable().futureCall(request, context)); + return get(spannerStub.partitionReadCallable().futureCall(request, context)); } /** Gets the result of an async RPC call, handling any exceptions encountered. */ @@ -516,9 +526,9 @@ private GrpcCallContext newCallContext(@Nullable Map options, String } public void shutdown() { - this.stub.close(); - this.instanceStub.close(); - this.databaseStub.close(); + this.spannerStub.close(); + this.instanceAdminStub.close(); + this.databaseAdminStub.close(); } /** diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java index 7f206fa3bed7..42c1e886485d 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java @@ -115,21 +115,25 @@ public void updateDatabaseDdl() throws Exception { assertThat(op.getName()).isEqualTo(opName); } - @Ignore("More work needs to be done") @Test - // TODO(hzyi) - // Changing the surface to OperationFuture breaks updateDatabaseDdl in the case - // that there is already a longrunning operation running. Disabling this test for - // this PR and I will fix this in the next PR. public void updateDatabaseDdlOpAlreadyExists() throws Exception { - String opName = DB_NAME + "/operations/myop"; - String opId = "myop"; + String originalOpName = DB_NAME + "/operations/originalop"; + String originalOpId = "originalop"; List ddl = ImmutableList.of(); - when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId)) - .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "")); + OperationFuture originalOp = + OperationFutureUtil.immediateOperationFuture( + originalOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance()); + + String newOpName = DB_NAME + "/operations/newop"; + String newOpId = "newop"; + OperationFuture newop = + OperationFutureUtil.immediateOperationFuture( + newOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance()); + + when(rpc.updateDatabaseDdl(DB_NAME, ddl, newOpId)).thenReturn(originalOp); OperationFuture op = - client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, opId); - assertThat(op.getName()).isEqualTo(opName); + client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, newOpId); + assertThat(op.getName()).isEqualTo(originalOpName); } @Test diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java index 8be3bacf321d..f78e2c48b5c8 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java @@ -108,12 +108,7 @@ public void databaseOperations() throws Exception { db = dbAdminClient.getDatabase(testHelper.getInstanceId().getInstance(), dbId); } - @Ignore("More work needs to be done") @Test - // TODO(hzyi) - // Changing the surface to OperationFuture breaks updateDatabaseDdl in the case - // that there is already a longrunning operation running. Disabling this test for - // this PR and I will fix this in the next PR. public void updateDdlRetry() throws Exception { String dbId = testHelper.getUniqueDatabaseId(); String instanceId = testHelper.getInstanceId().getInstance(); @@ -127,6 +122,8 @@ public void updateDdlRetry() throws Exception { dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop"); OperationFuture op2 = dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop"); + op1.get(); + op2.get(); assertThat(op1.getMetadata().get()).isEqualTo(op2.getMetadata().get()); } From 86f5d119baaa2fe5634e44dbb4b1359834c8f386 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Tue, 26 Jun 2018 10:53:49 -0700 Subject: [PATCH 2/2] Code review issues --- .../google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index dbf2ada3e006..bc41aa4a0239 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -38,7 +38,6 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GrpcTransportOptions; -import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; @@ -109,8 +108,10 @@ public class GapicSpannerRpc implements SpannerRpc { private static final PathTemplate PROJECT_NAME_TEMPLATE = PathTemplate.create("projects/{project}"); + private static final PathTemplate OPERATION_NAME_TEMPLATE = + PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}"); private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; - + private final SpannerStub spannerStub; private final InstanceAdminStub instanceAdminStub; private final DatabaseAdminStub databaseAdminStub; @@ -350,11 +351,12 @@ public OperationFuture updateDatabaseDdl( try { operationFuture.getInitialFuture().get(); } catch (InterruptedException e) { - // The error would be handled in SpannerImpl, swallow it here + throw SpannerExceptionFactory.newSpannerException(e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof AlreadyExistsException) { - String operationName = String.format("%s/operations/%s", databaseName, updateId); + String operationName = + OPERATION_NAME_TEMPLATE.instantiate("database", databaseName, "operation", updateId); return callable.resumeFutureCall(operationName, context); } }