From fcf3a0e83034ddb97e98326c1c24a36b910757e7 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Tue, 22 May 2018 16:01:40 -0700 Subject: [PATCH] Add logging interceptor and error augmentation interceptor --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 96 ++++++++-------- .../spanner/spi/v1/LoggingInterceptor.java | 108 ++++++++++++++++++ .../spi/v1/SpannerInterceptorProvider.java | 40 +++++++ 3 files changed, 194 insertions(+), 50 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 18ca176d0153..fba7beab6ab6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -23,11 +23,9 @@ import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.rpc.ApiClientHeaderProvider; -import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode; @@ -52,6 +50,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; import com.google.longrunning.GetOperationRequest; +import com.google.longrunning.Operation; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; @@ -83,10 +82,10 @@ import com.google.spanner.v1.CreateSessionRequest; import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.PartitionQueryRequest; import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; -import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; @@ -100,8 +99,6 @@ import java.util.concurrent.Future; import javax.annotation.Nullable; -import com.google.longrunning.Operation; - /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ public class GapicSpannerRpc implements SpannerRpc { @@ -145,9 +142,7 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { mergedHeaderProvider.getHeaders(), internalHeaderProviderBuilder.getResourceHeaderKey()); - // TODO(pongad): make RPC logging work (formerly LoggingInterceptor) // TODO(pongad): add watchdog - // TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor) // TODO(hzyi): make this channelProvider configurable through SpannerOptions TransportChannelProvider channelProvider = @@ -156,11 +151,12 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { .setEndpoint(options.getEndpoint()) .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) .setPoolSize(options.getNumChannels()) + .setInterceptorProvider(new SpannerInterceptorProvider()) .build(); CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); - + // Disabling retry for now because spanner handles retry in SpannerImpl. // We will finally want to improve gax but for smooth transitioning we // preserve the retry in SpannerImpl @@ -168,48 +164,48 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { // TODO: bump the version of gax and remove this try-catch block // applyToAllUnaryMethods does not throw exception in the latest version this.stub = - GrpcSpannerStub.create( - SpannerStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); - - this.instanceStub = - GrpcInstanceAdminStub.create( - InstanceAdminStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); - this.databaseStub = - GrpcDatabaseAdminStub.create( - DatabaseAdminStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); + GrpcSpannerStub.create( + SpannerStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + + this.instanceStub = + GrpcInstanceAdminStub.create( + InstanceAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + this.databaseStub = + GrpcDatabaseAdminStub.create( + DatabaseAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); } catch (Exception e) { throw SpannerExceptionFactory.newSpannerException(e); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java new file mode 100644 index 000000000000..44571b1a6523 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** Adds logging to rpc calls */ +class LoggingInterceptor implements ClientInterceptor { + + private final Logger logger; + private final Level level; + + LoggingInterceptor(Logger logger, Level level) { + this.logger = logger; + this.level = level; + } + + private class CallLogger { + + private final MethodDescriptor method; + + CallLogger(MethodDescriptor method) { + this.method = method; + } + + void log(String message) { + logger.log( + level, + "{0}[{1}]: {2}", + new Object[] { + method.getFullMethodName(), Integer.toHexString(System.identityHashCode(this)), message + }); + } + + void logfmt(String message, Object... params) { + log(String.format(message, params)); + } + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (!logger.isLoggable(level)) { + return next.newCall(method, callOptions); + } + + final CallLogger callLogger = new CallLogger(method); + callLogger.log("Start"); + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + @Override + public void onMessage(RespT message) { + callLogger.logfmt("Received:\n%s", message); + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + callLogger.logfmt("Closed with status %s and trailers %s", status, trailers); + super.onClose(status, trailers); + } + }, + headers); + } + + @Override + public void sendMessage(ReqT message) { + callLogger.logfmt("Send:\n%s", message); + super.sendMessage(message); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + callLogger.logfmt("Cancelled with message %s", message); + super.cancel(message, cause); + } + }; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java new file mode 100644 index 000000000000..a07d549cd415 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import com.google.api.gax.grpc.GrpcInterceptorProvider; +import com.google.common.collect.ImmutableList; +import io.grpc.ClientInterceptor; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * For internal use only. + */ +class SpannerInterceptorProvider implements GrpcInterceptorProvider { + + private static final List clientInterceptors = + ImmutableList.of( + new SpannerErrorInterceptor(), + new LoggingInterceptor(Logger.getLogger(GrpcSpannerRpc.class.getName()), Level.FINER)); + + @Override + public List getInterceptors() { + return clientInterceptors; + } + +}