From 89c489679182d3c8887523611ffe1eb6368b58e4 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 17 Dec 2024 03:21:56 -0500 Subject: [PATCH] HttpServerRequest::receiveContent() never emits any value nor completes when HTTP/1.1 TLS Upgrade (RFC-2817) kicks in (#3540) Signed-off-by: Andriy Redko --- .../http/server/HttpServerOperations.java | 7 + .../netty/http/client/HttpClientTest.java | 129 ++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index 790e4083db..a420862efa 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -820,6 +820,13 @@ else if (msg instanceof HttpRequest) { channel().config().setAutoRead(true); onInboundComplete(); } + else if (request.headers().contains(HttpHeaderNames.UPGRADE)) { + // HTTP/1.1 TLS Upgrade (RFC-2817) requests (GET/HEAD/OPTIONS) with empty / non-empty payload + stopReadTimeout(); + //force auto read to enable more accurate close selection now inbound is done + channel().config().setAutoRead(true); + onInboundComplete(); + } } } else if (msg instanceof LastHttpContent) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 21c3e1b693..e48abac5d4 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -72,7 +72,9 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; @@ -82,6 +84,7 @@ import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -101,7 +104,10 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.netty.BaseHttpTest; @@ -1662,6 +1668,104 @@ void testIssue632() throws Exception { assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); } + @Test + void testIssue3538() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + final ByteBuf content = createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .get() + .uri("/") + .responseContent() + .blockLast(Duration.ofSeconds(30)); + + assertThat(content).isNull(); + } + + @Test + void testIssue3538GetWithPayload() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + // The H2C max content length is 0 by default (no content is expected), + // so the request is rejected with HTTP/413 Content Too Large + StepVerifier.create(createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .request(HttpMethod.GET) + .send((req, res) -> res.sendString(Mono.just("testIssue3538"))) + .uri("/") + .response((r, buf) -> Mono.just(r.status().code()))) + .expectNextMatches(status -> status == 413) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + @Test + void testIssue3538GetWithPayloadAndH2cMaxContentLength() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .httpRequestDecoder(spec -> spec.h2cMaxContentLength(100)) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + final ByteBuf content = createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .request(HttpMethod.GET) + .send((req, res) -> res.sendString(Mono.just("testIssue3538"))) + .uri("/") + .responseContent() + .blockLast(Duration.ofSeconds(30)); + + assertThat(content).isNotNull(); + } + @Test void testIssue694() { disposableServer = @@ -3513,4 +3617,29 @@ void testDeleteMethod(boolean chunked) { .expectComplete() .verify(Duration.ofSeconds(5)); } + + private static class EchoAction implements Publisher, Consumer { + private final Publisher sender; + private volatile FluxSink emitter; + + EchoAction() { + this.sender = Flux.create(emitter -> this.emitter = emitter); + } + + @Override + public void accept(HttpContent message) { + if (message.content().readableBytes() > 0) { + emitter.next(new DefaultHttpContent(message.content().retain())); + } + + if (message instanceof LastHttpContent) { + emitter.complete(); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + } }