Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retrying requests with body contents by resetting the reader index of ByteBuf #1281

Merged
merged 3 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.net.InetAddress;
import java.net.URLDecoder;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -153,13 +152,9 @@ public class ProxyEndpoint extends SyncZuulFilterAdapter<HttpRequestMessage, Htt
protected RequestAttempt currentRequestAttempt;
protected List<RequestStat> requestStats = new ArrayList<>();
protected RequestStat currentRequestStat;
private final byte[] retryBodyCache;

public static final Set<String> IDEMPOTENT_HTTP_METHODS = Sets.newHashSet("GET", "HEAD", "OPTIONS");
private static final DynamicIntegerSetProperty RETRIABLE_STATUSES_FOR_IDEMPOTENT_METHODS = new DynamicIntegerSetProperty("zuul.retry.allowed.statuses.idempotent", "500");
private static final DynamicBooleanProperty ENABLE_CACHING_BODIES = new DynamicBooleanProperty("zuul.cache.bodies", true);
private static final DynamicBooleanProperty ENABLE_CACHING_PLAINTEXT_BODIES =
new DynamicBooleanProperty("zuul.cache.bodies.plaintext", true);

/**
* Indicates how long Zuul should remember throttle events for an origin. As of this writing, throttling is used
Expand All @@ -175,7 +170,6 @@ public class ProxyEndpoint extends SyncZuulFilterAdapter<HttpRequestMessage, Htt
private static final Logger LOG = LoggerFactory.getLogger(ProxyEndpoint.class);
private static final Counter NO_RETRY_INCOMPLETE_BODY = SpectatorUtils.newCounter("zuul.no.retry","incomplete_body");
private static final Counter NO_RETRY_RESP_STARTED = SpectatorUtils.newCounter("zuul.no.retry","resp_started");
private final Counter populatedRetryBody;

public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContext ctx,
final FilterRunner<HttpResponseMessage, ?> filters, MethodBinding<?> methodBinding) {
Expand All @@ -196,11 +190,6 @@ public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContex
chosenServer = new AtomicReference<>(DiscoveryResult.EMPTY);
chosenHostAddr = new AtomicReference<>();

// This must happen after origin is set, since it depends on it.
this.retryBodyCache = preCacheBodyForRetryingRequests();
this.populatedRetryBody = SpectatorUtils.newCounter(
"zuul.populated.retry.body", origin == null ? "null" : origin.getName().getTarget());

this.methodBinding = methodBinding;
this.requestAttemptFactory = requestAttemptFactory;
}
Expand Down Expand Up @@ -522,40 +511,6 @@ private void onOriginConnectFailed(Throwable cause) {
}
}

@Nullable
private byte[] preCacheBodyForRetryingRequests() {
    // Netty's SSL handler clears body ByteBufs, so the body has to be cached if POSTs are to be retried.
    // Most origin connections are expected to be secure, and the plaintext handlers clear the body too,
    // so no assumption is made that plaintext requests are exempt.
    final boolean cachingApplies =
            ENABLE_CACHING_BODIES.get() && origin != null && zuulRequest.hasCompleteBody();
    if (!cachingApplies) {
        return null;
    }

    // Caching the body has a non-trivial CPU cost and, as currently implemented, is only strictly
    // required for SSL. Plaintext origins are therefore only included when the plaintext flag is on.
    final boolean secureOrPlaintextEnabled =
            origin.getClientConfig().get(Keys.IsSecure, false) || ENABLE_CACHING_PLAINTEXT_BODIES.get();
    if (!secureOrPlaintextEnabled) {
        return null;
    }

    final ZonedDateTime throttledAt = origin.stats().lastThrottleEvent();
    if (throttledAt == null) {
        return null;
    }

    // getSeconds() is used instead of toSeconds() because the latter is only available from JDK 9.
    final long secondsSinceThrottle = Duration.between(throttledAt, ZonedDateTime.now()).getSeconds();

    // Only requests whose body is already buffered get this far; hand back that body when the
    // origin throttled recently enough, otherwise cache nothing.
    return secondsSinceThrottle <= THROTTLE_MEMORY_SECONDS.get() ? zuulRequest.getBody() : null;
}

private void repopulateRetryBody() {
    // Nothing to restore when no body was cached or this is still the first attempt.
    if (retryBodyCache == null || attemptNum <= 1) {
        return;
    }

    // The body is not null but is empty: the content chunks still exist, yet they have
    // zero readable bytes left.
    final boolean bodyDrained = zuulRequest.getBodyLength() == 0 && zuulRequest.getBody() != null;
    if (bodyDrained) {
        zuulRequest.setBody(retryBodyCache);
        populatedRetryBody.increment();
    }
}

private void writeClientRequestToOrigin(final PooledConnection conn, Duration readTimeout) {
final Channel ch = conn.getChannel();
passport.setOnChannel(ch);
Expand All @@ -572,9 +527,6 @@ private void writeClientRequestToOrigin(final PooledConnection conn, Duration re
originResponseReceiver = getOriginResponseReceiver();
pipeline.addBefore("connectionPoolHandler", OriginResponseReceiver.CHANNEL_HANDLER_NAME, originResponseReceiver);

// check if body needs to be repopulated for retry
repopulateRetryBody();

ch.write(zuulRequest);
writeBufferedBodyContent(zuulRequest, ch);
ch.flush();
Expand Down Expand Up @@ -855,8 +807,6 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse,
statusCategory = FAILURE_ORIGIN_THROTTLED;
niwsErrorType = ClientException.ErrorType.SERVER_THROTTLED;
obe = new OutboundException(OutboundErrorType.SERVICE_UNAVAILABLE, requestAttempts);
// TODO(carl-mastrangelo): pass in the clock for testing.
origin.stats().lastThrottleEvent(ZonedDateTime.now());
if (originConn != null) {
originConn.getServer().incrementSuccessiveConnectionFailureCount();
originConn.getServer().addToFailureCount();
Expand Down Expand Up @@ -896,6 +846,10 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse,
startedSendingResponseToClient, zuulRequest.hasCompleteBody(), zuulRequest.getMethod());
//detach from current origin.
unlinkFromOrigin();

// ensure body reader indexes are reset so retry is able to access the body buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth clarifying in this comment that we want Netty to have access to that ByteBuf, rather than Zuul itself.

zuulRequest.resetBodyReader();

//retry request with different origin
passport.add(ORIGIN_RETRY_START);
origin.adjustRetryPolicyIfNeeded(zuulRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ public interface ZuulMessage extends Cloneable {
@Nullable
String getBodyAsText();

/**
* Resets the reader indexes of the chunked body buffers. Users SHOULD call this method before retrying
* a request, because the reader indexes of the chunked body buffers will have been changed during
* channel writes.
*/
void resetBodyReader();

/**
* Returns the maximum body size that this message is willing to hold. This value should be more than the
* sum of lengths of the body chunks. The max body size may not be strictly enforced, and is informational.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ public Iterable<HttpContent> getBodyContents() {
return Collections.unmodifiableList(bodyChunks);
}

@Override
public void resetBodyReader() {
for (final HttpContent chunk : bodyChunks) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth only resetting this when we know we have a body, i.e. effectively setBody has been called?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the soft contract, we don't set the hasBody boolean in the setBody method, so I'm not going to check it here: hasBody doesn't currently determine any logic within the message impl.

chunk.content().resetReaderIndex();
}
}

@Override
public boolean finishBufferedBodyIfIncomplete() {
if (! bodyBufferedCompletely) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ public int getBodyLength() {
return message.getBodyLength();
}

/** Delegates the body-reader reset to {@code message}. */
@Override
public void resetBodyReader() {
message.resetBodyReader();
}

@Override
public boolean hasCompleteBody() {
return message.hasCompleteBody();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public Iterable<HttpContent> getBodyContents() {
return message.getBodyContents();
}

/** Delegates the body-reader reset to {@code message}. */
@Override
public void resetBodyReader() {
message.resetBodyReader();
}

@Override
public void runBufferedBodyContentThroughFilter(ZuulFilter filter) {
message.runBufferedBodyContentThroughFilter(filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class BasicNettyOrigin implements NettyOrigin {
private final IClientConfig config;
private final ClientChannelManager clientChannelManager;
private final NettyRequestAttemptFactory requestAttemptFactory;
private final OriginStats stats = new OriginStats();

private final AtomicInteger concurrentRequests;
private final Counter rejectedRequests;
Expand Down Expand Up @@ -203,11 +202,6 @@ public void recordProxyRequestEnd() {
concurrentRequests.decrementAndGet();
}

/** Returns the {@code stats} instance held by this origin. */
@Override
public final OriginStats stats() {
return stats;
}

/* Not required for basic operation */

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,4 @@ public interface InstrumentedOrigin extends Origin {
void recordSuccessResponse();

void recordProxyRequestEnd();

/**
* Returns the mutable origin stats for this origin. Unlike the other methods in this interface,
* external callers are expected to update these numbers, rather than this object itself.
*
* @return the origin stats; this default implementation never returns normally
* @throws UnsupportedOperationException always, in this default implementation
*/
default OriginStats stats() {
throw new UnsupportedOperationException();
}
}
60 changes: 0 additions & 60 deletions zuul-core/src/main/java/com/netflix/zuul/origins/OriginStats.java

This file was deleted.