Skip to content

Commit

Permalink
Add new SENDING_RESPONSE_CLIENT_CLOSED state into HttpPipelineHandler…
Browse files Browse the repository at this point in the history
… FSM (ExpediaGroup#214)

Handles TCP connection closures from connected clients more predictably.
  • Loading branch information
mikkokar authored Jul 13, 2018
1 parent 281d716 commit 16d01cf
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,11 @@ public Builder<S> onStateChange(StateChangeListener<S> stateChangeListener) {
public StateMachine<S> build() {
return new StateMachine<>(initialState, stateEventHandlers, inappropriateEventHandler, stateChangeListener);
}

public Builder<S> debugTransitions(String messagePrefix) {
return this.onStateChange((oldState, newState, event)-> {
LOGGER.info("{} {}: {} -> {}", new Object[] {messagePrefix, event, oldState, newState});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static com.hotels.styx.api.metrics.RequestProgressListener.IGNORE_REQUEST_PROGRESS;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.ACCEPTING_REQUESTS;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.SENDING_RESPONSE;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.SENDING_RESPONSE_CLIENT_CLOSED;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.TERMINATED;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.WAITING_FOR_RESPONSE;
import static com.hotels.styx.server.netty.connectors.ResponseEnhancer.DO_NOT_MODIFY_RESPONSE;
Expand Down Expand Up @@ -145,12 +146,18 @@ private StateMachine<State> createStateMachine() {

.transition(SENDING_RESPONSE, ResponseSentEvent.class, event -> onResponseSent(event.ctx))
.transition(SENDING_RESPONSE, ResponseWriteErrorEvent.class, event -> onResponseWriteError(event.ctx, event.cause))
.transition(SENDING_RESPONSE, ChannelInactiveEvent.class, event -> onChannelInactive())
.transition(SENDING_RESPONSE, ChannelInactiveEvent.class, event -> SENDING_RESPONSE_CLIENT_CLOSED)
.transition(SENDING_RESPONSE, ChannelExceptionEvent.class, event -> onChannelExceptionWhenSendingResponse(event.ctx, event.cause))
.transition(SENDING_RESPONSE, ResponseObservableErrorEvent.class, event -> logError(event.cause))
.transition(SENDING_RESPONSE, ResponseObservableErrorEvent.class, event -> logError(SENDING_RESPONSE, event.cause))
.transition(SENDING_RESPONSE, ResponseObservableCompletedEvent.class, event -> SENDING_RESPONSE)
.transition(SENDING_RESPONSE, RequestReceivedEvent.class, event -> onPrematureRequest(event.request, event.ctx))

.transition(SENDING_RESPONSE_CLIENT_CLOSED, ResponseSentEvent.class, event -> onResponseSentAfterClientClosed(event.ctx))
.transition(SENDING_RESPONSE_CLIENT_CLOSED, ResponseWriteErrorEvent.class, event -> onResponseWriteError(event.ctx, event.cause))
.transition(SENDING_RESPONSE_CLIENT_CLOSED, ChannelExceptionEvent.class, event -> logError(SENDING_RESPONSE_CLIENT_CLOSED, event.cause))
.transition(SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableErrorEvent.class, event -> logError(SENDING_RESPONSE_CLIENT_CLOSED, event.cause))
.transition(SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableCompletedEvent.class, event -> SENDING_RESPONSE_CLIENT_CLOSED)

.transition(TERMINATED, ChannelInactiveEvent.class, event -> TERMINATED)

.onInappropriateEvent((state, event) -> {
Expand All @@ -161,9 +168,9 @@ private StateMachine<State> createStateMachine() {
.build();
}

private State logError(Throwable cause) {
private State logError(State state, Throwable cause) {
httpErrorStatusListener.proxyingFailure(ongoingRequest, ongoingResponse, cause);
return SENDING_RESPONSE;
return state;
}

@VisibleForTesting
Expand All @@ -175,6 +182,7 @@ enum State {
ACCEPTING_REQUESTS,
WAITING_FOR_RESPONSE,
SENDING_RESPONSE,
SENDING_RESPONSE_CLIENT_CLOSED,
TERMINATED
}

Expand Down Expand Up @@ -304,6 +312,13 @@ private State onResponseSent(ChannelHandlerContext ctx) {
}
}

private State onResponseSentAfterClientClosed(ChannelHandlerContext ctx) {
statsSink.onComplete(ongoingRequest.id(), ongoingResponse.status().code());
ongoingRequest = null;
ctx.close();
return TERMINATED;
}

private State onResponseWriteError(ChannelHandlerContext ctx, Throwable cause) {
metrics.counter("requests.cancelled.responseWriteError").inc();
cancelSubscription();
Expand Down Expand Up @@ -350,7 +365,6 @@ private State onChannelExceptionWhenAcceptingRequests(ChannelHandlerContext ctx,
return handleChannelException(ctx, cause);
}


private State handleChannelException(ChannelHandlerContext ctx, Throwable cause) {
if (!isIoException(cause)) {
HttpResponse response = exceptionToResponse(cause, ongoingRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static com.hotels.styx.api.HttpResponse.Builder.response;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.ACCEPTING_REQUESTS;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.SENDING_RESPONSE;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.SENDING_RESPONSE_CLIENT_CLOSED;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.TERMINATED;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.WAITING_FOR_RESPONSE;
import static com.hotels.styx.server.netty.connectors.ResponseEnhancer.DO_NOT_MODIFY_RESPONSE;
Expand Down Expand Up @@ -291,17 +292,71 @@ public void decrementsRequestsOngoingCountOnChannelInactiveWhenRequestIsOngoing(
}

@Test
public void doesNotDecrementRequestsOngoingOnChannelInactiveWhenRequestIsNotOngoing() throws Exception {
HttpPipelineHandler adapter = handlerWithMocks(respondingPipeline)
.responseEnhancer(DO_NOT_MODIFY_RESPONSE)
.build();
public void allowsResponseObservableToCompleteAfterAfterDisconnect() throws Exception {
handler.channelActive(ctx);
handler.channelRead0(ctx, request);
verify(statsCollector).onRequest(eq(request.id()));

adapter.channelActive(ctx);
adapter.channelRead0(ctx, request);
responseObservable.onNext(response);

// When a remote peer (client) has received a response in full,
// and it has closed the TCP connection:
handler.channelInactive(ctx);

// ... only after that the response observable completes:
responseObservable.onCompleted();

// ... then treat it like a successfully sent response:
writerFuture.complete(null);
verify(statsCollector, never()).onTerminate(eq(request.id()));
verify(statsCollector).onComplete(eq(request.id()), eq(200));
}

@Test
public void responseFailureInSendingResponseClientConnectedState() throws Exception {
RuntimeException cause = new RuntimeException("Something went wrong");

handler.channelActive(ctx);
handler.channelRead0(ctx, request);
verify(statsCollector).onRequest(eq(request.id()));

adapter.channelInactive(ctx);
responseObservable.onNext(response);

// Remote peer (client) has closed the connection for whatever reason:
handler.channelInactive(ctx);

// ... the PipelineHandler is now in SENDING_RESPONSE_CLIENT_DISCONNECTED state,
// and response writer indicates a failure:
writerFuture.completeExceptionally(cause);
verify(statsCollector).onTerminate(eq(request.id()));
verify(statsCollector, never()).onComplete(eq(request.id()), eq(200));
assertThat(metrics.counter("outstanding").getCount(), is(0L));
assertThat(metrics.counter("requests.cancelled.responseWriteError").getCount(), is(1L));

assertThat(responseUnsubscribed.get(), is(true));
}

@Test
public void channelExceptionAfterClientClosed() throws Exception {
RuntimeException cause = new RuntimeException("Something went wrong");

handler.channelActive(ctx);
handler.channelRead0(ctx, request);
verify(statsCollector).onRequest(eq(request.id()));

responseObservable.onNext(response);

// Remote peer (client) has closed the connection for whatever reason:
handler.channelInactive(ctx);

// ... but the channel exception occurs while response writer has finished:
handler.exceptionCaught(ctx, cause);

// Then, just log the error,
verify(errorListener).proxyingFailure(eq(request), eq(response), eq(cause));

// and allow response writer event (success/failure) to conclude the state machine cycle.
assertThat(handler.state(), is(SENDING_RESPONSE_CLIENT_CLOSED));
}

@Test
Expand Down Expand Up @@ -752,28 +807,6 @@ public void responseWriteFailsInSendingResponseState() throws Exception {
assertThat(handler.state(), is(TERMINATED));
}

@Test
public void channelInactiveEventInSendingResponseState() throws Exception {
// In Sending Response state,
// The inbound TCP connection gets closes

CompletableFuture<Void> future = new CompletableFuture<>();
HttpPipelineHandler adapter = handlerWithMocks().responseEnhancer(DO_NOT_MODIFY_RESPONSE).responseWriterFactory(responseWriterFactory(future)).build();

adapter.channelActive(ctx);
adapter.channelRead0(ctx, request);
responseObservable.onNext(response);
assertThat(adapter.state(), is(SENDING_RESPONSE));
assertThat(future.isCompletedExceptionally(), is(false));

adapter.channelInactive(ctx);

assertThat(future.isCompletedExceptionally(), is(true));
assertThat(responseUnsubscribed.get(), is(true));
verify(statsCollector).onTerminate(request.id());
assertThat(adapter.state(), is(TERMINATED));
}

@Test
public void ioExceptionInSendingResponseState() throws Exception {
// In Sending Response state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.hotels.styx.api.messages.FullHttpResponse
import com.hotels.styx.api.{HttpClient, HttpRequest, HttpResponse}
import com.hotels.styx.client.HttpConfig.newHttpConfigBuilder
import com.hotels.styx.client.HttpRequestOperationFactory.Builder.httpRequestOperationFactoryBuilder
import com.hotels.styx.client.connectionpool.CloseAfterUseConnectionDestination.Factory
import com.hotels.styx.client.connectionpool.CloseAfterUseConnectionDestination
import com.hotels.styx.client.netty.connectionpool.NettyConnectionFactory
import com.hotels.styx.client.{ConnectionSettings, SimpleNettyHttpClient}
import rx.lang.scala.JavaConversions.toScalaObservable
Expand All @@ -33,7 +33,7 @@ trait StyxClientSupplier {
val FIVE_SECONDS: Int = 5 * 1000

val client: HttpClient = new SimpleNettyHttpClient.Builder()
.connectionDestinationFactory(new Factory()
.connectionDestinationFactory(new CloseAfterUseConnectionDestination.Factory()
.connectionSettings(new ConnectionSettings(1000))
.connectionFactory(new NettyConnectionFactory.Builder()
.name("scalatest-e2e-client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.hotels.styx.proxy

import com.hotels.styx.api.HttpRequest.Builder.get
import com.hotels.styx.api.messages.HttpResponseStatus.OK
import com.hotels.styx.support.configuration.{ConnectionPoolSettings, HttpBackend, Origins}
import com.hotels.styx.support.{NettyOrigins, TestClientSupport}
Expand Down Expand Up @@ -59,7 +60,7 @@ class OriginCancellationMetrics extends FunSpec
HttpHeader(CONTENT_LENGTH, "0")
))

val request = api.HttpRequest.Builder.get(styxServer.routerURL("/OriginCancellationMetrics/1")).build()
val request = get(styxServer.routerURL("/OriginCancellationMetrics/1")).build()
val response = decodedRequest(request)
response.status() should be(OK)

Expand All @@ -73,7 +74,7 @@ class OriginCancellationMetrics extends FunSpec
HttpHeader(CONTENT_LENGTH, "0")
))

val request = api.HttpRequest.Builder.get(styxServer.routerURL("/OriginCancellationMetrics/2")).build()
val request = get(styxServer.routerURL("/OriginCancellationMetrics/2")).build()
val response = decodedRequest(request)
response.status() should be(BAD_GATEWAY)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class OutstandingRequestsSpec extends FunSpec

client.disconnect()

eventually(timeout(2 seconds)) {
// Eventually times out as per responseTimeout configuration:
eventually(timeout(4 seconds)) {
styxServer.metricsSnapshot.count("requests.outstanding").get should be(0)
}
}
Expand Down

0 comments on commit 16d01cf

Please sign in to comment.