Skip to content

Commit

Permalink
Decouple multi-address and partitioned client builders from `HttpClie…
Browse files Browse the repository at this point in the history
…ntBuildContext` (#2136)

Motivation:

`DefaultMultiAddressHttpClientBuilder` and
`DefaultPartitionedHttpClientBuilder` use
`DefaultSingleAddressHttpClientBuilder` as a template for underlying
single-address clients. It was helpful before we introduced a
`SingleAddressInitializer` in #1387. Before that, all builders had the
same set of methods and it was nice to reuse the single-address builder
instead of maintaining a similar state at higher level builders. Now,
`SingleAddressInitializer` helps to avoid duplication between builders.
We don't have to use `DefaultSingleAddressHttpClientBuilder` as a
template and can simplify its state by using a factory for
`SingleAddressHttpClientBuilder`.

Modifications:

- `DefaultMultiAddressUrlHttpClientBuilder` and
`DefaultPartitionedHttpClientBuilder` use a factory for
`SingleAddressHttpClientBuilder` instead of
`DefaultSingleAddressHttpClientBuilder` as a template and
`HttpExecutionContextBuilder` for context-related items;
- Remove and hide methods from `DefaultSingleAddressHttpClientBuilder`
that are not used by other builders anymore;
- Add a test to verify `DefaultMultiAddressUrlHttpClientBuilder` sets
execution context items for underlying single-address builders;

Result:

`DefaultSingleAddressHttpClientBuilder` has less pkg-private API,
`DefaultMultiAddressUrlHttpClientBuilder` and
`DefaultPartitionedHttpClientBuilder` create a new single-address
builder on demand, similar to grpc builders.
  • Loading branch information
idelpivnitskiy authored Mar 9, 2022
1 parent 310f6b5 commit a93fc9b
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.MultiAddressHttpClientBuilder;
import io.servicetalk.http.api.RedirectConfig;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.HttpClientBuildContext;
import io.servicetalk.http.utils.RedirectingHttpRequesterFilter;
import io.servicetalk.transport.api.ClientSslConfig;
import io.servicetalk.transport.api.ClientSslConfigBuilder;
Expand All @@ -64,6 +64,7 @@
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -85,7 +86,8 @@ final class DefaultMultiAddressUrlHttpClientBuilder

private static final String HTTPS_SCHEME = HTTPS.toString();

private final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate;
private final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>> builderFactory;
private final HttpExecutionContextBuilder executionContextBuilder = new HttpExecutionContextBuilder();

@Nullable
private HttpHeadersFactory headersFactory;
Expand All @@ -95,19 +97,17 @@ final class DefaultMultiAddressUrlHttpClientBuilder
private SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer;

DefaultMultiAddressUrlHttpClientBuilder(
final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate) {
this.builderTemplate = requireNonNull(builderTemplate);
final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>> bFactory) {
this.builderFactory = requireNonNull(bFactory);
}

@Override
public StreamingHttpClient buildStreaming() {
final CompositeCloseable closeables = newCompositeCloseable();
try {
final HttpClientBuildContext<HostAndPort, InetSocketAddress> buildContext = builderTemplate.copyBuildCtx();

final ClientFactory clientFactory = new ClientFactory(buildContext.builder, singleAddressInitializer);

final HttpExecutionContext executionContext = buildContext.builder.executionContextBuilder.build();
final HttpExecutionContext executionContext = executionContextBuilder.build();
final ClientFactory clientFactory = new ClientFactory(builderFactory, executionContext,
singleAddressInitializer);
final CachingKeyFactory keyFactory = closeables.prepend(new CachingKeyFactory());
final HttpHeadersFactory headersFactory = this.headersFactory;
FilterableStreamingHttpClient urlClient = closeables.prepend(
Expand All @@ -120,13 +120,8 @@ public StreamingHttpClient buildStreaming() {
urlClient = redirectConfig == null ? urlClient :
new RedirectingHttpRequesterFilter(redirectConfig).create(urlClient);

HttpExecutionStrategy computedStrategy =
buildContext.builder.computeChainStrategy(executionContext.executionStrategy());

LOGGER.debug("Client created with base strategy {} → computed strategy {}",
executionContext.executionStrategy(), computedStrategy);

return new FilterableClientToClient(urlClient, computedStrategy);
LOGGER.debug("Multi-address client created with base strategy {}", executionContext.executionStrategy());
return new FilterableClientToClient(urlClient, executionContext.executionStrategy());
} catch (final Throwable t) {
closeables.closeAsync().subscribe();
throw t;
Expand Down Expand Up @@ -220,32 +215,36 @@ public int hashCode() {

private static final class ClientFactory implements Function<UrlKey, FilterableStreamingHttpClient> {
private static final ClientSslConfig DEFAULT_CLIENT_SSL_CONFIG = new ClientSslConfigBuilder().build();
private final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate;
private final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>>
builderFactory;
private final HttpExecutionContext executionContext;
@Nullable
private final SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer;

ClientFactory(
final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate,
ClientFactory(final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>>
builderFactory,
final HttpExecutionContext executionContext,
@Nullable final SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer) {
this.builderTemplate = builderTemplate;
this.builderFactory = builderFactory;
this.executionContext = executionContext;
this.singleAddressInitializer = singleAddressInitializer;
}

@Override
public StreamingHttpClient apply(final UrlKey urlKey) {
// Copy existing builder to prevent changes at runtime when concurrently creating clients for new addresses
final HttpClientBuildContext<HostAndPort, InetSocketAddress> buildContext =
builderTemplate.copyBuildCtx(urlKey.hostAndPort);
final SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
requireNonNull(builderFactory.apply(urlKey.hostAndPort));

setExecutionContext(builder, executionContext);
if (HTTPS_SCHEME.equalsIgnoreCase(urlKey.scheme)) {
buildContext.builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
}

if (singleAddressInitializer != null) {
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, buildContext.builder);
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder);
}

return buildContext.build();
return builder.buildStreaming();
}
}

Expand Down Expand Up @@ -330,27 +329,27 @@ public StreamingHttpRequest newRequest(final HttpRequestMethod method, final Str

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> ioExecutor(final IoExecutor ioExecutor) {
builderTemplate.ioExecutor(ioExecutor);
executionContextBuilder.ioExecutor(ioExecutor);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> executor(final Executor executor) {
builderTemplate.executor(executor);
executionContextBuilder.executor(executor);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> bufferAllocator(
final BufferAllocator allocator) {
builderTemplate.bufferAllocator(allocator);
executionContextBuilder.bufferAllocator(allocator);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> executionStrategy(
final HttpExecutionStrategy strategy) {
builderTemplate.executionStrategy(strategy);
executionContextBuilder.executionStrategy(strategy);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.PartitionHttpClientBuilderConfigurator;
import io.servicetalk.http.api.PartitionedHttpClientBuilder;
import io.servicetalk.http.api.ReservedStreamingHttpConnection;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.HttpClientBuildContext;
import io.servicetalk.transport.api.IoExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
Expand All @@ -66,75 +66,73 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_JITTER;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttpClientBuilder<U, R> {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);

private final U address;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
private final Supplier<SingleAddressHttpClientBuilder<U, R>> builderFactory;
private final HttpExecutionContextBuilder executionContextBuilder = new HttpExecutionContextBuilder();
private ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> serviceDiscoverer;
@Nullable
private BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
private final DefaultSingleAddressHttpClientBuilder<U, R> builderTemplate;
private int serviceDiscoveryMaxQueueSize = 32;
@Nullable
private HttpHeadersFactory headersFactory;
@Nullable
private SingleAddressInitializer<U, R> clientInitializer;
private PartitionHttpClientBuilderConfigurator<U, R> clientFilterFunction = (__, ___) -> { };
private PartitionMapFactory partitionMapFactory = PowerSetPartitionMapFactory.INSTANCE;
private int serviceDiscoveryMaxQueueSize = 32;

DefaultPartitionedHttpClientBuilder(
final DefaultSingleAddressHttpClientBuilder<U, R> builderTemplate,
final U address,
final Supplier<SingleAddressHttpClientBuilder<U, R>> builderFactory,
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> serviceDiscoverer,
final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory) {
this.builderTemplate = requireNonNull(builderTemplate);
this.address = requireNonNull(address);
this.builderFactory = requireNonNull(builderFactory);
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.partitionAttributesBuilderFactory = requireNonNull(partitionAttributesBuilderFactory);
}

@Override
public StreamingHttpClient buildStreaming() {
final HttpClientBuildContext<U, R> buildContext = builderTemplate.copyBuildCtx();
final HttpExecutionContext executionContext = buildContext.builder.build().executionContext();
final HttpExecutionContext executionContext = executionContextBuilder.build();
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithConstantBackoffDeltaJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_JITTER, executionContext.executor());
}
ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
sdRetryStrategy);

final PartitionedClientFactory<U, R, FilterableStreamingHttpClient> clientFactory = (pa, sd) -> {
// build new context, user may have changed anything on the builder from the filter
DefaultSingleAddressHttpClientBuilder<U, R> builder = buildContext.builder.copyBuildCtx().builder;
final SingleAddressHttpClientBuilder<U, R> builder = requireNonNull(builderFactory.get());
builder.serviceDiscoverer(sd);
clientFilterFunction.configureForPartition(pa, builder);
setExecutionContext(builder, executionContext);
if (clientInitializer != null) {
clientInitializer.initialize(pa, builder);
}
return builder.buildStreaming();
};

final Publisher<PartitionedServiceDiscovererEvent<R>> psdEvents = psd.discover(buildContext.address())
final Publisher<PartitionedServiceDiscovererEvent<R>> psdEvents = psd.discover(address)
.flatMapConcatIterable(identity());
final HttpHeadersFactory headersFactory = this.headersFactory;
DefaultPartitionedStreamingHttpClientFilter<U, R> partitionedClient =
final DefaultPartitionedStreamingHttpClientFilter<U, R> partitionedClient =
new DefaultPartitionedStreamingHttpClientFilter<>(psdEvents, serviceDiscoveryMaxQueueSize,
clientFactory, partitionAttributesBuilderFactory,
new DefaultStreamingHttpRequestResponseFactory(executionContext.bufferAllocator(),
headersFactory != null ? headersFactory : DefaultHttpHeadersFactory.INSTANCE, HTTP_1_1),
executionContext, partitionMapFactory);

HttpExecutionStrategy computedStrategy =
buildContext.builder.computeChainStrategy(executionContext.executionStrategy());

LOGGER.debug("Client created with base strategy {} → computed strategy {}",
executionContext.executionStrategy(), computedStrategy);

return new FilterableClientToClient(partitionedClient, computedStrategy);
LOGGER.debug("Partitioned client created with base strategy {}", executionContext.executionStrategy());
return new FilterableClientToClient(partitionedClient, executionContext.executionStrategy());
}

private static final class DefaultPartitionedStreamingHttpClientFilter<U, R> implements
Expand Down Expand Up @@ -263,19 +261,19 @@ public StreamingHttpRequest newRequest(final HttpRequestMethod method, final Str

@Override
public PartitionedHttpClientBuilder<U, R> executor(final Executor executor) {
builderTemplate.executor(executor);
executionContextBuilder.executor(executor);
return this;
}

@Override
public PartitionedHttpClientBuilder<U, R> ioExecutor(final IoExecutor ioExecutor) {
builderTemplate.ioExecutor(ioExecutor);
executionContextBuilder.ioExecutor(ioExecutor);
return this;
}

@Override
public PartitionedHttpClientBuilder<U, R> bufferAllocator(final BufferAllocator allocator) {
builderTemplate.bufferAllocator(allocator);
executionContextBuilder.bufferAllocator(allocator);
return this;
}

Expand Down Expand Up @@ -313,7 +311,7 @@ public PartitionedHttpClientBuilder<U, R> initializer(final SingleAddressInitial

@Override
public PartitionedHttpClientBuilder<U, R> executionStrategy(final HttpExecutionStrategy strategy) {
this.builderTemplate.executionStrategy(strategy);
this.executionContextBuilder.executionStrategy(strategy);
return this;
}

Expand Down
Loading

0 comments on commit a93fc9b

Please sign in to comment.