Skip to content

Commit

Permalink
Ensure response filters are not left waiting for buffered body
Browse files Browse the repository at this point in the history
  • Loading branch information
artgon committed Oct 20, 2022
1 parent ef2934e commit 3218584
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ protected final AtomicInteger getRunningFilterIndex(I zuulMesg) {
return (AtomicInteger) Preconditions.checkNotNull(ctx.get(RUNNING_FILTER_IDX_SESSION_CTX_KEY), "runningFilterIndex");
}

protected final boolean isFilterAwaitingBody(I zuulMesg) {
return zuulMesg.getContext().containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
protected final boolean isFilterAwaitingBody(SessionContext context) {
return context.containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
}

protected final void setFilterAwaitingBody(I zuulMesg, boolean flag) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void filter(final HttpRequestMessage zuulReq, final HttpContent chunk) {
chunk.release();
}

if (isFilterAwaitingBody(zuulReq) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
if (isFilterAwaitingBody(zuulReq.getContext()) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
//whole body has arrived, resume filter chain
newChunk.touch("Endpoint body complete, resume chain, ZuulMessage: " + zuulReq);
invokeNextStage(filter(endpoint, zuulReq));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import static com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteReason.SESSION_COMPLETE;
import static com.netflix.zuul.context.CommonContextKeys.NETTY_SERVER_CHANNEL_HANDLER_CONTEXT;
import static com.netflix.zuul.netty.server.ClientRequestReceiver.ATTR_ZUUL_RESP;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_CANCELLED;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_TIMEOUT;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL_IDLE_TIMEOUT;

import com.google.common.base.Preconditions;
import com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteEvent;
import com.netflix.netty.common.HttpRequestReadTimeoutEvent;
Expand All @@ -39,6 +39,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.unix.Errors;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -88,7 +89,7 @@ else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof CompleteEvent) {
final CompleteEvent completeEvent = (CompleteEvent)evt;
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE);
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE, ctx);
}
else if (evt instanceof HttpRequestReadTimeoutEvent) {
sendResponse(FAILURE_CLIENT_TIMEOUT, 408, ctx);
Expand All @@ -101,7 +102,7 @@ else if (evt instanceof RequestCancelledEvent) {
zuulRequest.getContext().cancel();
StatusCategoryUtils.storeStatusCategoryIfNotAlreadyFailure(zuulRequest.getContext(), FAILURE_CLIENT_CANCELLED);
}
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
super.userEventTriggered(ctx, evt);
Expand All @@ -121,15 +122,18 @@ private void sendResponse(final StatusCategory statusCategory, final int status,
headers.add("Content-Length", "0");
zuulResponse.finishBufferedBodyIfIncomplete();
responseFilterChain.filter(zuulResponse);
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
}
}

protected HttpRequestMessage getZuulRequest() {
return zuulRequest;
}

protected void fireEndpointFinish(final boolean error) {
protected void fireEndpointFinish(final boolean error, final ChannelHandlerContext ctx) {
// make sure filter chain is not left hanging
finishResponseFilters(ctx);

final ZuulFilter endpoint = ZuulEndPointRunner.getEndpoint(zuulRequest);
if (endpoint instanceof ProxyEndpoint) {
final ProxyEndpoint edgeProxyEndpoint = (ProxyEndpoint) endpoint;
Expand All @@ -138,6 +142,17 @@ protected void fireEndpointFinish(final boolean error) {
zuulRequest = null;
}

private void finishResponseFilters(ChannelHandlerContext ctx) {
// check if there are any response filters awaiting a buffered body
if (zuulRequest != null && responseFilterChain.isFilterAwaitingBody(zuulRequest.getContext())) {
HttpResponseMessage zuulResponse = ctx.channel().attr(ATTR_ZUUL_RESP).get();
if (zuulResponse != null) {
// fire a last content into the filter chain to unblock any filters awaiting a buffered body
responseFilterChain.filter(zuulResponse, new DefaultLastHttpContent());
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("zuul filter chain handler caught exception. cause=" + String.valueOf(cause), cause);
Expand All @@ -147,7 +162,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
zuulCtx.setShouldSendErrorResponse(true);
sendResponse(FAILURE_LOCAL, 500, ctx);
} else {
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void filter(T inMesg, HttpContent chunk) {
chunk.touch("Filter runner buffering chunk, message: " + inMesg);
inMesg.bufferBodyContents(chunk);

boolean isAwaitingBody = isFilterAwaitingBody(inMesg);
boolean isAwaitingBody = isFilterAwaitingBody(inMesg.getContext());

// Record passport states for start and end of buffering bodies.
if (isAwaitingBody) {
Expand Down

0 comments on commit 3218584

Please sign in to comment.