Skip to content

Commit

Permalink
Merge pull request #14119 from FroMage/13988
Browse files Browse the repository at this point in the history
RESTEasy Reactive: implement cancel for Multi on both client and server
  • Loading branch information
geoand authored Jan 8, 2021
2 parents 5fda4b3 + e9700e6 commit 771d0b7
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public void close() {
}
}

@Override
public ServerHttpResponse addCloseHandler(Runnable onClose) {
context.response().closeHandler(v -> onClose.run());
return this;
}

@Override
public ServerHttpRequest serverRequest() {
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.quarkus.resteasy.reactive.server.test.stream;

import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand All @@ -9,6 +14,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.buffer.Buffer;

@Path("stream")
Expand Down Expand Up @@ -76,4 +82,59 @@ public static Uni<Buffer> concatenateBuffers(Multi<Buffer> multi) {
return multi.collectItems().in(() -> Buffer.buffer(INITIAL_BUFFER_SIZE),
(accumulatingBuffer, receivedBuffer) -> accumulatingBuffer.appendBuffer(receivedBuffer));
}

private boolean receivedCancel = false;

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("infinite/stream")
public Multi<String> infiniteStream() {
receivedCancel = false;
return Multi.createFrom().emitter(emitter -> {
ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
// this should never complete, but let's kill it after 30 seconds
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
String str = "Called at " + new Date();
emitter.emit(str);
}, 0, 1, TimeUnit.SECONDS);

// catch client close
emitter.onTermination(() -> {
if (emitter.isCancelled()) {
receivedCancel = true;
if (!future.isCancelled())
future.cancel(true);
}
});

// die in 30s max
scheduler.schedule(() -> {
if (!future.isCancelled()) {
future.cancel(true);
// just in case
emitter.complete();
}
}, 30, TimeUnit.SECONDS);
});
}

@GET
@Path("infinite/stream-was-cancelled")
public String infiniteStreamWasCancelled() {
return receivedCancel ? "OK" : "KO";
}

@Path("sse")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> sse() {
return Multi.createFrom().items("a", "b", "c");
}

@Path("sse/throw")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> sseThrows() {
throw new IllegalStateException("STOP");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;

import org.hamcrest.Matchers;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker;
Expand All @@ -20,6 +25,7 @@
import io.quarkus.test.common.http.TestHTTPResource;
import io.restassured.RestAssured;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;

public class StreamTestCase {

Expand Down Expand Up @@ -80,4 +86,88 @@ public void testClientStreaming() throws Exception {
Assertions.assertEquals("foo", list.get(0));
Assertions.assertEquals("bar", list.get(1));
}

@Test
public void testInfiniteStreamClosedByClientImmediately() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/infinite/stream");
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
Cancellable cancellable = multi.subscribe().with(item -> {
System.err.println("Received " + item);
});
// immediately cancel
cancellable.cancel();
// give it some time and check
Thread.sleep(2000);

WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled");
String check = checkTarget.request().get(String.class);
Assertions.assertEquals("OK", check);
}

@Test
public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/infinite/stream");
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
// cancel after two items
CountDownLatch latch = new CountDownLatch(2);
Cancellable cancellable = multi.subscribe().with(item -> {
System.err.println("Received " + item);
latch.countDown();
});
Assertions.assertTrue(latch.await(30, TimeUnit.SECONDS));
// now cancel
cancellable.cancel();
// give it some time and check
Thread.sleep(2000);

WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled");
String check = checkTarget.request().get(String.class);
Assertions.assertEquals("OK", check);
}

@Test
public void testSse() throws InterruptedException {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/sse");
try (SseEventSource sse = SseEventSource.target(target).build()) {
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> errors = new ArrayList<>();
List<String> results = new ArrayList<>();
sse.register(event -> {
results.add(event.readData());
}, error -> {
errors.add(error);
}, () -> {
latch.countDown();
});
sse.open();
Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS));
Assertions.assertEquals(Arrays.asList("a", "b", "c"), results);
Assertions.assertEquals(0, errors.size());
}
}

@Test
public void testSseThrows() throws InterruptedException {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/sse/throws");
try (SseEventSource sse = SseEventSource.target(target).build()) {
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> errors = new ArrayList<>();
List<String> results = new ArrayList<>();
sse.register(event -> {
results.add(event.readData());
}, error -> {
errors.add(error);
}, () -> {
latch.countDown();
});
sse.open();
Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS));
Assertions.assertEquals(0, results.size());
Assertions.assertEquals(1, errors.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import io.vertx.core.net.impl.ConnectionBase;
import java.io.ByteArrayInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker.MultiRequest;

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

Expand All @@ -34,34 +36,87 @@ public <R> Multi<R> get(GenericType<R> responseType) {
return (Multi<R>) super.get(responseType);
}

/**
* We need this class to work around a bug in Mutiny where we can register our cancel listener
* after the subscription is cancelled and we never get notified
* See https://github.com/smallrye/smallrye-mutiny/issues/417
*/
static class MultiRequest<R> {

private final AtomicReference<Runnable> onCancel = new AtomicReference<>();

private MultiEmitter<? super R> emitter;

private static final Runnable CLEARED = () -> {
};

public MultiRequest(MultiEmitter<? super R> emitter) {
this.emitter = emitter;
emitter.onTermination(() -> {
if (emitter.isCancelled()) {
this.cancel();
}
});
}

public boolean isCancelled() {
return onCancel.get() == CLEARED;
}

private void cancel() {
Runnable action = onCancel.getAndSet(CLEARED);
if (action != null && action != CLEARED) {
action.run();
}
}

public void onCancel(Runnable onCancel) {
if (this.onCancel.compareAndSet(null, onCancel)) {
// this was a first set
} else if (this.onCancel.get() == CLEARED) {
// already cleared
if (onCancel != null)
onCancel.run();
} else {
// it was already set
throw new IllegalArgumentException("onCancel was already called");
}
}
}

@Override
public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> responseType) {
AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx();
// FIXME: backpressure setting?
return Multi.createFrom().emitter(emitter -> {
MultiRequest<R> multiRequest = new MultiRequest<>(emitter);
RestClientRequestContext restClientRequestContext = invoker.performRequestInternal(name, entity, responseType,
false);
restClientRequestContext.getResult().handle((response, connectionError) -> {
if (connectionError != null) {
emitter.fail(connectionError);
} else {
HttpClientResponse vertxResponse = restClientRequestContext.getVertxClientResponse();
// FIXME: this is probably not good enough
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(emitter, responseType, response, vertxResponse);
if (!emitter.isCancelled()) {
// FIXME: this is probably not good enough
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(multiRequest, responseType, response, vertxResponse);
} else {
// read stuff in chunks
registerForChunks(multiRequest, restClientRequestContext, responseType, response, vertxResponse);
}
vertxResponse.resume();
} else {
// read stuff in chunks
registerForChunks(emitter, restClientRequestContext, responseType, response, vertxResponse);
vertxResponse.request().connection().close();
}
vertxResponse.resume();
}
return null;
});
});
}

private <R> void registerForSse(MultiEmitter<? super R> emitter,
private <R> void registerForSse(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
Response response,
HttpClientResponse vertxResponse) {
Expand All @@ -73,16 +128,20 @@ private <R> void registerForSse(MultiEmitter<? super R> emitter,
sseSource.register(event -> {
// DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or
// the content-type SSE field
emitter.emit((R) event.readData(responseType));
multiRequest.emitter.emit((R) event.readData(responseType));
}, error -> {
emitter.fail(error);
multiRequest.emitter.fail(error);
}, () -> {
emitter.complete();
multiRequest.emitter.complete();
});
// watch for user cancelling
multiRequest.onCancel(() -> {
sseSource.close();
});
sseSource.registerAfterRequest(vertxResponse);
}

private <R> void registerForChunks(MultiEmitter<? super R> emitter,
private <R> void registerForChunks(MultiRequest<? super R> multiRequest,
RestClientRequestContext restClientRequestContext,
GenericType<R> responseType,
Response response,
Expand All @@ -93,13 +152,13 @@ private <R> void registerForChunks(MultiEmitter<? super R> emitter,
if (t == ConnectionBase.CLOSED_EXCEPTION) {
// we can ignore this one since we registered a closeHandler
} else {
emitter.fail(t);
multiRequest.emitter.fail(t);
}
});
HttpConnection connection = vertxClientResponse.request().connection();
// this captures the server closing
connection.closeHandler(v -> {
emitter.complete();
multiRequest.emitter.complete();
});
vertxClientResponse.handler(new Handler<Buffer>() {
@Override
Expand All @@ -108,18 +167,22 @@ public void handle(Buffer buffer) {
ByteArrayInputStream in = new ByteArrayInputStream(buffer.getBytes());
R item = restClientRequestContext.readEntity(in, responseType, response.getMediaType(),
response.getMetadata());
emitter.emit(item);
multiRequest.emitter.emit(item);
} catch (Throwable t) {
// FIXME: probably close the client too? watch out that it doesn't call our close handler
// which calls emitter.complete()
emitter.fail(t);
multiRequest.emitter.fail(t);
}
}
});
// this captures the end of the response
// FIXME: won't this call complete twice()?
vertxClientResponse.endHandler(v -> {
emitter.complete();
multiRequest.emitter.complete();
});
// watch for user cancelling
multiRequest.onCancel(() -> {
vertxClientResponse.request().connection().close();
});
}

Expand Down
Loading

0 comments on commit 771d0b7

Please sign in to comment.