Skip to content

Commit

Permalink
Netty Expect 100-Continue client: chunked/buffered input (with example)
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <[email protected]>
  • Loading branch information
senivam committed Feb 7, 2025
1 parent 382f69e commit cbe8e84
Show file tree
Hide file tree
Showing 10 changed files with 977 additions and 220 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,111 +16,130 @@

package org.glassfish.jersey.netty.connector;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import org.glassfish.jersey.client.ClientRequest;
import io.netty.handler.codec.http.LastHttpContent;

import javax.ws.rs.ProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {

private boolean isExpected;
private ExpectationState currentState = ExpectationState.IDLE;

private static final List<HttpResponseStatus> statusesToBeConsidered = Arrays.asList(HttpResponseStatus.CONTINUE,
HttpResponseStatus.UNAUTHORIZED, HttpResponseStatus.EXPECTATION_FAILED,
HttpResponseStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
private static final List<HttpResponseStatus> finalErrorStatuses = Arrays.asList(HttpResponseStatus.UNAUTHORIZED,
HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
private static final List<HttpResponseStatus> reSendErrorStatuses = Arrays.asList(
HttpResponseStatus.METHOD_NOT_ALLOWED,
HttpResponseStatus.EXPECTATION_FAILED);

private CompletableFuture<HttpResponseStatus> expectedFuture = new CompletableFuture<>();
private static final List<HttpResponseStatus> errorStatuses = new ArrayList<>(finalErrorStatuses);
private static final List<HttpResponseStatus> statusesToBeConsidered = new ArrayList<>(reSendErrorStatuses);

static {
errorStatuses.addAll(reSendErrorStatuses);
statusesToBeConsidered.addAll(finalErrorStatuses);
statusesToBeConsidered.add(HttpResponseStatus.CONTINUE);
}

private HttpResponseStatus status = null;

private CountDownLatch latch = null;

private boolean propagateLastMessage = false;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isExpected && msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
if (statusesToBeConsidered.contains(response.status())) {
expectedFuture.complete(response.status());
}
if (!HttpResponseStatus.CONTINUE.equals(response.status())) {

if (checkExpectResponse(msg)) {
currentState = ExpectationState.AWAITING;
}
switch (currentState) {
case AWAITING:
final HttpResponse response = (HttpResponse) msg;
status = response.status();

boolean handshakeDone = processErrorStatuses(status, ctx) || msg instanceof FullHttpMessage;
currentState = (handshakeDone) ? ExpectationState.IDLE : ExpectationState.FINISHING;
processLatch();
return;
case FINISHING:
if (msg instanceof LastHttpContent) {
currentState = ExpectationState.IDLE;
if (propagateLastMessage) {
propagateLastMessage = false;
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
return;
default:
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
} else {
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
}
} else {
if (!isExpected) {
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
}
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
}
}

CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpRequest nettyRequest,
ClientRequest jerseyRequest,
Channel ch,
Integer timeout)
throws InterruptedException, ExecutionException, TimeoutException {
//check for 100-Continue presence/availability
final Expect100ContinueConnectorExtension expect100ContinueExtension
= new Expect100ContinueConnectorExtension();

final DefaultFullHttpRequest nettyRequestHeaders =
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
nettyRequestHeaders.headers().setAll(nettyRequest.headers());

if (!nettyRequestHeaders.headers().contains(HttpHeaderNames.HOST)) {
nettyRequestHeaders.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
private boolean checkExpectResponse(Object msg) {
if (currentState == ExpectationState.IDLE && latch != null && msg instanceof HttpResponse) {
return statusesToBeConsidered.contains(((HttpResponse) msg).status());
}
return false;
}

//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
//enriched with the 'Expect:100-continue' header.
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);

final ChannelFuture expect100ContinueFuture = (HttpUtil.is100ContinueExpected(nettyRequestHeaders))
// Send only head of the HTTP request enriched with Expect:100-continue header.
? ch.writeAndFlush(nettyRequestHeaders)
// Expect:100-Continue either is not supported or is turned off
: null;
isExpected = expect100ContinueFuture != null;
if (!isExpected) {
ch.pipeline().remove(JerseyExpectContinueHandler.class);
} else {
final HttpResponseStatus status = expectedFuture
.get(timeout, TimeUnit.MILLISECONDS);

processExpectationStatus(status);
boolean processErrorStatuses(HttpResponseStatus status, ChannelHandlerContext ctx)
throws InterruptedException {
if (reSendErrorStatuses.contains(status)) {
propagateLastMessage = true;
}
return expectedFuture;
return (finalErrorStatuses.contains(status));
}

private void processExpectationStatus(HttpResponseStatus status)
throws TimeoutException {
boolean processExpectationStatus()
throws TimeoutException, IOException {
if (status == null) {
throw new TimeoutException(); // continue without expectations
}
if (!statusesToBeConsidered.contains(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}
if (!expectedFuture.isDone() || HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
isExpected = false;
throw new TimeoutException(); // continue without expectations

if (finalErrorStatuses.contains(status)) {
throw new IOException(LocalizationMessages
.EXPECT_100_CONTINUE_FAILED_REQUEST_FAILED(), null);
}
if (!HttpResponseStatus.CONTINUE.equals(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);

if (reSendErrorStatuses.contains(status)) {
throw new TimeoutException(LocalizationMessages
.EXPECT_100_CONTINUE_FAILED_REQUEST_SHOULD_BE_RESENT()); // Re-send request without expectations
}

return true;
}

void resetHandler() {
latch = null;
}

void attachCountDownLatch(CountDownLatch latch) {
this.latch = latch;
}

private void processLatch() {
if (latch != null) {
latch.countDown();
}
}

boolean isExpected() {
return isExpected;
private enum ExpectationState {
AWAITING,
FINISHING,
IDLE
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,6 +68,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig;
Expand Down Expand Up @@ -256,6 +256,8 @@ protected void execute(final ClientRequest jerseyRequest, final Set<URI> redirec
}
}

final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();

if (chan == null) {
Integer connectTimeout = jerseyRequest.resolveProperty(ClientProperties.CONNECT_TIMEOUT, 0);
Bootstrap b = new Bootstrap();
Expand Down Expand Up @@ -328,8 +330,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
final Integer maxInitialLineLength = ClientProperties.getValue(config.getProperties(),
NettyClientProperties.MAX_INITIAL_LINE_LENGTH,
NettyClientProperties.DEFAULT_INITIAL_LINE_LENGTH);

p.addLast(new HttpClientCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize));
p.addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpContentDecompressor());
}
Expand Down Expand Up @@ -358,11 +360,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
final Channel ch = chan;
JerseyClientHandler clientHandler =
new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone, redirectUriHistory, this);
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();

// read timeout makes sense really as an inactivity timeout
ch.pipeline().addLast(READ_TIMEOUT_HANDLER,
new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS));
ch.pipeline().addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
ch.pipeline().addLast(REQUEST_HANDLER, clientHandler);

responseDone.whenComplete((_r, th) -> {
Expand Down Expand Up @@ -445,22 +446,11 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
// // Set later after the entity is "written"
// break;
}
try {
expect100ContinueHandler.processExpect100ContinueRequest(nettyRequest, jerseyRequest,
ch, expect100ContinueTimeout);
} catch (ExecutionException e) {
responseDone.completeExceptionally(e);
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
//just removing the pipeline from processing
if (ch.pipeline().context(JerseyExpectContinueHandler.class) != null) {
ch.pipeline().remove(EXPECT_100_CONTINUE_HANDLER);
}
}

final CountDownLatch headersSet = new CountDownLatch(1);
final CountDownLatch contentLengthSet = new CountDownLatch(1);


jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
Expand All @@ -485,7 +475,6 @@ public void run() {

try {
jerseyRequest.writeEntity();

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
contentLengthSet.countDown();
Expand All @@ -505,12 +494,36 @@ public void run() {
});

headersSet.await();
if (!expect100ContinueHandler.isExpected()) {
// Send the HTTP request. Expect:100-continue processing is not applicable
// in this case.
new Expect100ContinueConnectorExtension().invoke(jerseyRequest, nettyRequest);

boolean continueExpected = HttpUtil.is100ContinueExpected(nettyRequest);
boolean expectationsFailed = false;

if (continueExpected) {
final CountDownLatch expect100ContinueLatch = new CountDownLatch(1);
expect100ContinueHandler.attachCountDownLatch(expect100ContinueLatch);
//send expect request, sync and wait till either response or timeout received
entityWriter.writeAndFlush(nettyRequest);
expect100ContinueLatch.await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
try {
expect100ContinueHandler.processExpectationStatus();
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
//so, send request directly without Expect header.
expectationsFailed = true;
} finally {
//restore request and handler to the original state.
HttpUtil.set100ContinueExpected(nettyRequest, false);
expect100ContinueHandler.resetHandler();
}
}

if (!continueExpected || expectationsFailed) {
if (expectationsFailed) {
ch.pipeline().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).sync();
}
entityWriter.writeAndFlush(nettyRequest);
}
if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2025 Oracle and/or its affiliates. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -23,3 +23,5 @@ redirect.error.determining.location="Error determining redirect location: ({0}).
redirect.infinite.loop="Infinite loop in chained redirects detected."
redirect.limit.reached="Max chained redirect limit ({0}) exceeded."
unexpected.value.for.expect.100.continue.statuses=Unexpected value: ("{0}").
expect.100.continue.failed.request.should.be.resent=Expect 100-continue failed. Request should be resent.
expect.100.continue.failed.request.failed=Expect 100-continue failed. Request failed.
Loading

0 comments on commit cbe8e84

Please sign in to comment.