From e892ff9bd595b42a37853a237c3b3e568e23483d Mon Sep 17 00:00:00 2001 From: Dariusz Jedrzejczyk Date: Mon, 11 Oct 2021 12:09:15 +0200 Subject: [PATCH] Untangling `GrpcServerBuilder` from `HttpServerBuilder` (#1861) Motivation: The methods in `GrpcServerBuilder` are often only delegating to the underlying HTTP transport builder. It will be beneficial if the API can be simplified and have only gRPC specific settings. The first step is to deprecate the methods which can be called directly on the underlying `HttpServerBuilder` and to introduce a method for configuring that builder. Modifications: - Introduce a `GrpcServerBuilder.HttpInitializer` interface and a corresponding `GrpcServerBuilder#initialize` method which accepts the initializer for the underlying `HttpServerBuilder` instance, - Deprecate methods which have only served the purpose of delegating to the underlying HTTP transport builder, - Added default implementations for deprecated methods, - Made the builder reconfigurable after first build, - Clarified javadoc for lifecycleObserver overriding by initializer, - Refactor tests to use the new approach to showcase it works. Result: The API of `GrpcServerBuilder` is prepared for removal of the deprecated methods in 0.42 release and the API is more specific, while maintaining the same level of configurability. --- .../grpc/api/GrpcServerBuilder.java | 156 +++++++++++++++--- .../grpc/netty/DefaultGrpcServerBuilder.java | 74 ++++++--- .../servicetalk/grpc/netty/GrpcServers.java | 4 +- .../grpc/netty/ErrorHandlingTest.java | 4 +- .../netty/GrpcClientRequiresTrailersTest.java | 6 +- .../GrpcClientValidatesContentTypeTest.java | 6 +- .../grpc/netty/GrpcLifecycleObserverTest.java | 7 +- .../netty/GrpcServiceContextProtocolTest.java | 2 +- .../GrpcSslAndNonSslConnectionsTest.java | 12 +- .../servicetalk/grpc/netty/KeepAliveTest.java | 20 ++- .../grpc/netty/ProtocolCompatibilityTest.java | 31 ++-- .../grpc/netty/TrailersOnlyErrorTest.java | 4 +- 12 files changed, 236 insertions(+), 90 deletions(-) diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java index 001e8aae32..adabd3b612 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java @@ -19,8 +19,10 @@ import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.HttpLifecycleObserver; import io.servicetalk.http.api.HttpProtocolConfig; import io.servicetalk.http.api.HttpRequest; +import io.servicetalk.http.api.HttpServerBuilder; import io.servicetalk.http.api.HttpServiceContext; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; @@ -55,7 +57,45 @@ */ public abstract class GrpcServerBuilder { - private boolean appendedCatchAllFilter; + /** + * Initializes the underlying {@link HttpServerBuilder} used for the transport layer. + */ + @FunctionalInterface + public interface HttpInitializer { + + /** + * Configures the underlying {@link HttpServerBuilder}. + * @param builder The builder to customize the HTTP layer. + */ + void initialize(HttpServerBuilder builder); + + /** + * Appends the passed {@link HttpInitializer} to this {@link HttpInitializer} such that this instance is + * applied first and then the argument's {@link HttpInitializer}. + * @param toAppend {@link HttpInitializer} to append. + * @return A composite {@link HttpInitializer} after the append operation. + */ + default HttpInitializer append(HttpInitializer toAppend) { + return builder -> { + initialize(builder); + toAppend.initialize(builder); + }; + } + } + + /** + * Set a function which can configure the underlying {@link HttpServerBuilder} used for the transport layer. + *

+ * Please note that this method shouldn't be mixed with the {@link Deprecated} methods of this class as the order + * of operations would not be the same as the order in which the calls are made. Please migrate all of the calls + * to this method. + * @param initializer Initializes the underlying HTTP transport builder. + * @return {@code this}. + */ + public GrpcServerBuilder initializeHttp(HttpInitializer initializer) { + throw new UnsupportedOperationException("Initializing the HttpServerBuilder using this method is not yet" + + "supported by " + getClass().getName()); + } /** * Configurations of various underlying protocol versions. @@ -65,8 +105,15 @@ public abstract class GrpcServerBuilder { * * @param protocols {@link HttpProtocolConfig} for each protocol that should be supported. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#protocols(HttpProtocolConfig...)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder protocols(HttpProtocolConfig... protocols); + @Deprecated + public GrpcServerBuilder protocols(HttpProtocolConfig... protocols) { + throw new UnsupportedOperationException("Method protocols is not supported by " + getClass().getName()); + } /** * Set a default timeout during which gRPC calls are expected to complete. This default will be used only if the @@ -81,8 +128,15 @@ public abstract class GrpcServerBuilder { * Set the SSL/TLS configuration. * @param config The configuration to use. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#sslConfig(ServerSslConfig)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder sslConfig(ServerSslConfig config); + @Deprecated + public GrpcServerBuilder sslConfig(ServerSslConfig config) { + throw new UnsupportedOperationException("Method sslConfig is not supported by " + getClass().getName()); + } /** * Set the SSL/TLS and SNI configuration. @@ -91,8 +145,15 @@ public abstract class GrpcServerBuilder { * @param sniMap A map where the keys are matched against the client certificate's SNI extension value in order * to provide the corresponding {@link ServerSslConfig}. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#sslConfig(ServerSslConfig, Map)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder sslConfig(ServerSslConfig defaultConfig, Map sniMap); + @Deprecated + public GrpcServerBuilder sslConfig(ServerSslConfig defaultConfig, Map sniMap) { + throw new UnsupportedOperationException("Method sslConfig is not supported by " + getClass().getName()); + } /** * Add a {@link SocketOption} that is applied. @@ -103,8 +164,15 @@ public abstract class GrpcServerBuilder { * @return {@code this}. * @see StandardSocketOptions * @see ServiceTalkSocketOptions + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#socketOption(SocketOption, Object)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder socketOption(SocketOption option, T value); + @Deprecated + public GrpcServerBuilder socketOption(SocketOption option, T value) { + throw new UnsupportedOperationException("Method socketOption is not supported by " + getClass().getName()); + } /** * Adds a {@link SocketOption} that is applied to the server socket channel which listens/accepts socket channels. @@ -114,8 +182,16 @@ public abstract class GrpcServerBuilder { * @return this. * @see StandardSocketOptions * @see ServiceTalkSocketOptions + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#listenSocketOption(SocketOption, Object)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder listenSocketOption(SocketOption option, T value); + @Deprecated + public GrpcServerBuilder listenSocketOption(SocketOption option, T value) { + throw new UnsupportedOperationException("Method listenSocketOption is not supported by " + + getClass().getName()); + } /** * Enables wire-logging for connections created by this builder. @@ -125,21 +201,39 @@ public abstract class GrpcServerBuilder { * @param logUserData {@code true} to include user data (e.g. data, headers, etc.). {@code false} to exclude user * data and log only network events. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#enableWireLogging(String, LogLevel, BooleanSupplier)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel logLevel, - BooleanSupplier logUserData); + @Deprecated + public GrpcServerBuilder enableWireLogging(String loggerName, LogLevel logLevel, BooleanSupplier logUserData) { + throw new UnsupportedOperationException("Method enableWireLogging is not supported by " + + getClass().getName()); + } /** * Sets a {@link TransportObserver} that provides visibility into transport events. * * @param transportObserver A {@link TransportObserver} that provides visibility into transport events. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#transportObserver(TransportObserver)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder transportObserver(TransportObserver transportObserver); + @Deprecated + public GrpcServerBuilder transportObserver(TransportObserver transportObserver) { + throw new UnsupportedOperationException("Method transportObserver is not supported by " + + getClass().getName()); + } /** * Sets a {@link GrpcLifecycleObserver} that provides visibility into gRPC lifecycle events. - * + *

+ * Note, if {@link #initializeHttp(HttpInitializer)} is used to configure + * {@link HttpServerBuilder#lifecycleObserver(HttpLifecycleObserver)} – that will override the value specified + * using this method. Please choose only one approach. * @param lifecycleObserver A {@link GrpcLifecycleObserver} that provides visibility into gRPC lifecycle events. * @return {@code this}. */ @@ -160,8 +254,16 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * @param enable When {@code false} it will disable the automatic consumption of request * {@link StreamingHttpRequest#payloadBody()}. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#drainRequestPayloadBody(boolean)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder drainRequestPayloadBody(boolean enable); + @Deprecated + public GrpcServerBuilder drainRequestPayloadBody(boolean enable) { + throw new UnsupportedOperationException("Method drainRequestPayloadBody is not supported by " + + getClass().getName()); + } /** * Append the filter to the chain of filters used to decorate the {@link ConnectionAcceptor} used by this builder. @@ -179,8 +281,16 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * @param factory {@link ConnectionAcceptorFactory} to append. Lifetime of this * {@link ConnectionAcceptorFactory} is managed by this builder and the server started thereof. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendConnectionAcceptorFilter(ConnectionAcceptorFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder appendConnectionAcceptorFilter(ConnectionAcceptorFactory factory); + @Deprecated + public GrpcServerBuilder appendConnectionAcceptorFilter(ConnectionAcceptorFactory factory) { + throw new UnsupportedOperationException("Method appendConnectionAcceptorFilter is not supported by " + + getClass().getName()); + } /** * Append the filter to the chain of filters used to decorate the service used by this builder. @@ -195,9 +305,13 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * * @param factory {@link StreamingHttpServiceFilterFactory} to append. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendServiceFilter(StreamingHttpServiceFilterFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ + @Deprecated public final GrpcServerBuilder appendHttpServiceFilter(StreamingHttpServiceFilterFactory factory) { - appendCatchAllFilterIfRequired(); doAppendHttpServiceFilter(factory); return this; } @@ -217,10 +331,14 @@ public final GrpcServerBuilder appendHttpServiceFilter(StreamingHttpServiceFilte * @param predicate the {@link Predicate} to test if the filter must be applied. * @param factory {@link StreamingHttpServiceFilterFactory} to append. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendServiceFilter(Predicate, StreamingHttpServiceFilterFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ + @Deprecated public final GrpcServerBuilder appendHttpServiceFilter(Predicate predicate, StreamingHttpServiceFilterFactory factory) { - appendCatchAllFilterIfRequired(); doAppendHttpServiceFilter(predicate, factory); return this; } @@ -361,11 +479,11 @@ public final ServerContext listenAndAwait(GrpcBindableService... servic protected abstract void doAppendHttpServiceFilter(Predicate predicate, StreamingHttpServiceFilterFactory factory); - private void appendCatchAllFilterIfRequired() { - if (!appendedCatchAllFilter) { - doAppendHttpServiceFilter(CatchAllHttpServiceFilter::new); - appendedCatchAllFilter = true; - } + protected static void appendCatchAllFilter(HttpServerBuilder httpServerBuilder) { + // TODO(dj): Move to DefaultGrpcServerBuilder + // This code depends on GrpcUtils which is inaccessible from the servicetalk-grpc-netty module. + // When this class is converted to an interface we can also refactor that part. + httpServerBuilder.appendServiceFilter(CatchAllHttpServiceFilter::new); } static final class CatchAllHttpServiceFilter extends StreamingHttpServiceFilter { diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index bb31d7f202..8e0bb8d0e8 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.function.BooleanSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import javax.annotation.Nullable; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy; @@ -59,12 +60,19 @@ import static io.servicetalk.grpc.internal.DeadlineUtils.readTimeoutHeader; import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; +import static java.util.Objects.requireNonNull; final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements ServerBinder { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGrpcServerBuilder.class); - private final HttpServerBuilder httpServerBuilder; + private final Supplier httpServerBuilderSupplier; + private GrpcServerBuilder.HttpInitializer initializer = builder -> { + // no-op + }; + private GrpcServerBuilder.HttpInitializer directCallInitializer = builder -> { + // no-op + }; private final ExecutionContextBuilder contextBuilder = new ExecutionContextBuilder() // Make sure we always set a strategy so that ExecutionContextBuilder does not create a strategy which is // not compatible with gRPC. @@ -75,107 +83,113 @@ final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements Server */ @Nullable private Duration defaultTimeout; - private boolean invokedBuild; - DefaultGrpcServerBuilder(final HttpServerBuilder httpServerBuilder) { - this.httpServerBuilder = httpServerBuilder.protocols(h2Default()).allowDropRequestTrailers(true); + DefaultGrpcServerBuilder(final Supplier httpServerBuilderSupplier) { + this.httpServerBuilderSupplier = () -> httpServerBuilderSupplier.get() + .protocols(h2Default()).allowDropRequestTrailers(true); + } + + @Override + public GrpcServerBuilder initializeHttp(final GrpcServerBuilder.HttpInitializer initializer) { + this.initializer = requireNonNull(initializer); + return this; } @Override public GrpcServerBuilder defaultTimeout(Duration defaultTimeout) { - if (invokedBuild) { - throw new IllegalStateException("default timeout cannot be modified after build, create a new builder"); - } this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout"); return this; } @Override public GrpcServerBuilder protocols(final HttpProtocolConfig... protocols) { - httpServerBuilder.protocols(protocols); + directCallInitializer = directCallInitializer.append(builder -> builder.protocols(protocols)); return this; } @Override public GrpcServerBuilder sslConfig(final ServerSslConfig config) { - httpServerBuilder.sslConfig(config); + directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(config)); return this; } @Override public GrpcServerBuilder sslConfig(final ServerSslConfig defaultConfig, final Map sniMap) { - httpServerBuilder.sslConfig(defaultConfig, sniMap); + directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(defaultConfig, sniMap)); return this; } @Override public GrpcServerBuilder socketOption(final SocketOption option, final T value) { - httpServerBuilder.socketOption(option, value); + directCallInitializer = directCallInitializer.append(builder -> builder.socketOption(option, value)); return this; } @Override public GrpcServerBuilder listenSocketOption(final SocketOption option, final T value) { - httpServerBuilder.listenSocketOption(option, value); + directCallInitializer = directCallInitializer.append(builder -> builder.listenSocketOption(option, value)); return this; } @Override public GrpcServerBuilder enableWireLogging(final String loggerName, final LogLevel logLevel, final BooleanSupplier logUserData) { - httpServerBuilder.enableWireLogging(loggerName, logLevel, logUserData); + directCallInitializer = directCallInitializer.append(builder -> + builder.enableWireLogging(loggerName, logLevel, logUserData)); return this; } @Override public GrpcServerBuilder transportObserver(final TransportObserver transportObserver) { - httpServerBuilder.transportObserver(transportObserver); + directCallInitializer = directCallInitializer.append(builder -> builder.transportObserver(transportObserver)); return this; } @Override public GrpcServerBuilder lifecycleObserver(final GrpcLifecycleObserver lifecycleObserver) { - httpServerBuilder.lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver)); + directCallInitializer = directCallInitializer.append(builder -> builder + .lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver))); return this; } @Override public GrpcServerBuilder drainRequestPayloadBody(boolean enable) { - httpServerBuilder.drainRequestPayloadBody(enable); + directCallInitializer = directCallInitializer.append(builder -> builder.drainRequestPayloadBody(enable)); return this; } @Override public GrpcServerBuilder appendConnectionAcceptorFilter(final ConnectionAcceptorFactory factory) { - httpServerBuilder.appendConnectionAcceptorFilter(factory); + directCallInitializer = directCallInitializer.append(builder -> + builder.appendConnectionAcceptorFilter(factory)); return this; } @Override public GrpcServerBuilder executor(final Executor executor) { contextBuilder.executor(executor); - httpServerBuilder.executor(executor); + directCallInitializer = directCallInitializer.append(builder -> builder.executor(executor)); return this; } @Override public GrpcServerBuilder ioExecutor(final IoExecutor ioExecutor) { contextBuilder.ioExecutor(ioExecutor); - httpServerBuilder.ioExecutor(ioExecutor); + directCallInitializer = directCallInitializer.append(builder -> builder.ioExecutor(ioExecutor)); return this; } @Override public GrpcServerBuilder bufferAllocator(final BufferAllocator allocator) { contextBuilder.bufferAllocator(allocator); - httpServerBuilder.bufferAllocator(allocator); + directCallInitializer = directCallInitializer.append(builder -> builder.bufferAllocator(allocator)); return this; } @Override public GrpcServerBuilder executionStrategy(final GrpcExecutionStrategy strategy) { contextBuilder.executionStrategy(strategy); - httpServerBuilder.executionStrategy(strategy); + directCallInitializer = directCallInitializer.append(builder -> builder.executionStrategy(strategy)); return this; } @@ -186,20 +200,26 @@ protected Single doListen(final GrpcServiceFactory servi @Override protected void doAppendHttpServiceFilter(final StreamingHttpServiceFilterFactory factory) { - httpServerBuilder.appendServiceFilter(factory); + directCallInitializer = directCallInitializer.append(builder -> builder.appendServiceFilter(factory)); } @Override protected void doAppendHttpServiceFilter(final Predicate predicate, final StreamingHttpServiceFilterFactory factory) { - httpServerBuilder.appendServiceFilter(predicate, factory); + directCallInitializer = directCallInitializer.append(builder -> + builder.appendServiceFilter(predicate, factory)); } private HttpServerBuilder preBuild() { - if (!invokedBuild) { - doAppendHttpServiceFilter(new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true)); - } - invokedBuild = true; + final HttpServerBuilder httpServerBuilder = httpServerBuilderSupplier.get(); + + appendCatchAllFilter(httpServerBuilder); + + directCallInitializer.initialize(httpServerBuilder); + initializer.initialize(httpServerBuilder); + + httpServerBuilder.appendServiceFilter( + new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true)); return httpServerBuilder; } diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java index 338b24be82..09e487483e 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java @@ -36,7 +36,7 @@ private GrpcServers() { * @return a new builder. */ public static GrpcServerBuilder forPort(int port) { - return new DefaultGrpcServerBuilder(HttpServers.forPort(port)); + return new DefaultGrpcServerBuilder(() -> HttpServers.forPort(port)); } /** @@ -46,6 +46,6 @@ public static GrpcServerBuilder forPort(int port) { * @return a new builder. */ public static GrpcServerBuilder forAddress(SocketAddress socketAddress) { - return new DefaultGrpcServerBuilder(HttpServers.forAddress(socketAddress)); + return new DefaultGrpcServerBuilder(() -> HttpServers.forAddress(socketAddress)); } } diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java index 6934ea6700..e7ca8b9f89 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java @@ -261,7 +261,9 @@ private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy, throw new IllegalArgumentException("Unknown mode: " + testMode); } this.requestPublisher = requestPublisher; - serverContext = GrpcServers.forAddress(localAddress(0)).appendHttpServiceFilter(serviceFilterFactory) + final StreamingHttpServiceFilterFactory filterFactory = serviceFilterFactory; + serverContext = GrpcServers.forAddress(localAddress(0)) + .initializeHttp(builder -> builder.appendServiceFilter(filterFactory)) .executionStrategy(serverStrategy).listenAndAwait(serviceFactory); final StreamingHttpClientFilterFactory pickedClientFilterFactory = clientFilterFactory; GrpcClientBuilder clientBuilder = diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java index c8f3644cef..ac7505ac07 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java @@ -60,8 +60,8 @@ class GrpcClientRequiresTrailersTest { private BlockingTesterClient client; private void setUp(boolean hasTrailers) throws Exception { - serverContext = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { + serverContext = GrpcServers.forAddress(localAddress(0)).initializeHttp(builder -> builder + .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, @@ -78,7 +78,7 @@ protected HttpHeaders payloadComplete(final HttpHeaders trailers) { return resp; }); } - }) + })) .listenAndAwait(new TesterProto.Tester.TesterService() { @Override public Single testRequestStream(final GrpcServiceContext ctx, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java index f67f82f67a..5ab8ff916d 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java @@ -48,8 +48,8 @@ final class GrpcClientValidatesContentTypeTest { private TesterProto.Tester.BlockingTesterClient client; void setUp(boolean withCharset) throws Exception { - serverContext = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { + serverContext = GrpcServers.forAddress(localAddress(0)).initializeHttp(builder -> builder + .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, @@ -60,7 +60,7 @@ public Single handle( return resp; }); } - }) + })) .listenAndAwait(new TesterProto.Tester.TesterService() { @Override public Single testRequestStream(final GrpcServiceContext ctx, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java index 3ffe257e99..227c2cdaa0 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java @@ -143,8 +143,11 @@ private void setUp(boolean error) throws Exception { server = forAddress(localAddress(0)) .ioExecutor(SERVER_CTX.ioExecutor()) .executor(SERVER_CTX.executor()) - .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true) - .protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true).build()) + .initializeHttp(builder -> builder + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true) + .protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true) + .build()) + ) .lifecycleObserver(combine(serverLifecycleObserver, LOGGING)) .listenAndAwait(new EchoService(error)); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java index 53cf36c41d..ad03ef70a6 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java @@ -67,7 +67,7 @@ private void setUp(HttpProtocolVersion httpProtocol, boolean streamingService) t expectedValue = "gRPC over " + httpProtocol; serverContext = GrpcServers.forAddress(localAddress(0)) - .protocols(protocolConfig(httpProtocol)) + .initializeHttp(builder -> builder.protocols(protocolConfig(httpProtocol))) .listenAndAwait(streamingService ? new ServiceFactory(new TesterServiceImpl()) : new ServiceFactory(new BlockingTesterServiceImpl())); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java index 3a1907f14c..d70b3196a0 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java @@ -55,9 +55,7 @@ private ServerContext nonSecureGrpcServer() throws Exception { private ServerContext secureGrpcServer() throws Exception { return GrpcServers.forAddress(localAddress(0)) - .sslConfig( - trustedServerConfig() - ) + .initializeHttp(builder -> builder.sslConfig(trustedServerConfig())) .listenAndAwait(serviceFactory()); } @@ -155,10 +153,10 @@ void secureClientToSecureServerWithoutPeerHostSucceeds() throws Exception { @Test void noSniClientDefaultServerFallbackSuccess() throws Exception { try (ServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .sslConfig( + .initializeHttp(builder -> builder.sslConfig( trustedServerConfig(), singletonMap(getLoopbackAddress().getHostName(), untrustedServerConfig()) - ) + )) .listenAndAwait(serviceFactory()); BlockingTesterClient client = GrpcClients.forAddress( getLoopbackAddress().getHostName(), serverHostAndPort(serverContext).port()) @@ -177,10 +175,10 @@ void noSniClientDefaultServerFallbackSuccess() throws Exception { @Test void noSniClientDefaultServerFallbackFailExpected() throws Exception { try (ServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .sslConfig( + .initializeHttp(builder -> builder.sslConfig( untrustedServerConfig(), singletonMap(getLoopbackAddress().getHostName(), trustedServerConfig()) - ) + )) .listenAndAwait(serviceFactory()); BlockingTesterClient client = GrpcClients.forAddress( getLoopbackAddress().getHostName(), serverHostAndPort(serverContext).port()) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java index c0ffb14554..d9390ff3a6 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java @@ -84,15 +84,17 @@ private void setUp(final boolean keepAlivesFromClient, final Duration idleTimeout) throws Exception { this.idleTimeoutMillis = idleTimeout.toMillis(); GrpcServerBuilder serverBuilder = forAddress(localAddress(0)) - .executor(SERVER_CTX.executor()) - .ioExecutor(SERVER_CTX.ioExecutor()) - .executionStrategy(defaultStrategy()); - if (!keepAlivesFromClient) { - serverBuilder.protocols(h2Config(keepAliveIdleFor)); - } else { - serverBuilder.socketOption(IDLE_TIMEOUT, idleTimeoutMillis) - .protocols(h2Config(null)); - } + .executor(SERVER_CTX.executor()) + .ioExecutor(SERVER_CTX.ioExecutor()) + .executionStrategy(defaultStrategy()) + .initializeHttp(builder -> { + if (!keepAlivesFromClient) { + builder.protocols(h2Config(keepAliveIdleFor)); + } else { + builder.socketOption(IDLE_TIMEOUT, idleTimeoutMillis) + .protocols(h2Config(null)); + } + }); ctx = serverBuilder.listenAndAwait(new ServiceFactory(new InfiniteStreamsService())); GrpcClientBuilder clientBuilder = GrpcClients.forAddress(serverHostAndPort(ctx)) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java index 54da34f3ee..72a54bafe4 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java @@ -1063,26 +1063,29 @@ private static GrpcServerBuilder serviceTalkServerBuilder(final ErrorMode errorM @Nullable final Duration timeout) { final GrpcServerBuilder serverBuilder = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { - @Override - public Single handle(final HttpServiceContext ctx, - final StreamingHttpRequest req, - final StreamingHttpResponseFactory resFactory) { - if (errorMode == ErrorMode.SIMPLE_IN_SERVER_FILTER) { - throwGrpcStatusException(); - } else if (errorMode == ErrorMode.STATUS_IN_SERVER_FILTER) { - throwGrpcStatusExceptionWithStatus(); + .initializeHttp(builder -> { + builder.appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { + @Override + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest req, + final StreamingHttpResponseFactory resFactory) { + if (errorMode == ErrorMode.SIMPLE_IN_SERVER_FILTER) { + throwGrpcStatusException(); + } else if (errorMode == ErrorMode.STATUS_IN_SERVER_FILTER) { + throwGrpcStatusExceptionWithStatus(); + } + return delegate().handle(ctx, req, resFactory); } - return delegate().handle(ctx, req, resFactory); + }); + if (ssl) { + builder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, + DefaultTestCerts::loadServerKey).provider(OPENSSL).build()); } }); if (null != timeout) { serverBuilder.defaultTimeout(timeout); } - return ssl ? - serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, - DefaultTestCerts::loadServerKey).provider(OPENSSL).build()) : - serverBuilder; + return serverBuilder; } private static TestServerContext serviceTalkServerBlocking(final ErrorMode errorMode, final boolean ssl, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java index 3e21f9625e..a6d50dd87a 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java @@ -151,14 +151,14 @@ void testServiceFilterThrows() throws Exception { final TesterService service = mockTesterService(); final GrpcServerBuilder serverBuilder = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(svc -> new StreamingHttpServiceFilter(svc) { + .initializeHttp(builder -> builder.appendServiceFilter(svc -> new StreamingHttpServiceFilter(svc) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { throw DELIBERATE_EXCEPTION; } - }); + })); try (ServerContext serverContext = serverBuilder.listenAndAwait(new Tester.ServiceFactory(service))) {