Skip to content

Commit

Permalink
RetryingHttpRequesterFilter: drain response before mapping to excep…
Browse files Browse the repository at this point in the history
…tion (#2391)

Motivation:

`RetryingHttpRequesterFilter` has `responseMapper` feature that helps
users to translate some responses into `HttpResponseException`. However,
it doesn't drain the response payload body, which will cause connection
leaks.

Modifications:

- Drain response payload body before returning `HttpResponseException`;
- Improve tests to verify connections do not leak;
- Use `assertThrows` instead of `fail` in tests;

Result:

Connections do not leak when `responseMapper` feature is used.
  • Loading branch information
idelpivnitskiy authored Oct 11, 2022
1 parent f927517 commit d9d8e93
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,12 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
final StreamingHttpRequest request) {
Single<StreamingHttpResponse> single = delegate.request(request);
if (responseMapper != null) {
single = single.map(resp -> {
single = single.flatMap(resp -> {
final HttpResponseException exception = responseMapper.apply(resp);
if (exception != null) {
throw exception;
}

return resp;
return exception != null ?
// Drain response payload body before discarding it:
resp.payloadBody().ignoreElements().onErrorComplete().concat(Single.failed(exception)) :
Single.succeeded(resp);
});
}

Expand Down Expand Up @@ -244,7 +243,7 @@ public static class HttpResponseException extends RuntimeException {

private static final long serialVersionUID = -7182949760823647710L;

// FIXME: 0.43 - remove deprecated method
// FIXME: 0.43 - make deprecated field private
/**
* {@link HttpResponseMetaData} of the response that caused this exception.
*
Expand All @@ -253,7 +252,7 @@ public static class HttpResponseException extends RuntimeException {
@Deprecated
public final HttpResponseMetaData metaData;

// FIXME: 0.43 - remove deprecated method
// FIXME: 0.43 - remove deprecated field
/**
* Exception detail message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.HostAndPort;
Expand Down Expand Up @@ -64,7 +67,6 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

class RetryingHttpRequesterFilterTest {

Expand Down Expand Up @@ -123,13 +125,9 @@ void maxTotalRetries() {
failingClient = failingConnClientBuilder
.appendClientFilter(new Builder().maxTotalRetries(1).build())
.buildBlocking();
try {
failingClient.request(failingClient.get("/"));
fail("Request is expected to fail.");
} catch (Exception e) {
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
assertThat("Unexpected calls to select.", lbSelectInvoked.get(), is(2));
}
Exception e = assertThrows(Exception.class, () -> failingClient.request(failingClient.get("/")));
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
assertThat("Unexpected calls to select.", lbSelectInvoked.get(), is(2));
}

@Test
Expand Down Expand Up @@ -157,27 +155,21 @@ void requestRetryingPredicateWithConditionalAppend() {
}

private void assertRequestRetryingPred(final BlockingHttpClient client) {
try {
client.request(client.get("/"));
fail("Request is expected to fail.");
} catch (Exception e) {
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
// Account for LB readiness
assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(1.0, 1.0));
}

try {
client.request(client.get("/retry"));
fail("Request is expected to fail.");
} catch (Exception e) {
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
// 1 Run + 3 Retries + 1 residual count from previous request + account for LB readiness
assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0));
}
Exception e = assertThrows(Exception.class, () -> client.request(client.get("/")));
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
// Account for LB readiness
assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(1.0, 1.0));

e = assertThrows(Exception.class, () -> client.request(client.get("/retry")));
assertThat("Unexpected exception.", e, instanceOf(RetryableException.class));
// 1 Run + 3 Retries + 1 residual count from previous request + account for LB readiness
assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0));
}

@Test
void responseRetryingPredicate() {
void testResponseMapper() {
AtomicInteger newConnectionCreated = new AtomicInteger();
AtomicInteger responseDrained = new AtomicInteger();
normalClient = normalClientBuilder
.appendClientFilter(new Builder()
.responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ?
Expand All @@ -187,15 +179,24 @@ void responseRetryingPredicate() {
// Retry only responses marked so
.retryResponses((requestMetaData, throwable) -> ofImmediate())
.build())
.appendConnectionFilter(c -> {
newConnectionCreated.incrementAndGet();
return new StreamingHttpConnectionFilter(c) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request)
.map(response -> response.transformPayloadBody(payload -> payload
.whenFinally(responseDrained::incrementAndGet)));
}
};
})
.buildBlocking();
try {
normalClient.request(normalClient.get("/"));
fail("Request is expected to fail.");
} catch (Exception e) {
e.printStackTrace();
assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
assertThat("Unexpected calls to select.", lbSelectInvoked.get(), is(4));
}
HttpResponseException e = assertThrows(HttpResponseException.class,
() -> normalClient.request(normalClient.get("/")));
assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
assertThat("Unexpected calls to select.", lbSelectInvoked.get(), is(4));
assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(4));
assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
}

@Test()
Expand Down

0 comments on commit d9d8e93

Please sign in to comment.