Skip to content

Commit

Permalink
Fix H2 Flush Strategy Aggregated API Optimization (#2033)
Browse files Browse the repository at this point in the history
Motivation:
Commit 72a0e64 made it such that
flushOnEnd optimizations weren't always applied when using
HTTP/2 and gRPC. This could cause around a 20% performance regression.

Modifications:
- NettyHttpServer should use SplittingFlushStrategy in all cases.
  This is necessary because the flush strategy needs to be applied
  in a lazy fashion after the write starts, which is a capability
  provided by SplittingFlushStrategy. The terminal conditions are
  also commonly enforced by SplittingFlushStrategy and should be
  reused.
- SplittingFlushStrategy should handle exceptions from the flush
  boundaries. This may happen if header parsing throws (e.g. a duplicate
  content-length header).
- While aggregating an in memory request track the content-length
  to avoid multiple iterations.

Result:
20% rps performance improvement for aggregated h2/gRPC use cases.
  • Loading branch information
Scottmitch committed Dec 21, 2021
1 parent d925eae commit 30550d5
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

import static io.servicetalk.buffer.api.CharSequences.parseLong;
Expand Down Expand Up @@ -204,6 +203,26 @@ static Publisher<Object> flatEmptyMessage(final HttpProtocolVersion protocolVers
flatMessage.concat(messageBody.ignoreElements());
}

/**
 * An {@link ArrayList} that additionally tracks the cumulative readable bytes of the
 * {@link Buffer} items accumulated into it, so the aggregated content-length can be read
 * directly instead of re-iterating the list after aggregation completes.
 */
private static final class ContentLengthList<T> extends ArrayList<T> {
    // Running total of readable bytes for all Buffer items added so far. Package-visible and
    // mutated directly by the enclosing aggregation logic as items are appended.
    int contentLength;

    /**
     * @param contentLength initial byte count (e.g. readable bytes of the first aggregated item,
     *                      or 0 if that item is not a {@link Buffer}).
     * @param arraySize initial capacity passed through to the backing {@link ArrayList}.
     */
    ContentLengthList(int contentLength, int arraySize) {
        super(arraySize);
        this.contentLength = contentLength;
    }

    @Override
    public int hashCode() {
        // Mix the tracked length with the element-based hash from ArrayList so that
        // equals/hashCode stay consistent with each other.
        return 31 * contentLength + super.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        // NOTE(review): requiring ContentLengthList here breaks equals() symmetry with plain
        // Lists — ArrayList.equals accepts any List with equal elements, so a regular List may
        // report equal to this while this reports unequal to it. Presumably acceptable for this
        // private accumulator, which appears never to be compared against foreign lists — confirm.
        return o instanceof ContentLengthList && ((ContentLengthList<?>) o).contentLength == contentLength &&
                super.equals(o);
    }
}

private static Publisher<Object> setContentLength(final HttpMetaData metadata,
final Publisher<Object> messageBody,
final BiIntConsumer<HttpHeaders> contentLengthUpdater,
Expand All @@ -217,16 +236,21 @@ private static Publisher<Object> setContentLength(final HttpMetaData metadata,
// avoid allocating a list if the Publisher emits only a single Buffer
return item;
}
List<Object> items;
if (reduction instanceof List) {
final ContentLengthList<Object> items;
if (reduction instanceof ContentLengthList) {
@SuppressWarnings("unchecked")
List<Object> itemsUnchecked = (List<Object>) reduction;
ContentLengthList<Object> itemsUnchecked = (ContentLengthList<Object>) reduction;
items = itemsUnchecked;
} else {
// this method is called if the payload has been aggregated, we expect <buffer*, trailers?>.
items = new ArrayList<>(2);
items = new ContentLengthList<>(
reduction instanceof Buffer ? ((Buffer) reduction).readableBytes() : 0, 2);
items.add(reduction);
}
if (item instanceof Buffer) {
items.contentLength += ((Buffer) item).readableBytes();
}

items.add(item);
return items;
}).flatMapPublisher(reduction -> {
Expand All @@ -240,20 +264,16 @@ private static Publisher<Object> setContentLength(final HttpMetaData metadata,
} else if (reduction instanceof Buffer) {
final Buffer buffer = (Buffer) reduction;
contentLength = buffer.readableBytes();
if (appendTrailers) {
flatRequest = contentLength != 0 ? from(metadata, buffer, EmptyHttpHeaders.INSTANCE) :
from(metadata, EmptyHttpHeaders.INSTANCE);
if (contentLength == 0) {
flatRequest = appendTrailers ? from(metadata, EmptyHttpHeaders.INSTANCE) : from(metadata);
} else {
flatRequest = contentLength != 0 ? from(metadata, buffer) : from(metadata);
flatRequest = appendTrailers ? from(metadata, buffer, EmptyHttpHeaders.INSTANCE) :
from(metadata, buffer);
}
} else if (reduction instanceof List) {
} else if (reduction instanceof ContentLengthList) {
@SuppressWarnings("unchecked")
final List<Object> items = (List<Object>) reduction;
for (Object item : items) {
if (item instanceof Buffer) {
contentLength += ((Buffer) item).readableBytes();
}
}
final ContentLengthList<Object> items = (ContentLengthList<Object>) reduction;
contentLength = items.contentLength;
if (appendTrailers && !(items.get(items.size() - 1) instanceof HttpHeaders)) {
items.add(EmptyHttpHeaders.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import static io.servicetalk.transport.netty.internal.SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.Start;

final class NettyHttpServer {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyHttpServer.class);

private NettyHttpServer() {
Expand Down Expand Up @@ -249,8 +248,7 @@ static final class NettyHttpServerConnection extends HttpServiceContext implemen
private final NettyConnection<Object, Object> connection;
private final HttpHeadersFactory headersFactory;
private final HttpExecutionContext executionContext;
@Nullable
private final SplittingFlushStrategy splittingFlushStrategy;
private final SplittingFlushStrategy flushStrategy;
private final boolean drainRequestPayloadBody;
private final boolean requireTrailerHeader;

Expand All @@ -273,41 +271,39 @@ static final class NettyHttpServerConnection extends HttpServiceContext implemen
connection.executionContext().ioExecutor(), connection.executionContext().executor(),
HttpExecutionStrategies.offloadNever());
this.service = service;
// H2 uses child channels, doesn't support pipelining, and doesn't repeat the write operation on the same
// channel. We therefore don't need the splitting flush in this case.
if (protocol().major() <= 1) {
this.splittingFlushStrategy = new SplittingFlushStrategy(connection.defaultFlushStrategy(),
new FlushBoundaryProvider() {
private long contentLength;
@Override
public FlushBoundary detectBoundary(@Nullable final Object itemWritten) {
if (itemWritten instanceof HttpResponseMetaData) {
final HttpResponseMetaData metadata = (HttpResponseMetaData) itemWritten;
contentLength = getContentLength(metadata);
// The content length maybe unknown at this point (e.g. 204 response) but then later
// determined to be 0. In that case we should conservatively use End and rely upon
// adjustForMissingBoundaries to accommodate if more data comes. Otherwise the
// FlushStrategy may not trigger a flush (e.g. flushOnEnd) and if so the response
// won't actually be written.
return contentLength > 0 ||
(contentLength < 0 && isTransferEncodingChunked(metadata.headers())) ?
Start : End;
}
if (itemWritten instanceof Buffer) {
return contentLength > 0 &&
(contentLength -= ((Buffer) itemWritten).readableBytes()) <= 0 ?
End : InProgress;
}
if (itemWritten instanceof HttpHeaders) {
return End;
}
return InProgress;
flushStrategy = new SplittingFlushStrategy(connection.defaultFlushStrategy(),
// h2 may return a single HttpResponseMetaData for an empty response in some scenarios,
// otherwise a trailers object will be included to indicate the end because content-length isn't
// mutually exclusive from trailers in h2.
protocol().major() > 1 ?
itemWritten -> (itemWritten instanceof HttpResponseMetaData &&
emptyMessageBody((HttpResponseMetaData) itemWritten)) ||
itemWritten instanceof HttpHeaders ? End : InProgress :
new FlushBoundaryProvider() {
private long contentLength;
@Override
public FlushBoundary detectBoundary(@Nullable final Object itemWritten) {
if (itemWritten instanceof HttpResponseMetaData) {
final HttpResponseMetaData metadata = (HttpResponseMetaData) itemWritten;
contentLength = getContentLength(metadata);
// The content length maybe unknown at this point (e.g. 204 response) but then later
// determined to be 0. In that case we should conservatively use End and rely upon
// adjustForMissingBoundaries to accommodate if more data comes. Otherwise the
// FlushStrategy may not trigger a flush (e.g. flushOnEnd) and if so the response
// won't actually be written.
return contentLength > 0 || (contentLength < 0 &&
(!emptyMessageBody(metadata) &&
isTransferEncodingChunked(metadata.headers()))) ? Start : End;
}
});
connection.updateFlushStrategy((current, isCurrentOriginal) -> splittingFlushStrategy);
} else {
this.splittingFlushStrategy = null;
}
if (itemWritten instanceof Buffer) {
return contentLength > 0 &&
(contentLength -= ((Buffer) itemWritten).readableBytes()) <= 0 ?
End : InProgress;
}
return itemWritten instanceof HttpHeaders ? End : InProgress;
}
});
connection.updateFlushStrategy((current, isCurrentOriginal) -> flushStrategy);
this.drainRequestPayloadBody = drainRequestPayloadBody;
this.requireTrailerHeader = requireTrailerHeader;
}
Expand All @@ -325,8 +321,7 @@ void process(final boolean handleMultipleRequests) {

@Override
public Cancellable updateFlushStrategy(final FlushStrategyProvider strategyProvider) {
return splittingFlushStrategy == null ? connection.updateFlushStrategy(strategyProvider) :
splittingFlushStrategy.updateFlushStrategy(strategyProvider);
return flushStrategy.updateFlushStrategy(strategyProvider);
}

@Override
Expand Down Expand Up @@ -390,22 +385,19 @@ public void onComplete() {
// adjustForMissingBoundaries to accommodate for missing End boundaries, so just flush on
// each. SplittingFlushStrategy should be removed when NettyHttpServer writes per request
// instead of a single stream with repeat() operator, and this code can also be removed.
if (isHeadRequest && splittingFlushStrategy != null) {
splittingFlushStrategy.updateFlushStrategy(
Cancellable c = null;
if (isHeadRequest) {
flushStrategy.updateFlushStrategy(
(prev, isOriginal) -> isOriginal ? flushOnEach() : prev, 1);
} else {
final FlushStrategy flushStrategy = determineFlushStrategyForApi(response);
if (flushStrategy != null) {
final FlushStrategyProvider provider = (prev, isOriginal) ->
isOriginal ? flushStrategy : prev;
if (splittingFlushStrategy != null) {
splittingFlushStrategy.updateFlushStrategy(provider, 1);
} else {
updateFlushStrategy(provider);
}
c = updateFlushStrategy((prev, isOriginal) -> isOriginal ? flushStrategy : prev);
}
}
return handleResponse(protocol(), requestMethod, response);

Publisher<Object> pub = handleResponse(protocol(), requestMethod, response);
return c == null ? pub : pub.beforeFinally(c::cancel);
});

if (drainRequestPayloadBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void updateFlushStrategy(FlushStrategyProvider strategyProvider, int boun
*/
@FunctionalInterface
public interface FlushBoundaryProvider {

/**
* An enumeration for boundary of flushes on which this {@link SplittingFlushStrategy} splits writes.
*/
Expand All @@ -115,7 +114,7 @@ enum FlushBoundary {
* Detect the {@link FlushBoundary} for the passed {@code itemWritten}.
*
* @param itemWritten Item written which determines the {@link FlushBoundary}.
* @return {@link FlushBoundary} representing the passed {@code itemWritten}.
* @return {@link FlushBoundary} representing the passed {@code itemWritten}.
*/
FlushBoundary detectBoundary(@Nullable Object itemWritten);
}
Expand Down Expand Up @@ -150,7 +149,14 @@ public void writeStarted() {

@Override
public void itemWritten(@Nullable final Object written) {
FlushBoundary boundary = flushBoundaryProvider.detectBoundary(written);
FlushBoundary boundary;
try {
boundary = flushBoundaryProvider.detectBoundary(written);
} catch (Throwable cause) {
// Exceptions are not supported, consider this a boundary to force a flush. This may happen if there are
// multiple content-length headers which haven't been caught by other validation yet.
boundary = End;
}
adjustForMissingBoundaries(boundary);
previousBoundary = boundary;
switch (boundary) {
Expand Down

0 comments on commit 30550d5

Please sign in to comment.