Skip to content

Commit

Permalink
Merge pull request #1353 from Netflix/finish-outfilters
Browse files Browse the repository at this point in the history
Ensure response filters are not left waiting for buffered body
  • Loading branch information
artgon authored Oct 21, 2022
2 parents 9de28bb + 8a8868c commit 9e7c086
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.netflix.zuul.message.http.HttpRequestInfo;
import com.netflix.zuul.message.http.HttpRequestMessage;
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.netty.SpectatorUtils;
import com.netflix.zuul.netty.server.MethodBinding;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
Expand Down Expand Up @@ -115,8 +116,8 @@ protected final AtomicInteger getRunningFilterIndex(I zuulMesg) {
return (AtomicInteger) Preconditions.checkNotNull(ctx.get(RUNNING_FILTER_IDX_SESSION_CTX_KEY), "runningFilterIndex");
}

protected final boolean isFilterAwaitingBody(I zuulMesg) {
return zuulMesg.getContext().containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
protected final boolean isFilterAwaitingBody(SessionContext context) {
return context.containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
}

protected final void setFilterAwaitingBody(I zuulMesg, boolean flag) {
Expand All @@ -140,7 +141,15 @@ protected final void invokeNextStage(final O zuulMesg, final HttpContent chunk)
try (TaskCloseable ignored =
traceTask(this, s -> s.getClass().getSimpleName() + ".fireChannelReadChunk")) {
addPerfMarkTags(zuulMesg);
getChannelHandlerContext(zuulMesg).fireChannelRead(chunk);
ChannelHandlerContext channelHandlerContext = getChannelHandlerContext(zuulMesg);
if (!channelHandlerContext.channel().isActive()) {
zuulMesg.getContext().cancel();
zuulMesg.disposeBufferedBody();
SpectatorUtils.newCounter("zuul.filterChain.chunk.hanging",
zuulMesg.getClass().getSimpleName()).increment();
} else {
channelHandlerContext.fireChannelRead(chunk);
}
}
}
}
Expand All @@ -157,7 +166,16 @@ protected final void invokeNextStage(final O zuulMesg) {
try (TaskCloseable ignored =
traceTask(this, s -> s.getClass().getSimpleName() + ".fireChannelRead")) {
addPerfMarkTags(zuulMesg);
getChannelHandlerContext(zuulMesg).fireChannelRead(zuulMesg);
ChannelHandlerContext channelHandlerContext = getChannelHandlerContext(zuulMesg);
if (!channelHandlerContext.channel().isActive()) {
zuulMesg.getContext().cancel();
zuulMesg.disposeBufferedBody();
SpectatorUtils.newCounter("zuul.filterChain.message.hanging",
zuulMesg.getClass().getSimpleName()).increment();
}
else {
channelHandlerContext.fireChannelRead(zuulMesg);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void filter(final HttpRequestMessage zuulReq, final HttpContent chunk) {
chunk.release();
}

if (isFilterAwaitingBody(zuulReq) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
if (isFilterAwaitingBody(zuulReq.getContext()) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
//whole body has arrived, resume filter chain
ByteBufUtil.touch(newChunk, "Endpoint body complete, resume chain, ZuulMessage: ", zuulReq);
invokeNextStage(filter(endpoint, zuulReq));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import static com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteReason.SESSION_COMPLETE;
import static com.netflix.zuul.context.CommonContextKeys.NETTY_SERVER_CHANNEL_HANDLER_CONTEXT;
import static com.netflix.zuul.netty.server.ClientRequestReceiver.ATTR_ZUUL_RESP;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_CANCELLED;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_TIMEOUT;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL_IDLE_TIMEOUT;

import com.google.common.base.Preconditions;
import com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteEvent;
import com.netflix.netty.common.HttpRequestReadTimeoutEvent;
Expand All @@ -34,11 +34,13 @@
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.message.http.HttpResponseMessageImpl;
import com.netflix.zuul.netty.RequestCancelledEvent;
import com.netflix.zuul.netty.SpectatorUtils;
import com.netflix.zuul.stats.status.StatusCategory;
import com.netflix.zuul.stats.status.StatusCategoryUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.unix.Errors;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -88,7 +90,7 @@ else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof CompleteEvent) {
final CompleteEvent completeEvent = (CompleteEvent)evt;
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE);
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE, ctx);
}
else if (evt instanceof HttpRequestReadTimeoutEvent) {
sendResponse(FAILURE_CLIENT_TIMEOUT, 408, ctx);
Expand All @@ -101,7 +103,7 @@ else if (evt instanceof RequestCancelledEvent) {
zuulRequest.getContext().cancel();
StatusCategoryUtils.storeStatusCategoryIfNotAlreadyFailure(zuulRequest.getContext(), FAILURE_CLIENT_CANCELLED);
}
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
super.userEventTriggered(ctx, evt);
Expand All @@ -121,15 +123,18 @@ private void sendResponse(final StatusCategory statusCategory, final int status,
headers.add("Content-Length", "0");
zuulResponse.finishBufferedBodyIfIncomplete();
responseFilterChain.filter(zuulResponse);
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
}
}

protected HttpRequestMessage getZuulRequest() {
return zuulRequest;
}

protected void fireEndpointFinish(final boolean error) {
protected void fireEndpointFinish(final boolean error, final ChannelHandlerContext ctx) {
// make sure filter chain is not left hanging
finishResponseFilters(ctx);

final ZuulFilter endpoint = ZuulEndPointRunner.getEndpoint(zuulRequest);
if (endpoint instanceof ProxyEndpoint) {
final ProxyEndpoint edgeProxyEndpoint = (ProxyEndpoint) endpoint;
Expand All @@ -138,6 +143,19 @@ protected void fireEndpointFinish(final boolean error) {
zuulRequest = null;
}

private void finishResponseFilters(ChannelHandlerContext ctx) {
// check if there are any response filters awaiting a buffered body
if (zuulRequest != null && responseFilterChain.isFilterAwaitingBody(zuulRequest.getContext())) {
HttpResponseMessage zuulResponse = ctx.channel().attr(ATTR_ZUUL_RESP).get();
if (zuulResponse != null) {
// fire a last content into the filter chain to unblock any filters awaiting a buffered body
responseFilterChain.filter(zuulResponse, new DefaultLastHttpContent());
SpectatorUtils.newCounter("zuul.filterChain.bodyBuffer.hanging",
zuulRequest.getContext().getRouteVIP()).increment();
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("zuul filter chain handler caught exception. cause=" + String.valueOf(cause), cause);
Expand All @@ -147,7 +165,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
zuulCtx.setShouldSendErrorResponse(true);
sendResponse(FAILURE_LOCAL, 500, ctx);
} else {
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void filter(T inMesg, HttpContent chunk) {
ByteBufUtil.touch(chunk, "Filter runner buffering chunk, message: ", inMesg);
inMesg.bufferBodyContents(chunk);

boolean isAwaitingBody = isFilterAwaitingBody(inMesg);
boolean isAwaitingBody = isFilterAwaitingBody(inMesg.getContext());

// Record passport states for start and end of buffering bodies.
if (isAwaitingBody) {
Expand Down

0 comments on commit 9e7c086

Please sign in to comment.