diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java index 001e8aae32..adabd3b612 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcServerBuilder.java @@ -19,8 +19,10 @@ import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.HttpLifecycleObserver; import io.servicetalk.http.api.HttpProtocolConfig; import io.servicetalk.http.api.HttpRequest; +import io.servicetalk.http.api.HttpServerBuilder; import io.servicetalk.http.api.HttpServiceContext; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; @@ -55,7 +57,45 @@ */ public abstract class GrpcServerBuilder { - private boolean appendedCatchAllFilter; + /** + * Initializes the underlying {@link HttpServerBuilder} used for the transport layer. + */ + @FunctionalInterface + public interface HttpInitializer { + + /** + * Configures the underlying {@link HttpServerBuilder}. + * @param builder The builder to customize the HTTP layer. + */ + void initialize(HttpServerBuilder builder); + + /** + * Appends the passed {@link HttpInitializer} to this {@link HttpInitializer} such that this instance is + * applied first and then the argument's {@link HttpInitializer}. + * @param toAppend {@link HttpInitializer} to append. + * @return A composite {@link HttpInitializer} after the append operation. + */ + default HttpInitializer append(HttpInitializer toAppend) { + return builder -> { + initialize(builder); + toAppend.initialize(builder); + }; + } + } + + /** + * Set a function which can configure the underlying {@link HttpServerBuilder} used for the transport layer. + *

+ * Please note that this method shouldn't be mixed with the {@link Deprecated} methods of this class as the order + * of operations would not be the same as the order in which the calls are made. Please migrate all of the calls + * to this method. + * @param initializer Initializes the underlying HTTP transport builder. + * @return {@code this}. + */ + public GrpcServerBuilder initializeHttp(HttpInitializer initializer) { + throw new UnsupportedOperationException("Initializing the HttpServerBuilder using this method is not yet" + + "supported by " + getClass().getName()); + } /** * Configurations of various underlying protocol versions. @@ -65,8 +105,15 @@ public abstract class GrpcServerBuilder { * * @param protocols {@link HttpProtocolConfig} for each protocol that should be supported. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#protocols(HttpProtocolConfig...)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder protocols(HttpProtocolConfig... protocols); + @Deprecated + public GrpcServerBuilder protocols(HttpProtocolConfig... protocols) { + throw new UnsupportedOperationException("Method protocols is not supported by " + getClass().getName()); + } /** * Set a default timeout during which gRPC calls are expected to complete. This default will be used only if the @@ -81,8 +128,15 @@ public abstract class GrpcServerBuilder { * Set the SSL/TLS configuration. * @param config The configuration to use. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#sslConfig(ServerSslConfig)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder sslConfig(ServerSslConfig config); + @Deprecated + public GrpcServerBuilder sslConfig(ServerSslConfig config) { + throw new UnsupportedOperationException("Method sslConfig is not supported by " + getClass().getName()); + } /** * Set the SSL/TLS and SNI configuration. @@ -91,8 +145,15 @@ public abstract class GrpcServerBuilder { * @param sniMap A map where the keys are matched against the client certificate's SNI extension value in order * to provide the corresponding {@link ServerSslConfig}. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#sslConfig(ServerSslConfig, Map)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder sslConfig(ServerSslConfig defaultConfig, Map sniMap); + @Deprecated + public GrpcServerBuilder sslConfig(ServerSslConfig defaultConfig, Map sniMap) { + throw new UnsupportedOperationException("Method sslConfig is not supported by " + getClass().getName()); + } /** * Add a {@link SocketOption} that is applied. @@ -103,8 +164,15 @@ public abstract class GrpcServerBuilder { * @return {@code this}. * @see StandardSocketOptions * @see ServiceTalkSocketOptions + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#socketOption(SocketOption, Object)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder socketOption(SocketOption option, T value); + @Deprecated + public GrpcServerBuilder socketOption(SocketOption option, T value) { + throw new UnsupportedOperationException("Method socketOption is not supported by " + getClass().getName()); + } /** * Adds a {@link SocketOption} that is applied to the server socket channel which listens/accepts socket channels. @@ -114,8 +182,16 @@ public abstract class GrpcServerBuilder { * @return this. * @see StandardSocketOptions * @see ServiceTalkSocketOptions + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#listenSocketOption(SocketOption, Object)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder listenSocketOption(SocketOption option, T value); + @Deprecated + public GrpcServerBuilder listenSocketOption(SocketOption option, T value) { + throw new UnsupportedOperationException("Method listenSocketOption is not supported by " + + getClass().getName()); + } /** * Enables wire-logging for connections created by this builder. @@ -125,21 +201,39 @@ public abstract class GrpcServerBuilder { * @param logUserData {@code true} to include user data (e.g. data, headers, etc.). {@code false} to exclude user * data and log only network events. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#enableWireLogging(String, LogLevel, BooleanSupplier)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel logLevel, - BooleanSupplier logUserData); + @Deprecated + public GrpcServerBuilder enableWireLogging(String loggerName, LogLevel logLevel, BooleanSupplier logUserData) { + throw new UnsupportedOperationException("Method enableWireLogging is not supported by " + + getClass().getName()); + } /** * Sets a {@link TransportObserver} that provides visibility into transport events. * * @param transportObserver A {@link TransportObserver} that provides visibility into transport events. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#transportObserver(TransportObserver)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder transportObserver(TransportObserver transportObserver); + @Deprecated + public GrpcServerBuilder transportObserver(TransportObserver transportObserver) { + throw new UnsupportedOperationException("Method transportObserver is not supported by " + + getClass().getName()); + } /** * Sets a {@link GrpcLifecycleObserver} that provides visibility into gRPC lifecycle events. - * + *

+ * Note, if {@link #initializeHttp(HttpInitializer)} is used to configure + * {@link HttpServerBuilder#lifecycleObserver(HttpLifecycleObserver)} – that will override the value specified + * using this method. Please choose only one approach. * @param lifecycleObserver A {@link GrpcLifecycleObserver} that provides visibility into gRPC lifecycle events. * @return {@code this}. */ @@ -160,8 +254,16 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * @param enable When {@code false} it will disable the automatic consumption of request * {@link StreamingHttpRequest#payloadBody()}. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#drainRequestPayloadBody(boolean)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder drainRequestPayloadBody(boolean enable); + @Deprecated + public GrpcServerBuilder drainRequestPayloadBody(boolean enable) { + throw new UnsupportedOperationException("Method drainRequestPayloadBody is not supported by " + + getClass().getName()); + } /** * Append the filter to the chain of filters used to decorate the {@link ConnectionAcceptor} used by this builder. @@ -179,8 +281,16 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * @param factory {@link ConnectionAcceptorFactory} to append. Lifetime of this * {@link ConnectionAcceptorFactory} is managed by this builder and the server started thereof. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendConnectionAcceptorFilter(ConnectionAcceptorFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ - public abstract GrpcServerBuilder appendConnectionAcceptorFilter(ConnectionAcceptorFactory factory); + @Deprecated + public GrpcServerBuilder appendConnectionAcceptorFilter(ConnectionAcceptorFactory factory) { + throw new UnsupportedOperationException("Method appendConnectionAcceptorFilter is not supported by " + + getClass().getName()); + } /** * Append the filter to the chain of filters used to decorate the service used by this builder. @@ -195,9 +305,13 @@ public abstract GrpcServerBuilder enableWireLogging(String loggerName, LogLevel * * @param factory {@link StreamingHttpServiceFilterFactory} to append. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendServiceFilter(StreamingHttpServiceFilterFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ + @Deprecated public final GrpcServerBuilder appendHttpServiceFilter(StreamingHttpServiceFilterFactory factory) { - appendCatchAllFilterIfRequired(); doAppendHttpServiceFilter(factory); return this; } @@ -217,10 +331,14 @@ public final GrpcServerBuilder appendHttpServiceFilter(StreamingHttpServiceFilte * @param predicate the {@link Predicate} to test if the filter must be applied. * @param factory {@link StreamingHttpServiceFilterFactory} to append. * @return {@code this}. + * @deprecated Call {@link #initializeHttp(HttpInitializer)} and use + * {@link HttpServerBuilder#appendServiceFilter(Predicate, StreamingHttpServiceFilterFactory)} + * on the {@code builder} instance by implementing {@link HttpInitializer#initialize(HttpServerBuilder)} + * functional interface. */ + @Deprecated public final GrpcServerBuilder appendHttpServiceFilter(Predicate predicate, StreamingHttpServiceFilterFactory factory) { - appendCatchAllFilterIfRequired(); doAppendHttpServiceFilter(predicate, factory); return this; } @@ -361,11 +479,11 @@ public final ServerContext listenAndAwait(GrpcBindableService... servic protected abstract void doAppendHttpServiceFilter(Predicate predicate, StreamingHttpServiceFilterFactory factory); - private void appendCatchAllFilterIfRequired() { - if (!appendedCatchAllFilter) { - doAppendHttpServiceFilter(CatchAllHttpServiceFilter::new); - appendedCatchAllFilter = true; - } + protected static void appendCatchAllFilter(HttpServerBuilder httpServerBuilder) { + // TODO(dj): Move to DefaultGrpcServerBuilder + // This code depends on GrpcUtils which is inaccessible from the servicetalk-grpc-netty module. + // When this class is converted to an interface we can also refactor that part. + httpServerBuilder.appendServiceFilter(CatchAllHttpServiceFilter::new); } static final class CatchAllHttpServiceFilter extends StreamingHttpServiceFilter { diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index bb31d7f202..8e0bb8d0e8 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.function.BooleanSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import javax.annotation.Nullable; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy; @@ -59,12 +60,19 @@ import static io.servicetalk.grpc.internal.DeadlineUtils.readTimeoutHeader; import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; +import static java.util.Objects.requireNonNull; final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements ServerBinder { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGrpcServerBuilder.class); - private final HttpServerBuilder httpServerBuilder; + private final Supplier httpServerBuilderSupplier; + private GrpcServerBuilder.HttpInitializer initializer = builder -> { + // no-op + }; + private GrpcServerBuilder.HttpInitializer directCallInitializer = builder -> { + // no-op + }; private final ExecutionContextBuilder contextBuilder = new ExecutionContextBuilder() // Make sure we always set a strategy so that ExecutionContextBuilder does not create a strategy which is // not compatible with gRPC. @@ -75,107 +83,113 @@ final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements Server */ @Nullable private Duration defaultTimeout; - private boolean invokedBuild; - DefaultGrpcServerBuilder(final HttpServerBuilder httpServerBuilder) { - this.httpServerBuilder = httpServerBuilder.protocols(h2Default()).allowDropRequestTrailers(true); + DefaultGrpcServerBuilder(final Supplier httpServerBuilderSupplier) { + this.httpServerBuilderSupplier = () -> httpServerBuilderSupplier.get() + .protocols(h2Default()).allowDropRequestTrailers(true); + } + + @Override + public GrpcServerBuilder initializeHttp(final GrpcServerBuilder.HttpInitializer initializer) { + this.initializer = requireNonNull(initializer); + return this; } @Override public GrpcServerBuilder defaultTimeout(Duration defaultTimeout) { - if (invokedBuild) { - throw new IllegalStateException("default timeout cannot be modified after build, create a new builder"); - } this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout"); return this; } @Override public GrpcServerBuilder protocols(final HttpProtocolConfig... protocols) { - httpServerBuilder.protocols(protocols); + directCallInitializer = directCallInitializer.append(builder -> builder.protocols(protocols)); return this; } @Override public GrpcServerBuilder sslConfig(final ServerSslConfig config) { - httpServerBuilder.sslConfig(config); + directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(config)); return this; } @Override public GrpcServerBuilder sslConfig(final ServerSslConfig defaultConfig, final Map sniMap) { - httpServerBuilder.sslConfig(defaultConfig, sniMap); + directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(defaultConfig, sniMap)); return this; } @Override public GrpcServerBuilder socketOption(final SocketOption option, final T value) { - httpServerBuilder.socketOption(option, value); + directCallInitializer = directCallInitializer.append(builder -> builder.socketOption(option, value)); return this; } @Override public GrpcServerBuilder listenSocketOption(final SocketOption option, final T value) { - httpServerBuilder.listenSocketOption(option, value); + directCallInitializer = directCallInitializer.append(builder -> builder.listenSocketOption(option, value)); return this; } @Override public GrpcServerBuilder enableWireLogging(final String loggerName, final LogLevel logLevel, final BooleanSupplier logUserData) { - httpServerBuilder.enableWireLogging(loggerName, logLevel, logUserData); + directCallInitializer = directCallInitializer.append(builder -> + builder.enableWireLogging(loggerName, logLevel, logUserData)); return this; } @Override public GrpcServerBuilder transportObserver(final TransportObserver transportObserver) { - httpServerBuilder.transportObserver(transportObserver); + directCallInitializer = directCallInitializer.append(builder -> builder.transportObserver(transportObserver)); return this; } @Override public GrpcServerBuilder lifecycleObserver(final GrpcLifecycleObserver lifecycleObserver) { - httpServerBuilder.lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver)); + directCallInitializer = directCallInitializer.append(builder -> builder + .lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver))); return this; } @Override public GrpcServerBuilder drainRequestPayloadBody(boolean enable) { - httpServerBuilder.drainRequestPayloadBody(enable); + directCallInitializer = directCallInitializer.append(builder -> builder.drainRequestPayloadBody(enable)); return this; } @Override public GrpcServerBuilder appendConnectionAcceptorFilter(final ConnectionAcceptorFactory factory) { - httpServerBuilder.appendConnectionAcceptorFilter(factory); + directCallInitializer = directCallInitializer.append(builder -> + builder.appendConnectionAcceptorFilter(factory)); return this; } @Override public GrpcServerBuilder executor(final Executor executor) { contextBuilder.executor(executor); - httpServerBuilder.executor(executor); + directCallInitializer = directCallInitializer.append(builder -> builder.executor(executor)); return this; } @Override public GrpcServerBuilder ioExecutor(final IoExecutor ioExecutor) { contextBuilder.ioExecutor(ioExecutor); - httpServerBuilder.ioExecutor(ioExecutor); + directCallInitializer = directCallInitializer.append(builder -> builder.ioExecutor(ioExecutor)); return this; } @Override public GrpcServerBuilder bufferAllocator(final BufferAllocator allocator) { contextBuilder.bufferAllocator(allocator); - httpServerBuilder.bufferAllocator(allocator); + directCallInitializer = directCallInitializer.append(builder -> builder.bufferAllocator(allocator)); return this; } @Override public GrpcServerBuilder executionStrategy(final GrpcExecutionStrategy strategy) { contextBuilder.executionStrategy(strategy); - httpServerBuilder.executionStrategy(strategy); + directCallInitializer = directCallInitializer.append(builder -> builder.executionStrategy(strategy)); return this; } @@ -186,20 +200,26 @@ protected Single doListen(final GrpcServiceFactory servi @Override protected void doAppendHttpServiceFilter(final StreamingHttpServiceFilterFactory factory) { - httpServerBuilder.appendServiceFilter(factory); + directCallInitializer = directCallInitializer.append(builder -> builder.appendServiceFilter(factory)); } @Override protected void doAppendHttpServiceFilter(final Predicate predicate, final StreamingHttpServiceFilterFactory factory) { - httpServerBuilder.appendServiceFilter(predicate, factory); + directCallInitializer = directCallInitializer.append(builder -> + builder.appendServiceFilter(predicate, factory)); } private HttpServerBuilder preBuild() { - if (!invokedBuild) { - doAppendHttpServiceFilter(new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true)); - } - invokedBuild = true; + final HttpServerBuilder httpServerBuilder = httpServerBuilderSupplier.get(); + + appendCatchAllFilter(httpServerBuilder); + + directCallInitializer.initialize(httpServerBuilder); + initializer.initialize(httpServerBuilder); + + httpServerBuilder.appendServiceFilter( + new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true)); return httpServerBuilder; } diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java index 338b24be82..09e487483e 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcServers.java @@ -36,7 +36,7 @@ private GrpcServers() { * @return a new builder. */ public static GrpcServerBuilder forPort(int port) { - return new DefaultGrpcServerBuilder(HttpServers.forPort(port)); + return new DefaultGrpcServerBuilder(() -> HttpServers.forPort(port)); } /** @@ -46,6 +46,6 @@ public static GrpcServerBuilder forPort(int port) { * @return a new builder. */ public static GrpcServerBuilder forAddress(SocketAddress socketAddress) { - return new DefaultGrpcServerBuilder(HttpServers.forAddress(socketAddress)); + return new DefaultGrpcServerBuilder(() -> HttpServers.forAddress(socketAddress)); } } diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java index 6934ea6700..e7ca8b9f89 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java @@ -261,7 +261,9 @@ private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy, throw new IllegalArgumentException("Unknown mode: " + testMode); } this.requestPublisher = requestPublisher; - serverContext = GrpcServers.forAddress(localAddress(0)).appendHttpServiceFilter(serviceFilterFactory) + final StreamingHttpServiceFilterFactory filterFactory = serviceFilterFactory; + serverContext = GrpcServers.forAddress(localAddress(0)) + .initializeHttp(builder -> builder.appendServiceFilter(filterFactory)) .executionStrategy(serverStrategy).listenAndAwait(serviceFactory); final StreamingHttpClientFilterFactory pickedClientFilterFactory = clientFilterFactory; GrpcClientBuilder clientBuilder = diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java index c8f3644cef..ac7505ac07 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java @@ -60,8 +60,8 @@ class GrpcClientRequiresTrailersTest { private BlockingTesterClient client; private void setUp(boolean hasTrailers) throws Exception { - serverContext = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { + serverContext = GrpcServers.forAddress(localAddress(0)).initializeHttp(builder -> builder + .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, @@ -78,7 +78,7 @@ protected HttpHeaders payloadComplete(final HttpHeaders trailers) { return resp; }); } - }) + })) .listenAndAwait(new TesterProto.Tester.TesterService() { @Override public Single testRequestStream(final GrpcServiceContext ctx, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java index f67f82f67a..5ab8ff916d 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java @@ -48,8 +48,8 @@ final class GrpcClientValidatesContentTypeTest { private TesterProto.Tester.BlockingTesterClient client; void setUp(boolean withCharset) throws Exception { - serverContext = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { + serverContext = GrpcServers.forAddress(localAddress(0)).initializeHttp(builder -> builder + .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, @@ -60,7 +60,7 @@ public Single handle( return resp; }); } - }) + })) .listenAndAwait(new TesterProto.Tester.TesterService() { @Override public Single testRequestStream(final GrpcServiceContext ctx, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java index 3ffe257e99..227c2cdaa0 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java @@ -143,8 +143,11 @@ private void setUp(boolean error) throws Exception { server = forAddress(localAddress(0)) .ioExecutor(SERVER_CTX.ioExecutor()) .executor(SERVER_CTX.executor()) - .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true) - .protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true).build()) + .initializeHttp(builder -> builder + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true) + .protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true) + .build()) + ) .lifecycleObserver(combine(serverLifecycleObserver, LOGGING)) .listenAndAwait(new EchoService(error)); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java index 53cf36c41d..ad03ef70a6 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java @@ -67,7 +67,7 @@ private void setUp(HttpProtocolVersion httpProtocol, boolean streamingService) t expectedValue = "gRPC over " + httpProtocol; serverContext = GrpcServers.forAddress(localAddress(0)) - .protocols(protocolConfig(httpProtocol)) + .initializeHttp(builder -> builder.protocols(protocolConfig(httpProtocol))) .listenAndAwait(streamingService ? new ServiceFactory(new TesterServiceImpl()) : new ServiceFactory(new BlockingTesterServiceImpl())); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java index 3a1907f14c..d70b3196a0 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcSslAndNonSslConnectionsTest.java @@ -55,9 +55,7 @@ private ServerContext nonSecureGrpcServer() throws Exception { private ServerContext secureGrpcServer() throws Exception { return GrpcServers.forAddress(localAddress(0)) - .sslConfig( - trustedServerConfig() - ) + .initializeHttp(builder -> builder.sslConfig(trustedServerConfig())) .listenAndAwait(serviceFactory()); } @@ -155,10 +153,10 @@ void secureClientToSecureServerWithoutPeerHostSucceeds() throws Exception { @Test void noSniClientDefaultServerFallbackSuccess() throws Exception { try (ServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .sslConfig( + .initializeHttp(builder -> builder.sslConfig( trustedServerConfig(), singletonMap(getLoopbackAddress().getHostName(), untrustedServerConfig()) - ) + )) .listenAndAwait(serviceFactory()); BlockingTesterClient client = GrpcClients.forAddress( getLoopbackAddress().getHostName(), serverHostAndPort(serverContext).port()) @@ -177,10 +175,10 @@ void noSniClientDefaultServerFallbackSuccess() throws Exception { @Test void noSniClientDefaultServerFallbackFailExpected() throws Exception { try (ServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .sslConfig( + .initializeHttp(builder -> builder.sslConfig( untrustedServerConfig(), singletonMap(getLoopbackAddress().getHostName(), trustedServerConfig()) - ) + )) .listenAndAwait(serviceFactory()); BlockingTesterClient client = GrpcClients.forAddress( getLoopbackAddress().getHostName(), serverHostAndPort(serverContext).port()) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java index c0ffb14554..d9390ff3a6 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/KeepAliveTest.java @@ -84,15 +84,17 @@ private void setUp(final boolean keepAlivesFromClient, final Duration idleTimeout) throws Exception { this.idleTimeoutMillis = idleTimeout.toMillis(); GrpcServerBuilder serverBuilder = forAddress(localAddress(0)) - .executor(SERVER_CTX.executor()) - .ioExecutor(SERVER_CTX.ioExecutor()) - .executionStrategy(defaultStrategy()); - if (!keepAlivesFromClient) { - serverBuilder.protocols(h2Config(keepAliveIdleFor)); - } else { - serverBuilder.socketOption(IDLE_TIMEOUT, idleTimeoutMillis) - .protocols(h2Config(null)); - } + .executor(SERVER_CTX.executor()) + .ioExecutor(SERVER_CTX.ioExecutor()) + .executionStrategy(defaultStrategy()) + .initializeHttp(builder -> { + if (!keepAlivesFromClient) { + builder.protocols(h2Config(keepAliveIdleFor)); + } else { + builder.socketOption(IDLE_TIMEOUT, idleTimeoutMillis) + .protocols(h2Config(null)); + } + }); ctx = serverBuilder.listenAndAwait(new ServiceFactory(new InfiniteStreamsService())); GrpcClientBuilder clientBuilder = GrpcClients.forAddress(serverHostAndPort(ctx)) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java index 54da34f3ee..72a54bafe4 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java @@ -1063,26 +1063,29 @@ private static GrpcServerBuilder serviceTalkServerBuilder(final ErrorMode errorM @Nullable final Duration timeout) { final GrpcServerBuilder serverBuilder = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(service -> new StreamingHttpServiceFilter(service) { - @Override - public Single handle(final HttpServiceContext ctx, - final StreamingHttpRequest req, - final StreamingHttpResponseFactory resFactory) { - if (errorMode == ErrorMode.SIMPLE_IN_SERVER_FILTER) { - throwGrpcStatusException(); - } else if (errorMode == ErrorMode.STATUS_IN_SERVER_FILTER) { - throwGrpcStatusExceptionWithStatus(); + .initializeHttp(builder -> { + builder.appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { + @Override + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest req, + final StreamingHttpResponseFactory resFactory) { + if (errorMode == ErrorMode.SIMPLE_IN_SERVER_FILTER) { + throwGrpcStatusException(); + } else if (errorMode == ErrorMode.STATUS_IN_SERVER_FILTER) { + throwGrpcStatusExceptionWithStatus(); + } + return delegate().handle(ctx, req, resFactory); } - return delegate().handle(ctx, req, resFactory); + }); + if (ssl) { + builder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, + DefaultTestCerts::loadServerKey).provider(OPENSSL).build()); } }); if (null != timeout) { serverBuilder.defaultTimeout(timeout); } - return ssl ? - serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, - DefaultTestCerts::loadServerKey).provider(OPENSSL).build()) : - serverBuilder; + return serverBuilder; } private static TestServerContext serviceTalkServerBlocking(final ErrorMode errorMode, final boolean ssl, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java index 3e21f9625e..a6d50dd87a 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/TrailersOnlyErrorTest.java @@ -151,14 +151,14 @@ void testServiceFilterThrows() throws Exception { final TesterService service = mockTesterService(); final GrpcServerBuilder serverBuilder = GrpcServers.forAddress(localAddress(0)) - .appendHttpServiceFilter(svc -> new StreamingHttpServiceFilter(svc) { + .initializeHttp(builder -> builder.appendServiceFilter(svc -> new StreamingHttpServiceFilter(svc) { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { throw DELIBERATE_EXCEPTION; } - }); + })); try (ServerContext serverContext = serverBuilder.listenAndAwait(new Tester.ServiceFactory(service))) {