Skip to content

Commit

Permalink
Removed test for send(Flux).
Browse files Browse the repository at this point in the history
Adapted maxSwallowSize, and PAYLOAD size.
Do not configure Tomcat socket SO_RCVBUF size anymore.
Reduce HttpClient SNDBUF.
400 Bad Request is not used chunked encoding.

testing
  • Loading branch information
pderop committed Sep 21, 2023
1 parent 4ffdb9d commit f611bdd
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 48 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/check_transport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ jobs:
distribution: 'temurin'
java-version: '8'
- name: Build with Gradle
run: ./gradlew clean check --no-daemon -x spotlessCheck -PforceTransport=${{ matrix.transport }}
#run: ./gradlew clean check --no-daemon -x spotlessCheck -PforceTransport=${{ matrix.transport }}
run: ./gradlew clean reactor-netty-http:test --tests HttpClientWithTomcatTest.testIssue2825 -i --no-daemon -x spotlessCheck -PforceTransport=${{ matrix.transport }}
11 changes: 5 additions & 6 deletions reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
public class TomcatServer {
static final String TOMCAT_BASE_DIR = "./build/tomcat";
public static final String TOO_LARGE = "Request payload too large";
public static final int PAYLOAD_MAX = 5000000;
public static final String SO_RCVBUF = "socket.rxBufSize"; // The socket receive buffer (SO_RCVBUF) size in bytes. JVM default used if not set
public static final int PAYLOAD_MAX = 4096;

final Tomcat tomcat;

Expand All @@ -58,13 +57,13 @@ public TomcatServer(int port) {
this.tomcat.setBaseDir(baseDir.getAbsolutePath());
}

public void setProtocolHandlerProperty(String name, String value) {
public void setMaxSwallowSize(int bytes) {
ProtocolHandler protoHandler = tomcat.getConnector().getProtocolHandler();
if (!(protoHandler instanceof AbstractProtocol<?>)) {
throw new IllegalStateException("Connection protocol handler is not an instance of AbstractProtocol: " + protoHandler.getClass().getName());
}
AbstractHttp11Protocol<?> protocol = (AbstractHttp11Protocol<?>) protoHandler;
protocol.setProperty(name, value);
protocol.setMaxSwallowSize(bytes);
}

public int port() {
Expand Down Expand Up @@ -187,7 +186,7 @@ static final class PayloadSizeServlet extends HttpServlet {
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException {
InputStream in = req.getInputStream();
int count = 0;
byte[] buf = new byte[4096];
byte[] buf = new byte[32];
int n;

while ((n = in.read(buf, 0, buf.length)) != -1) {
Expand All @@ -207,7 +206,7 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws

private void sendResponse(HttpServletResponse resp, String message, int status) throws IOException {
resp.setStatus(status);
resp.setHeader("Transfer-Encoding", "chunked");
resp.setHeader("Content-Length", String.valueOf(message.length()));
resp.setHeader("Content-Type", "text/plain");
PrintWriter out = resp.getWriter();
out.print(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package reactor.netty.http.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand All @@ -29,12 +29,7 @@
import io.netty.handler.codec.http.multipart.HttpData;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
Expand All @@ -43,7 +38,6 @@
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

import java.io.InputStream;
import java.lang.reflect.Field;
Expand All @@ -61,8 +55,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.http.client.HttpClientOperations.SendForm.DEFAULT_FACTORY;
Expand All @@ -72,8 +64,8 @@
*/
class HttpClientWithTomcatTest {
private static TomcatServer tomcat;
private static final String TOMCAT_RCVBUF_SIZE = String.valueOf(32 * 1024);
private static final byte[] PAYLOAD = String.join("", Collections.nCopies(TomcatServer.PAYLOAD_MAX + (10 * 1024 * 1024), "X"))
private static final int MAX_SWALLOW_SIZE = 1024 * 1024;
private static final byte[] PAYLOAD = String.join("", Collections.nCopies(TomcatServer.PAYLOAD_MAX + MAX_SWALLOW_SIZE + (1024 * 1024), "X"))
.getBytes(Charset.defaultCharset());

@BeforeAll
Expand Down Expand Up @@ -335,44 +327,30 @@ void contentHeader() {
fixed.dispose();
}

static Stream<Arguments> testIssue2825Args() {
Supplier<Publisher<ByteBuf>> postMono = () -> Mono.just(Unpooled.wrappedBuffer(PAYLOAD));
Supplier<Publisher<ByteBuf>> postFlux = () -> Flux.just(Unpooled.wrappedBuffer(PAYLOAD));

return Stream.of(
Arguments.of(Named.of("postMono", postMono), Named.of("bytes", PAYLOAD.length)),
Arguments.of(Named.of("postFlux", postFlux), Named.of("bytes", PAYLOAD.length))
);
}

@ParameterizedTest
@MethodSource("testIssue2825Args")
void testIssue2825(Supplier<Publisher<ByteBuf>> payload, long bytesToSend) {
// manage to get the server reads slow, to avoid having the server closing the connection while the client is writing.
// Even if we cannot totally avoid that situation, reducing the Tomcat server socket receive buffer will reduce the
// probability to have the server closing the connection while the client is writing, hence will reduce the probability
// to get a "Connection has been closed BEFORE response, while sending request body".
tomcat.setProtocolHandlerProperty(TomcatServer.SO_RCVBUF, TOMCAT_RCVBUF_SIZE);
@Test
void testIssue2825() {
tomcat.setMaxSwallowSize(MAX_SWALLOW_SIZE);

AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
HttpClient client = HttpClient.create()
.port(getPort())
.wiretap(false)
.metrics(true, ClientMetricsRecorder::reset)
.doOnConnected(conn -> serverAddress.set(conn.address()));

StepVerifier.create(Flux.defer(() -> client
.headers(hdr -> hdr.set("Content-Type", "text/plain"))
.post()
.uri("/payload-size")
.send(payload.get())
.response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r))))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3))
.filter(throwable -> throwable instanceof PrematureCloseException)))
.doOnConnected(conn -> {
conn.channel().config().setOption(ChannelOption.SO_SNDBUF, 4 * 1024);
serverAddress.set(conn.address());
});

StepVerifier.create(client
.headers(hdr -> hdr.set("Content-Type", "text/plain"))
.post()
.uri("/payload-size")
.send(Mono.just(Unpooled.wrappedBuffer(PAYLOAD)))
.response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r))))
.expectNextMatches(tuple -> TomcatServer.TOO_LARGE.equals(tuple.getT1())
&& tuple.getT2().status().equals(HttpResponseStatus.BAD_REQUEST))
.expectComplete()
.verify(Duration.ofMinutes(1));
.verify(Duration.ofSeconds(30));

assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentTimeMethod).isEqualTo("POST");
assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentTimeTime).isNotNull();
Expand All @@ -382,7 +360,7 @@ void testIssue2825(Supplier<Publisher<ByteBuf>> payload, long bytesToSend) {

assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentRemoteAddr).isEqualTo(serverAddress.get());
assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentUri).isEqualTo("/payload-size");
assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentBytes).isEqualTo(bytesToSend);
assertThat(ClientMetricsRecorder.INSTANCE.recordDataSentBytes).isEqualTo(PAYLOAD.length);
}

private int getPort() {
Expand Down

0 comments on commit f611bdd

Please sign in to comment.