Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use filter strategy for streaming async server #2156

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public Single<HttpServerContext> listen(final HttpService service) {

@Override
public Single<HttpServerContext> listenStreaming(final StreamingHttpService service) {
bondolo marked this conversation as resolved.
Show resolved Hide resolved
return listenForService(service, strategy);
return listenForService(service, computeServiceStrategy(service));
}

@Override
Expand Down Expand Up @@ -360,7 +360,7 @@ private HttpExecutionStrategy computeServiceStrategy(Object service) {
HttpExecutionStrategy serviceStrategy = requiredOffloads(service, defaultStrategy());
HttpExecutionStrategy filterStrategy = computeRequiredStrategy(serviceFilters, serviceStrategy);
return defaultStrategy() == strategy ? filterStrategy :
strategy.hasOffloads() ? strategy.merge(filterStrategy) : offloadNone();
strategy.hasOffloads() ? strategy.merge(filterStrategy) : strategy;
}

private static StreamingHttpService applyInternalFilters(StreamingHttpService service,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.utils.auth.BasicAuthHttpServiceFilter;
import io.servicetalk.http.utils.auth.BasicAuthHttpServiceFilter.CredentialsVerifier;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerContext;
Expand Down Expand Up @@ -129,6 +128,7 @@ private BlockingHttpClient setup(boolean noOffloadsInfluence) throws Exception {
when(credentialsVerifier.apply(anyString(), anyString())).thenReturn(succeeded("success"));
when(credentialsVerifier.closeAsync()).thenReturn(completed());
when(credentialsVerifier.closeAsyncGracefully()).thenReturn(completed());
when(credentialsVerifier.requiredOffloads()).thenCallRealMethod();
CredentialsVerifier<String> verifier = credentialsVerifier;
if (noOffloadsInfluence) {
verifier = new InfluencingVerifier(verifier, offloadNone());
Expand All @@ -146,8 +146,7 @@ private BlockingHttpClient setup(boolean noOffloadsInfluence) throws Exception {
return this.client;
}

private static final class OffloadCheckingService implements StreamingHttpService,
ExecutionStrategyInfluencer<HttpExecutionStrategy> {
bondolo marked this conversation as resolved.
Show resolved Hide resolved
private static final class OffloadCheckingService implements StreamingHttpService {

private enum OffloadPoint {
ServiceHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.ReservedBlockingHttpConnection;
import io.servicetalk.http.api.ReservedBlockingStreamingHttpConnection;
import io.servicetalk.http.api.ReservedHttpConnection;
import io.servicetalk.http.api.ReservedStreamingHttpConnection;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;

Expand All @@ -48,6 +54,7 @@
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.netty.HttpClients.forSingleAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
Expand Down Expand Up @@ -79,16 +86,26 @@ void tearDown() throws Exception {
@ValueSource(booleans = {false, true})
void testStreaming(boolean customStrategy) throws Exception {
StreamingHttpClient client = initClientAndServer(builder ->
builder.listenStreaming((ctx, request, responseFactory) -> {
serviceStrategyRef.set(ctx.executionContext().executionStrategy());
return succeeded(responseFactory.ok());
builder.listenStreaming(new StreamingHttpService() {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
serviceStrategyRef.set(ctx.executionContext().executionStrategy());
return succeeded(responseFactory.ok());
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return offloadNone();
}
}), customStrategy).buildStreaming();
clientAsCloseable = client;
if (!customStrategy) {
assert expectedClientStrategy == null;
expectedClientStrategy = defaultStrategy();
assert expectedServerStrategy == null;
expectedServerStrategy = defaultStrategy();
expectedServerStrategy = offloadNone();
}
HttpExecutionStrategy clientStrat = client.executionContext().executionStrategy();
assertThat("Unexpected client strategy.", clientStrat, equalStrategies(expectedClientStrategy));
Expand Down Expand Up @@ -211,18 +228,18 @@ private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> initClien
return clientBuilder;
}

static Matcher<HttpExecutionStrategy> equalStrategies(@Nullable HttpExecutionStrategy expected) {
return new TypeSafeMatcher<HttpExecutionStrategy>() {
static Matcher<ExecutionStrategy> equalStrategies(@Nullable ExecutionStrategy expected) {
return new TypeSafeMatcher<ExecutionStrategy>() {

@Override
public void describeMismatchSafely(@Nullable HttpExecutionStrategy item, Description mismatchDescription) {
public void describeMismatchSafely(@Nullable ExecutionStrategy item, Description mismatchDescription) {
mismatchDescription
.appendText("was strategy ")
.appendValue(item);
}

@Override
protected boolean matchesSafely(final @Nullable HttpExecutionStrategy item) {
protected boolean matchesSafely(final @Nullable ExecutionStrategy item) {
return Objects.equals(expected, item);
}

Expand Down
Loading