From c37330647fefd5993f568ec5a48de84707bfbc8e Mon Sep 17 00:00:00 2001 From: Stephane Epardaud Date: Tue, 5 Jan 2021 16:59:30 +0100 Subject: [PATCH] RESTEasy Reactive: implement cancel for Multi on both client and server Fixes #13988 --- .../runtime/ServletRequestContext.java | 6 ++ .../server/test/stream/StreamResource.java | 47 +++++++++ .../server/test/stream/StreamTestCase.java | 43 ++++++++ .../reactive/client/impl/MultiInvoker.java | 97 +++++++++++++++---- .../server/handlers/MultiResponseHandler.java | 8 ++ .../server/spi/ServerHttpResponse.java | 2 + .../VertxResteasyReactiveRequestContext.java | 8 ++ 7 files changed, 194 insertions(+), 17 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java index babee7adfb7b7..37a67aa9a142c 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java @@ -88,6 +88,12 @@ public void close() { } } + @Override + public ServerHttpResponse addCloseHandler(Runnable onClose) { + context.response().closeHandler(v -> onClose.run()); + return this; + } + @Override public ServerHttpRequest serverRequest() { return this; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java index 9c0143ca8b750..79f6a6494f06b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java @@ -1,5 +1,10 @@ package io.quarkus.resteasy.reactive.server.test.stream; +import java.util.Date; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -9,6 +14,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.buffer.Buffer; @Path("stream") @@ -76,4 +82,45 @@ public static Uni concatenateBuffers(Multi multi) { return multi.collectItems().in(() -> Buffer.buffer(INITIAL_BUFFER_SIZE), (accumulatingBuffer, receivedBuffer) -> accumulatingBuffer.appendBuffer(receivedBuffer)); } + + private boolean receivedCancel = false; + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Path("infinite/stream") + public Multi infiniteStream() { + receivedCancel = false; + return Multi.createFrom().emitter(emitter -> { + ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool(); + // this should never complete, but let's kill it after 30 seconds + ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { + String str = "Called at " + new Date(); + emitter.emit(str); + }, 0, 1, TimeUnit.SECONDS); + + // catch client close + emitter.onTermination(() -> { + if (emitter.isCancelled()) { + receivedCancel = true; + if (!future.isCancelled()) + future.cancel(true); + } + }); + + // die in 30s max + scheduler.schedule(() -> { + if (!future.isCancelled()) { + future.cancel(true); + // just in case + emitter.complete(); + } + }, 30, TimeUnit.SECONDS); + }); + } + + @GET + @Path("infinite/stream-was-cancelled") + public String infiniteStreamWasCancelled() { + return receivedCancel ? "OK" : "KO"; + } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java index 6b69dc5cd473a..35819889e0114 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java @@ -3,6 +3,8 @@ import java.net.URI; import java.time.Duration; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -20,6 +22,7 @@ import io.quarkus.test.common.http.TestHTTPResource; import io.restassured.RestAssured; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.Cancellable; public class StreamTestCase { @@ -80,4 +83,44 @@ public void testClientStreaming() throws Exception { Assertions.assertEquals("foo", list.get(0)); Assertions.assertEquals("bar", list.get(1)); } + + @Test + public void testInfiniteStreamClosedByClientImmediately() throws Exception { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + "stream/infinite/stream"); + Multi multi = target.request().rx(MultiInvoker.class).get(String.class); + Cancellable cancellable = multi.subscribe().with(item -> { + System.err.println("Received " + item); + }); + // immediately cancel + cancellable.cancel(); + // give it some time and check + Thread.sleep(2000); + + WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled"); + String check = checkTarget.request().get(String.class); + Assertions.assertEquals("OK", check); + } + + @Test + public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + "stream/infinite/stream"); + Multi multi = target.request().rx(MultiInvoker.class).get(String.class); + // cancel after two items + CountDownLatch latch = new CountDownLatch(2); + Cancellable cancellable = multi.subscribe().with(item -> { + System.err.println("Received " + item); + latch.countDown(); + }); + Assertions.assertTrue(latch.await(30, TimeUnit.SECONDS)); + // now cancel + cancellable.cancel(); + // give it some time and check + Thread.sleep(2000); + + WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled"); + String check = checkTarget.request().get(String.class); + Assertions.assertEquals("OK", check); + } } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index 6c0a31ba78490..192e11eeea04d 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -9,10 +9,12 @@ import io.vertx.core.net.impl.ConnectionBase; import java.io.ByteArrayInputStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.client.Entity; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.jboss.resteasy.reactive.client.impl.MultiInvoker.MultiRequest; public class MultiInvoker extends AbstractRxInvoker> { @@ -34,11 +36,60 @@ public Multi get(GenericType responseType) { return (Multi) super.get(responseType); } + /** + * We need this class to work around a bug in Mutiny where we can register our cancel listener + * after the subscription is cancelled and we never get notified + * See https://github.com/smallrye/smallrye-mutiny/issues/417 + */ + static class MultiRequest { + + private final AtomicReference onCancel = new AtomicReference<>(); + + private MultiEmitter emitter; + + private static final Runnable CLEARED = () -> { + }; + + public MultiRequest(MultiEmitter emitter) { + this.emitter = emitter; + emitter.onTermination(() -> { + if (emitter.isCancelled()) { + this.cancel(); + } + }); + } + + public boolean isCancelled() { + return onCancel.get() == CLEARED; + } + + private void cancel() { + Runnable action = onCancel.getAndSet(CLEARED); + if (action != null && action != CLEARED) { + action.run(); + } + } + + public void onCancel(Runnable onCancel) { + if (this.onCancel.compareAndSet(null, onCancel)) { + // this was a first set + } else if (this.onCancel.get() == CLEARED) { + // already cleared + if (onCancel != null) + onCancel.run(); + } else { + // it was already set + throw new IllegalArgumentException("onCancel was already called"); + } + } + } + @Override public Multi method(String name, Entity entity, GenericType responseType) { AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx(); // FIXME: backpressure setting? return Multi.createFrom().emitter(emitter -> { + MultiRequest multiRequest = new MultiRequest<>(emitter); RestClientRequestContext restClientRequestContext = invoker.performRequestInternal(name, entity, responseType, false); restClientRequestContext.getResult().handle((response, connectionError) -> { @@ -46,22 +97,26 @@ public Multi method(String name, Entity entity, GenericType respons emitter.fail(connectionError); } else { HttpClientResponse vertxResponse = restClientRequestContext.getVertxClientResponse(); - // FIXME: this is probably not good enough - if (response.getStatus() == 200 - && MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) { - registerForSse(emitter, responseType, response, vertxResponse); + if (!emitter.isCancelled()) { + // FIXME: this is probably not good enough + if (response.getStatus() == 200 + && MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) { + registerForSse(multiRequest, responseType, response, vertxResponse); + } else { + // read stuff in chunks + registerForChunks(multiRequest, restClientRequestContext, responseType, response, vertxResponse); + } + vertxResponse.resume(); } else { - // read stuff in chunks - registerForChunks(emitter, restClientRequestContext, responseType, response, vertxResponse); + vertxResponse.request().connection().close(); } - vertxResponse.resume(); } return null; }); }); } - private void registerForSse(MultiEmitter emitter, + private void registerForSse(MultiRequest multiRequest, GenericType responseType, Response response, HttpClientResponse vertxResponse) { @@ -73,16 +128,20 @@ private void registerForSse(MultiEmitter emitter, sseSource.register(event -> { // DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or // the content-type SSE field - emitter.emit((R) event.readData(responseType)); + multiRequest.emitter.emit((R) event.readData(responseType)); }, error -> { - emitter.fail(error); + multiRequest.emitter.fail(error); }, () -> { - emitter.complete(); + multiRequest.emitter.complete(); + }); + // watch for user cancelling + multiRequest.onCancel(() -> { + sseSource.close(); }); sseSource.registerAfterRequest(vertxResponse); } - private void registerForChunks(MultiEmitter emitter, + private void registerForChunks(MultiRequest multiRequest, RestClientRequestContext restClientRequestContext, GenericType responseType, Response response, @@ -93,13 +152,13 @@ private void registerForChunks(MultiEmitter emitter, if (t == ConnectionBase.CLOSED_EXCEPTION) { // we can ignore this one since we registered a closeHandler } else { - emitter.fail(t); + multiRequest.emitter.fail(t); } }); HttpConnection connection = vertxClientResponse.request().connection(); // this captures the server closing connection.closeHandler(v -> { - emitter.complete(); + multiRequest.emitter.complete(); }); vertxClientResponse.handler(new Handler() { @Override @@ -108,18 +167,22 @@ public void handle(Buffer buffer) { ByteArrayInputStream in = new ByteArrayInputStream(buffer.getBytes()); R item = restClientRequestContext.readEntity(in, responseType, response.getMediaType(), response.getMetadata()); - emitter.emit(item); + multiRequest.emitter.emit(item); } catch (Throwable t) { // FIXME: probably close the client too? watch out that it doesn't call our close handler // which calls emitter.complete() - emitter.fail(t); + multiRequest.emitter.fail(t); } } }); // this captures the end of the response // FIXME: won't this call complete twice()? vertxClientResponse.endHandler(v -> { - emitter.complete(); + multiRequest.emitter.complete(); + }); + // watch for user cancelling + multiRequest.onCancel(() -> { + vertxClientResponse.request().connection().close(); }); } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/MultiResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/MultiResponseHandler.java index 2927393d6cde6..ea26849b7540d 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/MultiResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/MultiResponseHandler.java @@ -72,11 +72,17 @@ public Object apply(Object v, Throwable t) { static abstract class AbstractMultiSubscriber implements Subscriber { protected Subscription subscription; protected ResteasyReactiveRequestContext requestContext; + private boolean weClosed = false; AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext) { this.requestContext = requestContext; // let's make sure we never restart by accident, also make sure we're not marked as completed requestContext.restart(AWOL, true); + requestContext.serverResponse().addCloseHandler(() -> { + if (!weClosed && this.subscription != null) { + subscription.cancel(); + } + }); } @Override @@ -88,6 +94,8 @@ public void onSubscribe(Subscription s) { @Override public void onComplete() { + // make sure we don't trigger cancel with our onCloseHandler + weClosed = true; // no need to cancel on complete // FIXME: are we interested in async completion? requestContext.serverResponse().end(); diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpResponse.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpResponse.java index 39761308baf99..f65b8e21bd57c 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpResponse.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpResponse.java @@ -37,4 +37,6 @@ public interface ServerHttpResponse { OutputStream createResponseOutputStream(); void setPreCommitListener(Consumer task); + + ServerHttpResponse addCloseHandler(Runnable onClose); } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java index fbcdc67fe7645..0ac7c1993fdca 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java @@ -53,6 +53,14 @@ public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl } } + @Override + public ServerHttpResponse addCloseHandler(Runnable onClose) { + this.response.closeHandler(v -> { + onClose.run(); + }); + return this; + } + public RoutingContext getContext() { return context; }