Skip to content

Commit

Permalink
Merge pull request #18470 from stuartwdouglas/18445
Browse files Browse the repository at this point in the history
Don't close the connection when returning Multi
  • Loading branch information
gsmet authored Jul 12, 2021
2 parents 24ab3a3 + 26a2506 commit 10ba8ac
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.quarkus.resteasy.reactive.server.test.stream;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,6 +30,12 @@
import io.restassured.RestAssured;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;

@DisabledOnOs(OS.WINDOWS)
public class StreamTestCase {
Expand All @@ -40,6 +48,56 @@ public class StreamTestCase {
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(StreamResource.class));

@Test
public void testStreamingDoesNotCloseConnection() throws Exception {
Vertx v = Vertx.vertx();
try {
final CompletableFuture<Object> latch = new CompletableFuture<>();
HttpClient client = v
.createHttpClient(
new HttpClientOptions().setKeepAlive(true).setIdleTimeout(10).setIdleTimeoutUnit(TimeUnit.SECONDS));
sendRequest(latch, client, () -> sendRequest(latch, client, () -> latch.complete(null)));

//should not have been closed
latch.get();

} finally {
v.close().toCompletionStage().toCompletableFuture().get();
}
}

private void sendRequest(CompletableFuture<Object> latch, HttpClient client, Runnable runnable) {
Handler<Throwable> failure = latch::completeExceptionally;
client.request(HttpMethod.GET, RestAssured.port, "localhost", "/stream/text/stream")
.onFailure(failure)
.onSuccess(new Handler<HttpClientRequest>() {
@Override
public void handle(HttpClientRequest event) {
event.end();
event.connect().onFailure(failure)
.onSuccess(response -> {
response.request().connection().closeHandler(new Handler<Void>() {
@Override
public void handle(Void event) {
latch.completeExceptionally(new Throwable("Connection was closed"));
}
});
response.body().onFailure(failure)
.onSuccess(buffer -> {
try {
Assertions.assertEquals("foobar",
buffer.toString(StandardCharsets.US_ASCII));
} catch (Throwable t) {
latch.completeExceptionally(t);
}
runnable.run();
});
});

}
});
}

@Test
public void testStreaming() throws Exception {
RestAssured.get("/stream/text/stream")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ public void onComplete() {
// no need to cancel on complete
// FIXME: are we interested in async completion?
requestContext.serverResponse().end();
// so, if I don't also close the connection, the client isn't notified that the request is over
// I guess that's true of chunked responses, but not clear why I need to close the connection
// because it means it can't get reused, right?
requestContext.serverRequest().closeConnection();
requestContext.close();
}

Expand Down

0 comments on commit 10ba8ac

Please sign in to comment.