Skip to content

Commit

Permalink
Untangling GrpcServerBuilder from HttpServerBuilder (#1861)
Browse files Browse the repository at this point in the history
Motivation:

The methods in `GrpcServerBuilder` are often only delegating to the
underlying HTTP transport builder. It will be beneficial if the API can
be simplified and have only gRPC specific settings. The first step is to
deprecate the methods which can be called directly on the underlying
`HttpServerBuilder` and to introduce a method for configuring that
builder.

Modifications:

- Introduce a `GrpcServerBuilder.HttpInitializer` interface and a
  corresponding `GrpcServerBuilder#initialize` method which accepts the
  initializer for the underlying `HttpServerBuilder` instance,
- Deprecate methods which have only served the purpose of delegating to
  the underlying HTTP transport builder,
- Added default implementations for deprecated methods,
- Made the builder reconfigurable after first build,
- Clarified javadoc for lifecycleObserver overriding by initializer,
- Refactor tests to use the new approach to showcase it works.

Result:

The API of `GrpcServerBuilder` is prepared for removal of the deprecated
methods in 0.42 release and the API is more specific, while maintaining
the same level of configurability.
  • Loading branch information
Dariusz Jedrzejczyk committed Oct 13, 2021
1 parent f249032 commit f8aaa8d
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 96 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,34 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;

import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_DEADLINE_KEY;
import static io.servicetalk.grpc.internal.DeadlineUtils.readTimeoutHeader;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static java.util.Objects.requireNonNull;

final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements ServerBinder {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGrpcServerBuilder.class);

private final HttpServerBuilder httpServerBuilder;
private final Supplier<HttpServerBuilder> httpServerBuilderSupplier;
private GrpcServerBuilder.HttpInitializer initializer = builder -> {
// no-op
};
private GrpcServerBuilder.HttpInitializer directCallInitializer = builder -> {
// no-op
};
private final ExecutionContextBuilder contextBuilder = new ExecutionContextBuilder()
// Make sure we always set a strategy so that ExecutionContextBuilder does not create a strategy which is
// not compatible with gRPC.
Expand All @@ -77,121 +87,126 @@ final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements Server
*/
@Nullable
private Duration defaultTimeout;
private boolean invokedBuild;

DefaultGrpcServerBuilder(final HttpServerBuilder httpServerBuilder) {
this.httpServerBuilder = httpServerBuilder.protocols(h2Default()).allowDropRequestTrailers(true);
DefaultGrpcServerBuilder(final Supplier<HttpServerBuilder> httpServerBuilderSupplier) {
this.httpServerBuilderSupplier = () -> httpServerBuilderSupplier.get()
.protocols(h2Default()).allowDropRequestTrailers(true);
}

@Override
public GrpcServerBuilder initializeHttp(final GrpcServerBuilder.HttpInitializer initializer) {
this.initializer = requireNonNull(initializer);
return this;
}

@Override
public GrpcServerBuilder defaultTimeout(Duration defaultTimeout) {
if (invokedBuild) {
throw new IllegalStateException("default timeout cannot be modified after build, create a new builder");
}
this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout");
return this;
}

@Override
public GrpcServerBuilder protocols(final HttpProtocolConfig... protocols) {
httpServerBuilder.protocols(protocols);
directCallInitializer = directCallInitializer.append(builder -> builder.protocols(protocols));
return this;
}

@Deprecated
@Override
public GrpcServerSecurityConfigurator secure() {
HttpServerSecurityConfigurator secure = httpServerBuilder.secure();
return new DefaultGrpcServerSecurityConfigurator(secure, this);
return new LazyGrpcServerSecurityConfigurator();
}

@Override
public GrpcServerBuilder sslConfig(final ServerSslConfig config) {
httpServerBuilder.sslConfig(config);
directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(config));
return this;
}

@Override
public GrpcServerBuilder sslConfig(final ServerSslConfig defaultConfig, final Map<String, ServerSslConfig> sniMap) {
httpServerBuilder.sslConfig(defaultConfig, sniMap);
directCallInitializer = directCallInitializer.append(builder -> builder.sslConfig(defaultConfig, sniMap));
return this;
}

@Override
public <T> GrpcServerBuilder socketOption(final SocketOption<T> option, final T value) {
httpServerBuilder.socketOption(option, value);
directCallInitializer = directCallInitializer.append(builder -> builder.socketOption(option, value));
return this;
}

@Override
public <T> GrpcServerBuilder listenSocketOption(final SocketOption<T> option, final T value) {
httpServerBuilder.listenSocketOption(option, value);
directCallInitializer = directCallInitializer.append(builder -> builder.listenSocketOption(option, value));
return this;
}

@Deprecated
@Override
public GrpcServerBuilder enableWireLogging(final String loggerName) {
httpServerBuilder.enableWireLogging(loggerName);
directCallInitializer = directCallInitializer.append(builder -> builder.enableWireLogging(loggerName));
return this;
}

@Override
public GrpcServerBuilder enableWireLogging(final String loggerName, final LogLevel logLevel,
final BooleanSupplier logUserData) {
httpServerBuilder.enableWireLogging(loggerName, logLevel, logUserData);
directCallInitializer = directCallInitializer.append(builder ->
builder.enableWireLogging(loggerName, logLevel, logUserData));
return this;
}

@Override
public GrpcServerBuilder transportObserver(final TransportObserver transportObserver) {
httpServerBuilder.transportObserver(transportObserver);
directCallInitializer = directCallInitializer.append(builder -> builder.transportObserver(transportObserver));
return this;
}

@Override
public GrpcServerBuilder lifecycleObserver(final GrpcLifecycleObserver lifecycleObserver) {
httpServerBuilder.lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver));
directCallInitializer = directCallInitializer.append(builder -> builder
.lifecycleObserver(new GrpcToHttpLifecycleObserverBridge(lifecycleObserver)));
return this;
}

@Override
public GrpcServerBuilder drainRequestPayloadBody(boolean enable) {
httpServerBuilder.drainRequestPayloadBody(enable);
directCallInitializer = directCallInitializer.append(builder -> builder.drainRequestPayloadBody(enable));
return this;
}

@Override
public GrpcServerBuilder appendConnectionAcceptorFilter(final ConnectionAcceptorFactory factory) {
httpServerBuilder.appendConnectionAcceptorFilter(factory);
directCallInitializer = directCallInitializer.append(builder ->
builder.appendConnectionAcceptorFilter(factory));
return this;
}

@Override
public GrpcServerBuilder ioExecutor(final IoExecutor ioExecutor) {
contextBuilder.ioExecutor(ioExecutor);
httpServerBuilder.ioExecutor(ioExecutor);
public GrpcServerBuilder executor(final Executor executor) {
contextBuilder.executor(executor);
directCallInitializer = directCallInitializer.append(builder -> builder.executor(executor));
return this;
}

@Override
public GrpcServerBuilder executor(final Executor executor) {
contextBuilder.executor(executor);
httpServerBuilder.executor(executor);
public GrpcServerBuilder ioExecutor(final IoExecutor ioExecutor) {
contextBuilder.ioExecutor(ioExecutor);
directCallInitializer = directCallInitializer.append(builder -> builder.ioExecutor(ioExecutor));
return this;
}

@Override
public GrpcServerBuilder bufferAllocator(final BufferAllocator allocator) {
contextBuilder.bufferAllocator(allocator);
httpServerBuilder.bufferAllocator(allocator);
directCallInitializer = directCallInitializer.append(builder -> builder.bufferAllocator(allocator));
return this;
}

@Override
public GrpcServerBuilder executionStrategy(final GrpcExecutionStrategy strategy) {
contextBuilder.executionStrategy(strategy);
httpServerBuilder.executionStrategy(strategy);
directCallInitializer = directCallInitializer.append(builder -> builder.executionStrategy(strategy));
return this;
}

Expand All @@ -202,20 +217,26 @@ protected Single<ServerContext> doListen(final GrpcServiceFactory<?, ?, ?> servi

@Override
protected void doAppendHttpServiceFilter(final StreamingHttpServiceFilterFactory factory) {
httpServerBuilder.appendServiceFilter(factory);
directCallInitializer = directCallInitializer.append(builder -> builder.appendServiceFilter(factory));
}

@Override
protected void doAppendHttpServiceFilter(final Predicate<StreamingHttpRequest> predicate,
final StreamingHttpServiceFilterFactory factory) {
httpServerBuilder.appendServiceFilter(predicate, factory);
directCallInitializer = directCallInitializer.append(builder ->
builder.appendServiceFilter(predicate, factory));
}

private HttpServerBuilder preBuild() {
if (!invokedBuild) {
doAppendHttpServiceFilter(new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true));
}
invokedBuild = true;
final HttpServerBuilder httpServerBuilder = httpServerBuilderSupplier.get();

appendCatchAllFilter(httpServerBuilder);

directCallInitializer.initialize(httpServerBuilder);
initializer.initialize(httpServerBuilder);

httpServerBuilder.appendServiceFilter(
new TimeoutHttpServiceFilter(grpcDetermineTimeout(defaultTimeout), true));
return httpServerBuilder;
}

Expand Down Expand Up @@ -278,4 +299,88 @@ public Single<ServerContext> bindBlocking(final BlockingHttpService service) {
public Single<ServerContext> bindBlockingStreaming(final BlockingStreamingHttpService service) {
return preBuild().listenBlockingStreaming(service);
}

private class LazyGrpcServerSecurityConfigurator implements GrpcServerSecurityConfigurator {

@Nullable
private HttpServerSecurityConfigurator delegate;

private HttpInitializer initializer = builder -> delegate = requireNonNull(builder.secure());

@Override
public GrpcServerSecurityConfigurator trustManager(final Supplier<InputStream> trustCertChainSupplier) {
initializer = initializer.append(builder -> delegate.trustManager(trustCertChainSupplier));
return this;
}

@Override
public GrpcServerSecurityConfigurator trustManager(final TrustManagerFactory trustManagerFactory) {
initializer = initializer.append(builder -> delegate.trustManager(trustManagerFactory));
return this;
}

@Override
public GrpcServerSecurityConfigurator protocols(final String... protocols) {
initializer = initializer.append(builder -> delegate.protocols(protocols));
return this;
}

@Override
public GrpcServerSecurityConfigurator ciphers(final Iterable<String> ciphers) {
initializer = initializer.append(builder -> delegate.ciphers(ciphers));
return this;
}

@Override
public GrpcServerSecurityConfigurator sessionCacheSize(final long sessionCacheSize) {
initializer = initializer.append(builder -> delegate.sessionCacheSize(sessionCacheSize));
return this;
}

@Override
public GrpcServerSecurityConfigurator sessionTimeout(final long sessionTimeout) {
initializer = initializer.append(builder -> delegate.sessionTimeout(sessionTimeout));
return this;
}

@Override
public GrpcServerSecurityConfigurator provider(final SslProvider provider) {
initializer = initializer.append(builder -> delegate.provider(provider));
return this;
}

@Override
public GrpcServerSecurityConfigurator clientAuth(final ClientAuth clientAuth) {
initializer = initializer.append(builder -> delegate.clientAuth(clientAuth));
return this;
}

@Override
public GrpcServerBuilder commit(final KeyManagerFactory keyManagerFactory) {
initializer = initializer.append(builder -> delegate.commit(keyManagerFactory));
DefaultGrpcServerBuilder.this.directCallInitializer =
DefaultGrpcServerBuilder.this.directCallInitializer.append(initializer);
return DefaultGrpcServerBuilder.this;
}

@Override
public GrpcServerBuilder commit(final Supplier<InputStream> keyCertChainSupplier,
final Supplier<InputStream> keySupplier) {
initializer = initializer.append(builder -> delegate.commit(keyCertChainSupplier, keySupplier));
DefaultGrpcServerBuilder.this.directCallInitializer =
DefaultGrpcServerBuilder.this.directCallInitializer.append(initializer);
return DefaultGrpcServerBuilder.this;
}

@Override
public GrpcServerBuilder commit(final Supplier<InputStream> keyCertChainSupplier,
final Supplier<InputStream> keySupplier,
final String keyPassword) {
initializer = initializer.append(builder ->
delegate.commit(keyCertChainSupplier, keySupplier, keyPassword));
DefaultGrpcServerBuilder.this.directCallInitializer =
DefaultGrpcServerBuilder.this.directCallInitializer.append(initializer);
return DefaultGrpcServerBuilder.this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private GrpcServers() {
* @return a new builder.
*/
public static GrpcServerBuilder forPort(int port) {
return new DefaultGrpcServerBuilder(HttpServers.forPort(port));
return new DefaultGrpcServerBuilder(() -> HttpServers.forPort(port));
}

/**
Expand All @@ -46,6 +46,6 @@ public static GrpcServerBuilder forPort(int port) {
* @return a new builder.
*/
public static GrpcServerBuilder forAddress(SocketAddress socketAddress) {
return new DefaultGrpcServerBuilder(HttpServers.forAddress(socketAddress));
return new DefaultGrpcServerBuilder(() -> HttpServers.forAddress(socketAddress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy,
throw new IllegalArgumentException("Unknown mode: " + testMode);
}
this.requestPublisher = requestPublisher;
serverContext = GrpcServers.forAddress(localAddress(0)).appendHttpServiceFilter(serviceFilterFactory)
final StreamingHttpServiceFilterFactory filterFactory = serviceFilterFactory;
serverContext = GrpcServers.forAddress(localAddress(0))
.initializeHttp(builder -> builder.appendServiceFilter(filterFactory))
.executionStrategy(serverStrategy).listenAndAwait(serviceFactory);
final StreamingHttpClientFilterFactory pickedClientFilterFactory = clientFilterFactory;
GrpcClientBuilder<HostAndPort, InetSocketAddress> clientBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ private void setUp(boolean error) throws Exception {
server = forAddress(localAddress(0))
.ioExecutor(SERVER_CTX.ioExecutor())
.executor(SERVER_CTX.executor())
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
.protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true).build())
.initializeHttp(builder -> builder
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
.protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true)
.build())
)
.lifecycleObserver(combine(serverLifecycleObserver, LOGGING))
.listenAndAwait(new EchoService(error));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private void setUp(HttpProtocolVersion httpProtocol, boolean streamingService) t
expectedValue = "gRPC over " + httpProtocol;

serverContext = GrpcServers.forAddress(localAddress(0))
.protocols(protocolConfig(httpProtocol))
.initializeHttp(builder -> builder.protocols(protocolConfig(httpProtocol)))
.listenAndAwait(streamingService ?
new ServiceFactory(new TesterServiceImpl()) :
new ServiceFactory(new BlockingTesterServiceImpl()));
Expand Down
Loading

0 comments on commit f8aaa8d

Please sign in to comment.