Skip to content

Commit

Permalink
ConnectionFactory-level filters can not migrate to new API of #1956 (#…
Browse files Browse the repository at this point in the history
…2089)

Motivation:

In #1956 `StreamingHttpRequester` defines a new method
`request(StreamingHttpRequest)`. `AbstractStreamingHttpConnection` only
implements the deprecated method that also takes an
`HttpExecutionStrategy`. Users who apply a filter through
`ConnectionFactory` see `UnsupportedOperationException`. Also, if they
have 2+ filters applied through `ConnectionFactory`, they can not mix
deprecated and new API.

Modifications:

- Implement
`AbstractStreamingHttpConnection#request(StreamingHttpRequest)`;
- Enhance `NewToDeprecatedFilter` to be `ConnectionFactoryFilter` too;
- Apply `NewToDeprecatedFilter` when users invoke
`appendConnectionFactoryFilter`;
- Test `appendConnectionFactoryFilter` in `MixedFiltersTest`;

Result:

Users who apply request/response filters through `ConnectionFactory`
can migrate from deprecated to new `request(StreamingHttpRequest)` API.
  • Loading branch information
idelpivnitskiy authored Feb 18, 2022
1 parent a4dd491 commit ccbe7f9
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpApiConversions.requestStrategy;

/**
* Because users can have a mixed set of filters, we should always delegate from the new {@code request} method to the
Expand Down Expand Up @@ -104,9 +104,4 @@ public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strat
// No influence since we do not block.
return strategy;
}

static HttpExecutionStrategy requestStrategy(HttpRequestMetaData metaData, HttpExecutionStrategy fallback) {
final HttpExecutionStrategy strategy = metaData.context().get(HTTP_EXECUTION_STRATEGY_KEY);
return strategy != null ? strategy : fallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.servicetalk.http.netty.HeaderUtils.emptyMessageBody;
import static io.servicetalk.http.netty.HeaderUtils.flatEmptyMessage;
import static io.servicetalk.http.netty.HeaderUtils.setRequestContentLength;
import static io.servicetalk.http.netty.NewToDeprecatedFilter.requestStrategy;
import static io.servicetalk.transport.netty.internal.FlushStrategies.flushOnEnd;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -104,8 +105,14 @@ public final Single<StreamingHttpResponse> invokeClient(final Publisher<Object>
}

@Override
public Single<StreamingHttpResponse> request(final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
public final Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return defer(() -> request(requestStrategy(request, executionContext().executionStrategy()), request)
.shareContextOnSubscribe());
}

@Override
public final Single<StreamingHttpResponse> request(final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return defer(() -> {
final Publisher<Object> flatRequest;
// See https://tools.ietf.org/html/rfc7230#section-3.3.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ private MultiAddressHttpClientFilterFactory<HostAndPort> appendClientFilter(
}

@Nonnull
private MultiAddressHttpClientFilterFactory<HostAndPort> appendClientFilter0(
private static MultiAddressHttpClientFilterFactory<HostAndPort> appendClientFilter0(
final @Nullable MultiAddressHttpClientFilterFactory<HostAndPort> current,
final MultiAddressHttpClientFilterFactory<HostAndPort> next) {
return current == null ? next : (group, client) -> current.create(group, next.create(group, client));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> extends SingleAddressHtt

static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(10);
static final Duration SD_RETRY_STRATEGY_JITTER = ofSeconds(5);

private final NewToDeprecatedFilter<R> newToDeprecatedFilter;
@Nullable
private final U address;
@Nullable
Expand Down Expand Up @@ -136,27 +138,32 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> extends SingleAddressHtt
private RetryingHttpRequesterFilter retryingHttpRequesterFilter;

DefaultSingleAddressHttpClientBuilder(
final U address, final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer) {
final U address, final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer,
final NewToDeprecatedFilter<R> newToDeprecatedFilter) {
this.address = requireNonNull(address);
config = new HttpClientConfig();
executionContextBuilder = new HttpExecutionContextBuilder();
influencerChainBuilder = new ClientStrategyInfluencerChainBuilder();
this.loadBalancerFactory = DefaultHttpLoadBalancerFactory.Builder.<R>fromDefaults().build();
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.newToDeprecatedFilter = requireNonNull(newToDeprecatedFilter);
}

DefaultSingleAddressHttpClientBuilder(
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer) {
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer,
final NewToDeprecatedFilter<R> newToDeprecatedFilter) {
address = null; // Unknown address - template builder pending override via: copy(address)
config = new HttpClientConfig();
executionContextBuilder = new HttpExecutionContextBuilder();
influencerChainBuilder = new ClientStrategyInfluencerChainBuilder();
this.loadBalancerFactory = DefaultHttpLoadBalancerFactory.Builder.<R>fromDefaults().build();
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.newToDeprecatedFilter = requireNonNull(newToDeprecatedFilter);
}

private DefaultSingleAddressHttpClientBuilder(@Nullable final U address,
final DefaultSingleAddressHttpClientBuilder<U, R> from) {
final DefaultSingleAddressHttpClientBuilder<U, R> from,
final NewToDeprecatedFilter<R> newToDeprecatedFilter) {
this.address = address;
proxyAddress = from.proxyAddress;
config = new HttpClientConfig(from.config);
Expand All @@ -173,6 +180,7 @@ private DefaultSingleAddressHttpClientBuilder(@Nullable final U address,
autoRetry = from.autoRetry;
connectionFactoryFilter = from.connectionFactoryFilter;
retryingHttpRequesterFilter = from.retryingHttpRequesterFilter;
this.newToDeprecatedFilter = requireNonNull(newToDeprecatedFilter);
}

//FIXME: 0.42 - to be removed when auto-retry is removed.
Expand All @@ -181,31 +189,34 @@ private boolean customAutoRetry() {
}

private DefaultSingleAddressHttpClientBuilder<U, R> copy() {
return new DefaultSingleAddressHttpClientBuilder<>(address, this);
return new DefaultSingleAddressHttpClientBuilder<>(address, this, new NewToDeprecatedFilter<>());
}

private DefaultSingleAddressHttpClientBuilder<U, R> copy(final U address) {
return new DefaultSingleAddressHttpClientBuilder<>(requireNonNull(address), this);
return new DefaultSingleAddressHttpClientBuilder<>(requireNonNull(address), this,
new NewToDeprecatedFilter<>());
}

static DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> forHostAndPort(
final HostAndPort address) {
return new DefaultSingleAddressHttpClientBuilder<>(address, globalDnsServiceDiscoverer());
return new DefaultSingleAddressHttpClientBuilder<>(address, globalDnsServiceDiscoverer(),
NEW_TO_DEPRECATED_FILTER);
}

static DefaultSingleAddressHttpClientBuilder<String, InetSocketAddress> forServiceAddress(
final String serviceName) {
return new DefaultSingleAddressHttpClientBuilder<>(serviceName, globalSrvDnsServiceDiscoverer());
return new DefaultSingleAddressHttpClientBuilder<>(serviceName, globalSrvDnsServiceDiscoverer(),
NEW_TO_DEPRECATED_FILTER);
}

static <U, R extends SocketAddress> DefaultSingleAddressHttpClientBuilder<U, R> forResolvedAddress(
final U u, final Function<U, R> toResolvedAddressMapper) {
ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> sd = new NoopServiceDiscoverer<>(toResolvedAddressMapper);
return new DefaultSingleAddressHttpClientBuilder<>(u, sd);
return new DefaultSingleAddressHttpClientBuilder<>(u, sd, new NewToDeprecatedFilter<>());
}

static DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> forUnknownHostAndPort() {
return new DefaultSingleAddressHttpClientBuilder<>(globalDnsServiceDiscoverer());
return new DefaultSingleAddressHttpClientBuilder<>(globalDnsServiceDiscoverer(), NEW_TO_DEPRECATED_FILTER);
}

static final class HttpClientBuildContext<U, R> {
Expand Down Expand Up @@ -250,7 +261,7 @@ HttpClientConfig httpConfig() {
}

StreamingHttpClient build() {
return buildStreaming(this);
return buildStreaming(this, builder.newToDeprecatedFilter);
}

ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer(
Expand All @@ -267,10 +278,11 @@ StreamingHttpClient build() {

@Override
public StreamingHttpClient buildStreaming() {
return buildStreaming(copyBuildCtx());
return buildStreaming(copyBuildCtx(), newToDeprecatedFilter);
}

private static <U, R> StreamingHttpClient buildStreaming(final HttpClientBuildContext<U, R> ctx) {
private static <U, R> StreamingHttpClient buildStreaming(final HttpClientBuildContext<U, R> ctx,
final NewToDeprecatedFilter<R> newToDeprecatedFilter) {
final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
if (roConfig.h2Config() != null && roConfig.hasProxy()) {
throw new IllegalStateException("Proxying is not yet supported with HTTP/2");
Expand Down Expand Up @@ -334,11 +346,11 @@ ctx.builder.connectionFilterFactory, new AlpnReqRespFactoryFunc(
// If we're talking to a proxy over http (not https), rewrite the request-target to absolute-form, as
// specified by the RFC: https://tools.ietf.org/html/rfc7230#section-5.3.2
currClientFilterFactory = appendFilter(currClientFilterFactory,
ctx.builder.proxyAbsoluteAddressFilterFactory());
ctx.builder.proxyAbsoluteAddressFilterFactory(), newToDeprecatedFilter);
}
if (ctx.builder.addHostHeaderFallbackFilter) {
currClientFilterFactory = appendFilter(currClientFilterFactory, new HostHeaderHttpRequesterFilter(
ctx.builder.hostToCharSequenceFunction.apply(ctx.builder.address)));
ctx.builder.hostToCharSequenceFunction.apply(ctx.builder.address)), newToDeprecatedFilter);
}

FilterableStreamingHttpClient lbClient = closeOnException.prepend(
Expand All @@ -347,7 +359,8 @@ ctx.builder.connectionFilterFactory, new AlpnReqRespFactoryFunc(
lbClient = new AutoRetryFilter(lbClient,
ctx.builder.autoRetry.newStrategy(lb.eventStream(), ctx.sdStatus));
} else if (ctx.builder.autoRetry == DEFAULT_AUTO_RETRY && ctx.builder.retryingHttpRequesterFilter == null) {
currClientFilterFactory = appendFilter(currClientFilterFactory, DEFAULT_RETRY_FILTER);
currClientFilterFactory = appendFilter(currClientFilterFactory, DEFAULT_RETRY_FILTER,
newToDeprecatedFilter);
}
return new FilterableClientToClient(currClientFilterFactory != null ?
currClientFilterFactory.create(lbClient, lb.eventStream(), ctx.sdStatus) : lbClient,
Expand Down Expand Up @@ -394,11 +407,12 @@ private static <U, R> String targetAddress(final HttpClientBuildContext<U, R> ct
ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.proxyAddress + ")";
}

private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
private static <R> ContextAwareStreamingHttpClientFilterFactory appendFilter(
@Nullable final ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory,
final StreamingHttpClientFilterFactory appendClientFilterFactory) {
final StreamingHttpClientFilterFactory appendClientFilterFactory,
final NewToDeprecatedFilter<R> newToDeprecatedFilter) {
return appendFilter0(appendFilter0(currClientFilterFactory, appendClientFilterFactory),
NEW_TO_DEPRECATED_FILTER);
newToDeprecatedFilter);
}

private static ContextAwareStreamingHttpClientFilterFactory appendFilter0(
Expand Down Expand Up @@ -532,7 +546,7 @@ public DefaultSingleAddressHttpClientBuilder<U, R> appendConnectionFilter(
final StreamingHttpConnectionFilterFactory factory) {
requireNonNull(factory);
connectionFilterFactory = appendConnectionFilter(appendConnectionFilter(connectionFilterFactory, factory),
NEW_TO_DEPRECATED_FILTER);
newToDeprecatedFilter);
influencerChainBuilder.add(factory);
return this;
}
Expand All @@ -548,7 +562,8 @@ private static StreamingHttpConnectionFilterFactory appendConnectionFilter(
public DefaultSingleAddressHttpClientBuilder<U, R> appendConnectionFactoryFilter(
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> factory) {
requireNonNull(factory);
connectionFactoryFilter = appendConnectionFactoryFilter(connectionFactoryFilter, factory);
connectionFactoryFilter = appendConnectionFactoryFilter(
appendConnectionFactoryFilter(connectionFactoryFilter, factory), newToDeprecatedFilter);
influencerChainBuilder.add(factory);
return this;
}
Expand Down Expand Up @@ -632,7 +647,7 @@ public DefaultSingleAddressHttpClientBuilder<U, R> appendClientFilter(
}
retryingHttpRequesterFilter = (RetryingHttpRequesterFilter) factory;
}
clientFilterFactory = appendFilter(clientFilterFactory, factory);
clientFilterFactory = appendFilter(clientFilterFactory, factory, newToDeprecatedFilter);
influencerChainBuilder.add(factory);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.forUnknownHostAndPort;
import static io.servicetalk.http.netty.NewToDeprecatedFilter.NEW_TO_DEPRECATED_FILTER;
import static java.util.function.Function.identity;

/**
Expand Down Expand Up @@ -85,7 +86,7 @@ public static MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> forM
final ServiceDiscoverer<HostAndPort, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>>
serviceDiscoverer) {
return new DefaultMultiAddressUrlHttpClientBuilder(
new DefaultSingleAddressHttpClientBuilder<>(serviceDiscoverer));
new DefaultSingleAddressHttpClientBuilder<>(serviceDiscoverer, NEW_TO_DEPRECATED_FILTER));
}

/**
Expand Down Expand Up @@ -301,7 +302,7 @@ public static SingleAddressHttpClientBuilder<InetSocketAddress, InetSocketAddres
public static <U, R> SingleAddressHttpClientBuilder<U, R> forSingleAddress(
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer,
final U address) {
return new DefaultSingleAddressHttpClientBuilder<>(address, serviceDiscoverer);
return new DefaultSingleAddressHttpClientBuilder<>(address, serviceDiscoverer, new NewToDeprecatedFilter<>());
}

/**
Expand Down Expand Up @@ -348,6 +349,6 @@ public Completable closeAsync() {
public Completable closeAsyncGracefully() {
return closeable.closeAsyncGracefully();
}
}), serviceDiscoverer, partitionAttributesBuilderFactory);
}, new NewToDeprecatedFilter<>()), serviceDiscoverer, partitionAttributesBuilderFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
Expand All @@ -30,6 +33,10 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.TransportObserver;

import java.net.InetSocketAddress;
import javax.annotation.Nullable;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

Expand All @@ -42,14 +49,11 @@
*/
// 0.41 branch requires two copies of this class: one in http-api, one in http-netty. Keep them identical.
@Deprecated
final class NewToDeprecatedFilter implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory,
HttpExecutionStrategyInfluencer {

static final NewToDeprecatedFilter NEW_TO_DEPRECATED_FILTER = new NewToDeprecatedFilter();
final class NewToDeprecatedFilter<R> implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory,
ConnectionFactoryFilter<R, FilterableStreamingHttpConnection>,
HttpExecutionStrategyInfluencer {

private NewToDeprecatedFilter() {
// Singleton
}
static final NewToDeprecatedFilter<InetSocketAddress> NEW_TO_DEPRECATED_FILTER = new NewToDeprecatedFilter<>();

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
Expand Down Expand Up @@ -123,4 +127,16 @@ static HttpExecutionStrategy requestStrategy(HttpRequestMetaData metaData, HttpE
final HttpExecutionStrategy strategy = metaData.context().get(HTTP_EXECUTION_STRATEGY_KEY);
return strategy != null ? strategy : fallback;
}

@Override
public ConnectionFactory<R, FilterableStreamingHttpConnection> create(
final ConnectionFactory<R, FilterableStreamingHttpConnection> cf) {
return new DelegatingConnectionFactory<R, FilterableStreamingHttpConnection>(cf) {
@Override
public Single<FilterableStreamingHttpConnection> newConnection(
final R resolvedAddress, @Nullable final TransportObserver observer) {
return delegate().newConnection(resolvedAddress, observer).map(NewToDeprecatedFilter.this::create);
}
};
}
}
Loading

0 comments on commit ccbe7f9

Please sign in to comment.