Skip to content

Commit

Permalink
Untangling GrpcClientBuilder from SingleAddressHttpClientBuilder (#…
Browse files Browse the repository at this point in the history
…1867)

Motivation:

The methods in `GrpcClientBuilder` are often only delegating to the
underlying HTTP transport builder. It will be beneficial if the API can
be simplified and have only gRPC specific settings. The first step is to
deprecate the methods which can be called directly on the underlying
`SingleAddressHttpClientBuilder` and to introduce a method for configuring that
builder.

Modifications:

- Introduce a `GrpcClientBuilder.HttpInitializer` interface and a
  corresponding `GrpcClientBuilder#initializeHttp` method which accepts the
  initializer for the underlying `SingleAddressHttpClientBuilder` instance,
- Deprecate methods which have only served the purpose of delegating to
  the underlying HTTP transport builder,
- Made builder reusable and added implementations for deprecated methods,
- Refactor tests to use the new approach to showcase it works.

Result:

The API of `GrpcClientBuilder` is prepared for removal of the deprecated
methods in 0.42 release and the API is more specific, while maintaining
the same level of configurability.
  • Loading branch information
Dariusz Jedrzejczyk authored Oct 11, 2021
1 parent 212aa6b commit 3c9bd5c
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public static void main(String... args) throws Exception {
try (BlockingGreeterClient client = GrpcClients.forAddress("localhost", 8080)
// Append this filter first for most cases to maximize visibility!
// See javadocs on GrpcLifecycleObserverRequesterFilter for more details on filter ordering.
.appendHttpClientFilter(new GrpcLifecycleObserverRequesterFilter(
GrpcLifecycleObservers.logging("servicetalk-examples-grpc-observer-logger", TRACE)))
.initializeHttp(builder -> builder.appendClientFilter(new GrpcLifecycleObserverRequesterFilter(
GrpcLifecycleObservers.logging("servicetalk-examples-grpc-observer-logger", TRACE))))
.buildBlocking(new ClientFactory())) {
client.sayHello(HelloRequest.newBuilder().setName("LifecycleObserver").build());
// Ignore the response for this example. See logs for GrpcLifecycleObserver results.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
* offload publications to another {@link Executor} <b>after</b> capturing timing of events. If blocking code is
* executed inside callbacks without offloading, it will negatively impact {@link IoExecutor}.
* <p>
* To install this observer for the server use {@link GrpcServerBuilder#lifecycleObserver(GrpcLifecycleObserver)}, for
* the client use {@link GrpcClientBuilder#appendHttpClientFilter(StreamingHttpClientFilterFactory)} with
* {@code io.servicetalk.grpc.netty.GrpcLifecycleObserverRequesterFilter}.
* To install this observer for the server use {@link GrpcServerBuilder#lifecycleObserver(GrpcLifecycleObserver)}.
* For the client use {@code io.servicetalk.grpc.netty.GrpcLifecycleObserverRequesterFilter} as argument to
* {@link io.servicetalk.http.api.SingleAddressHttpClientBuilder#appendClientFilter(StreamingHttpClientFilterFactory)}
* which can be configured using {@link GrpcClientBuilder#initializeHttp(GrpcClientBuilder.HttpInitializer)}.
*/
@FunctionalInterface
public interface GrpcLifecycleObserver extends HttpLifecycleObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcClientBuilder;
import io.servicetalk.grpc.api.GrpcClientCallFactory;
import io.servicetalk.grpc.api.GrpcExecutionStrategy;
import io.servicetalk.grpc.api.GrpcStatusException;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpLoadBalancerFactory;
import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.utils.TimeoutFromRequest;
import io.servicetalk.http.utils.TimeoutHttpRequesterFilter;
import io.servicetalk.logging.api.LogLevel;
Expand All @@ -44,13 +49,17 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.grpc.api.GrpcStatus.fromThrowable;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_MAX_TIMEOUT;
import static io.servicetalk.grpc.internal.DeadlineUtils.readTimeoutHeader;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isInfinite;
import static java.util.Objects.requireNonNull;

final class DefaultGrpcClientBuilder<U, R> extends GrpcClientBuilder<U, R> {

Expand Down Expand Up @@ -78,163 +87,193 @@ public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strat

@Nullable
private Duration defaultTimeout;
private boolean invokedBuild;
private HttpInitializer<U, R> httpInitializer = builder -> {
// no-op
};

private HttpInitializer<U, R> directHttpInitializer = builder -> {
// no-op
};

private final SingleAddressHttpClientBuilder<U, R> httpClientBuilder;
private final Supplier<SingleAddressHttpClientBuilder<U, R>> httpClientBuilderSupplier;

DefaultGrpcClientBuilder(final SingleAddressHttpClientBuilder<U, R> httpClientBuilder) {
this.httpClientBuilder = httpClientBuilder.protocols(h2Default());
DefaultGrpcClientBuilder(final Supplier<SingleAddressHttpClientBuilder<U, R>> httpClientBuilderSupplier) {
this.httpClientBuilderSupplier = httpClientBuilderSupplier;
}

@Override
public GrpcClientBuilder<U, R> initializeHttp(final GrpcClientBuilder.HttpInitializer<U, R> initializer) {
httpInitializer = requireNonNull(initializer);
return this;
}

@Override
public GrpcClientBuilder<U, R> defaultTimeout(Duration defaultTimeout) {
if (invokedBuild) {
throw new IllegalStateException("default timeout cannot be modified after build, create a new builder");
}
this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout");
return this;
}

@Override
public GrpcClientBuilder<U, R> executor(final Executor executor) {
httpClientBuilder.executor(executor);
directHttpInitializer.append(builder -> builder.executor(executor));
return this;
}

@Override
public GrpcClientBuilder<U, R> ioExecutor(final IoExecutor ioExecutor) {
httpClientBuilder.ioExecutor(ioExecutor);
directHttpInitializer.append(builder -> builder.ioExecutor(ioExecutor));
return this;
}

@Override
public GrpcClientBuilder<U, R> bufferAllocator(final BufferAllocator allocator) {
httpClientBuilder.bufferAllocator(allocator);
directHttpInitializer.append(builder -> builder.bufferAllocator(allocator));
return this;
}

@Override
public GrpcClientBuilder<U, R> executionStrategy(final GrpcExecutionStrategy strategy) {
httpClientBuilder.executionStrategy(strategy);
directHttpInitializer.append(builder -> builder.executionStrategy(strategy));
return this;
}

@Override
public <T> GrpcClientBuilder<U, R> socketOption(final SocketOption<T> option, final T value) {
httpClientBuilder.socketOption(option, value);
directHttpInitializer.append(builder -> builder.socketOption(option, value));
return this;
}

@Override
public GrpcClientBuilder<U, R> enableWireLogging(final String loggerName, final LogLevel logLevel,
final BooleanSupplier logUserData) {
httpClientBuilder.enableWireLogging(loggerName, logLevel, logUserData);
directHttpInitializer.append(builder -> builder.enableWireLogging(loggerName, logLevel, logUserData));
return this;
}

@Override
public GrpcClientBuilder<U, R> protocols(HttpProtocolConfig... protocols) {
httpClientBuilder.protocols(protocols);
directHttpInitializer.append(builder -> builder.protocols(protocols));
return this;
}

@Override
public GrpcClientBuilder<U, R> appendConnectionFactoryFilter(
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> factory) {
httpClientBuilder.appendConnectionFactoryFilter(factory);
directHttpInitializer.append(builder -> builder.appendConnectionFactoryFilter(factory));
return this;
}

@Override
public GrpcClientBuilder<U, R> appendConnectionFilter(
final StreamingHttpConnectionFilterFactory factory) {
httpClientBuilder.appendConnectionFilter(factory);
directHttpInitializer.append(builder -> builder.appendConnectionFilter(factory));
return this;
}

@Override
public GrpcClientBuilder<U, R> appendConnectionFilter(
final Predicate<StreamingHttpRequest> predicate, final StreamingHttpConnectionFilterFactory factory) {
httpClientBuilder.appendConnectionFilter(predicate, factory);
directHttpInitializer.append(builder -> builder.appendConnectionFilter(predicate, factory));
return this;
}

@Override
public GrpcClientBuilder<U, R> sslConfig(final ClientSslConfig sslConfig) {
httpClientBuilder.sslConfig(sslConfig);
directHttpInitializer.append(builder -> builder.sslConfig(sslConfig));
return this;
}

@Override
public GrpcClientBuilder<U, R> inferPeerHost(final boolean shouldInfer) {
httpClientBuilder.inferPeerHost(shouldInfer);
directHttpInitializer.append(builder -> builder.inferPeerHost(shouldInfer));
return this;
}

@Override
public GrpcClientBuilder<U, R> inferPeerPort(final boolean shouldInfer) {
httpClientBuilder.inferPeerPort(shouldInfer);
directHttpInitializer.append(builder -> builder.inferPeerPort(shouldInfer));
return this;
}

@Override
public GrpcClientBuilder<U, R> inferSniHostname(final boolean shouldInfer) {
httpClientBuilder.inferSniHostname(shouldInfer);
directHttpInitializer.append(builder -> builder.inferSniHostname(shouldInfer));
return this;
}

@Override
public GrpcClientBuilder<U, R> autoRetryStrategy(
final AutoRetryStrategyProvider autoRetryStrategyProvider) {

httpClientBuilder.autoRetryStrategy(autoRetryStrategyProvider);
directHttpInitializer.append(builder -> builder.autoRetryStrategy(autoRetryStrategyProvider));
return this;
}

@Override
public GrpcClientBuilder<U, R> unresolvedAddressToHost(
final Function<U, CharSequence> unresolvedAddressToHostFunction) {
httpClientBuilder.unresolvedAddressToHost(unresolvedAddressToHostFunction);
directHttpInitializer.append(builder -> builder.unresolvedAddressToHost(unresolvedAddressToHostFunction));
return this;
}

@Override
public GrpcClientBuilder<U, R> hostHeaderFallback(final boolean enable) {
httpClientBuilder.hostHeaderFallback(enable);
directHttpInitializer.append(builder -> builder.hostHeaderFallback(enable));
return this;
}

@Override
public GrpcClientBuilder<U, R> serviceDiscoverer(
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer) {
httpClientBuilder.serviceDiscoverer(serviceDiscoverer);
directHttpInitializer.append(builder -> builder.serviceDiscoverer(serviceDiscoverer));
return this;
}

@Override
public GrpcClientBuilder<U, R> loadBalancerFactory(final HttpLoadBalancerFactory<R> loadBalancerFactory) {
httpClientBuilder.loadBalancerFactory(loadBalancerFactory);
directHttpInitializer.append(builder -> builder.loadBalancerFactory(loadBalancerFactory));
return this;
}

@Override
protected GrpcClientCallFactory newGrpcClientCallFactory() {
SingleAddressHttpClientBuilder<U, R> builder = httpClientBuilderSupplier.get().protocols(h2Default());
appendCatchAllFilter(builder);
directHttpInitializer.initialize(builder);
httpInitializer.initialize(builder);
builder.appendClientFilter(new TimeoutHttpRequesterFilter(GRPC_TIMEOUT_REQHDR, true));
Duration timeout = isInfinite(defaultTimeout, GRPC_MAX_TIMEOUT) ? null : defaultTimeout;
if (!invokedBuild) {
httpClientBuilder.appendClientFilter(new TimeoutHttpRequesterFilter(GRPC_TIMEOUT_REQHDR, true));
}
invokedBuild = true;
return GrpcClientCallFactory.from(httpClientBuilder.buildStreaming(), timeout);
return GrpcClientCallFactory.from(builder.buildStreaming(), timeout);
}

@Override
protected void doAppendHttpClientFilter(final StreamingHttpClientFilterFactory factory) {
httpClientBuilder.appendClientFilter(factory);
directHttpInitializer.append(builder -> builder.appendClientFilter(factory));
}

@Override
public void doAppendHttpClientFilter(final Predicate<StreamingHttpRequest> predicate,
final StreamingHttpClientFilterFactory factory) {
httpClientBuilder.appendClientFilter(predicate, factory);
directHttpInitializer.append(builder -> builder.appendClientFilter(predicate, factory));
}

private static <U, R> void appendCatchAllFilter(SingleAddressHttpClientBuilder<U, R> builder) {
builder.appendClientFilter(client -> new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
final Single<StreamingHttpResponse> resp;
try {
resp = super.request(delegate, strategy, request);
} catch (Throwable t) {
return failed(toGrpcException(t));
}
return resp.onErrorMap(DefaultGrpcClientBuilder::toGrpcException);
}
});
}

private static GrpcStatusException toGrpcException(Throwable cause) {
return fromThrowable(cause).asException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private GrpcClients() {
* @return new builder for the address
*/
public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forAddress(final String host, final int port) {
return new DefaultGrpcClientBuilder<>(HttpClients.forSingleAddress(host, port));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forSingleAddress(host, port));
}

/**
Expand All @@ -57,7 +57,7 @@ public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forAddress(final
* @return new builder for the address
*/
public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forAddress(final HostAndPort address) {
return new DefaultGrpcClientBuilder<>(HttpClients.forSingleAddress(address));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forSingleAddress(address));
}

/**
Expand All @@ -68,7 +68,7 @@ public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forAddress(final
* @return new builder for the address
*/
public static GrpcClientBuilder<String, InetSocketAddress> forServiceAddress(final String serviceName) {
return new DefaultGrpcClientBuilder<>(HttpClients.forServiceAddress(serviceName));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forServiceAddress(serviceName));
}

/**
Expand All @@ -80,7 +80,7 @@ public static GrpcClientBuilder<String, InetSocketAddress> forServiceAddress(fin
*/
public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forResolvedAddress(final String host,
final int port) {
return new DefaultGrpcClientBuilder<>(HttpClients.forResolvedAddress(host, port));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forResolvedAddress(host, port));
}

/**
Expand All @@ -90,7 +90,7 @@ public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forResolvedAddre
* @return new builder for the address
*/
public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forResolvedAddress(final HostAndPort address) {
return new DefaultGrpcClientBuilder<>(HttpClients.forResolvedAddress(address));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forResolvedAddress(address));
}

/**
Expand All @@ -101,21 +101,24 @@ public static GrpcClientBuilder<HostAndPort, InetSocketAddress> forResolvedAddre
*/
public static GrpcClientBuilder<InetSocketAddress, InetSocketAddress> forResolvedAddress(
final InetSocketAddress address) {
return new DefaultGrpcClientBuilder<>(HttpClients.forResolvedAddress(address));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forResolvedAddress(address));
}

/**
* Creates a {@link GrpcClientBuilder} for an address with default {@link LoadBalancer}.
*
* @param address the {@code ResolvedAddress} to connect. This address will also be used for the
* {@link HttpHeaderNames#HOST}. Use {@link GrpcClientBuilder#unresolvedAddressToHost(Function)}
* if you want to override that value or {@link GrpcClientBuilder#hostHeaderFallback(boolean)} if you
* want to disable this behavior.
* {@link HttpHeaderNames#HOST}.
* Use {@link io.servicetalk.http.api.SingleAddressHttpClientBuilder#unresolvedAddressToHost(Function)}
* via {@link GrpcClientBuilder#initializeHttp(GrpcClientBuilder.HttpInitializer)}
* if you want to override that value or
* {@link io.servicetalk.http.api.SingleAddressHttpClientBuilder#hostHeaderFallback(boolean)}
* if you want to disable this behavior.
* @param <T> The type of {@link SocketAddress}.
* @return new builder for the address
*/
public static <T extends SocketAddress> GrpcClientBuilder<T, T> forResolvedAddress(final T address) {
return new DefaultGrpcClientBuilder<>(HttpClients.forResolvedAddress(address));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forResolvedAddress(address));
}

/**
Expand All @@ -132,6 +135,6 @@ public static <T extends SocketAddress> GrpcClientBuilder<T, T> forResolvedAddre
public static <U, R>
GrpcClientBuilder<U, R> forAddress(final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer,
final U address) {
return new DefaultGrpcClientBuilder<>(HttpClients.forSingleAddress(serviceDiscoverer, address));
return new DefaultGrpcClientBuilder<>(() -> HttpClients.forSingleAddress(serviceDiscoverer, address));
}
}
Loading

0 comments on commit 3c9bd5c

Please sign in to comment.