Skip to content

Commit

Permalink
HttpServerRequest::receiveContent() never emits any value nor complet…
Browse files Browse the repository at this point in the history
…es when HTTP/1.1 TLS Upgrade (RFC-2817) kicks in (#3540)

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Dec 17, 2024
1 parent b35d005 commit 89c4896
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,13 @@ else if (msg instanceof HttpRequest) {
channel().config().setAutoRead(true);
onInboundComplete();
}
else if (request.headers().contains(HttpHeaderNames.UPGRADE)) {
// HTTP/1.1 TLS Upgrade (RFC-2817) requests (GET/HEAD/OPTIONS) with empty / non-empty payload
stopReadTimeout();
//force auto read to enable more accurate close selection now inbound is done
channel().config().setAutoRead(true);
onInboundComplete();
}
}
}
else if (msg instanceof LastHttpContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
Expand All @@ -82,6 +84,7 @@
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
Expand All @@ -101,7 +104,10 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.BaseHttpTest;
Expand Down Expand Up @@ -1662,6 +1668,104 @@ void testIssue632() throws Exception {
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
}

@Test
void testIssue3538() throws Exception {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
assertThat(disposableServer).isNotNull();

final ByteBuf content = createHttpClientForContextWithPort()
.protocol(HttpProtocol.HTTP11)
.headers(h ->
h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
.add(HttpHeaderNames.UPGRADE, "TLS/1.2"))
.get()
.uri("/")
.responseContent()
.blockLast(Duration.ofSeconds(30));

assertThat(content).isNull();
}

@Test
void testIssue3538GetWithPayload() throws Exception {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
assertThat(disposableServer).isNotNull();

// The H2C max content length is 0 by default (no content is expected),
// so the request is rejected with HTTP/413 Content Too Large
StepVerifier.create(createHttpClientForContextWithPort()
.protocol(HttpProtocol.HTTP11)
.headers(h ->
h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
.add(HttpHeaderNames.UPGRADE, "TLS/1.2"))
.request(HttpMethod.GET)
.send((req, res) -> res.sendString(Mono.just("testIssue3538")))
.uri("/")
.response((r, buf) -> Mono.just(r.status().code())))
.expectNextMatches(status -> status == 413)
.expectComplete()
.verify(Duration.ofSeconds(30));
}

@Test
void testIssue3538GetWithPayloadAndH2cMaxContentLength() throws Exception {
disposableServer =
createServer()
.protocol(HttpProtocol.H2C, HttpProtocol.HTTP11)
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(100))
.route(r -> r.get("/", (req, res) -> {
final EchoAction action = new EchoAction();

req
.receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(action);

return res.sendObject(action);
}
))
.bindNow();
assertThat(disposableServer).isNotNull();

final ByteBuf content = createHttpClientForContextWithPort()
.protocol(HttpProtocol.HTTP11)
.headers(h ->
h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
.add(HttpHeaderNames.UPGRADE, "TLS/1.2"))
.request(HttpMethod.GET)
.send((req, res) -> res.sendString(Mono.just("testIssue3538")))
.uri("/")
.responseContent()
.blockLast(Duration.ofSeconds(30));

assertThat(content).isNotNull();
}

@Test
void testIssue694() {
disposableServer =
Expand Down Expand Up @@ -3513,4 +3617,29 @@ void testDeleteMethod(boolean chunked) {
.expectComplete()
.verify(Duration.ofSeconds(5));
}

private static class EchoAction implements Publisher<HttpContent>, Consumer<HttpContent> {
private final Publisher<HttpContent> sender;
private volatile FluxSink<HttpContent> emitter;

EchoAction() {
this.sender = Flux.create(emitter -> this.emitter = emitter);
}

@Override
public void accept(HttpContent message) {
if (message.content().readableBytes() > 0) {
emitter.next(new DefaultHttpContent(message.content().retain()));
}

if (message instanceof LastHttpContent) {
emitter.complete();
}
}

@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);
}
}
}

0 comments on commit 89c4896

Please sign in to comment.