diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index 0b17f299d2572..6ae2a5109697d 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -59,6 +59,26 @@ public MultiRequest(MultiEmitter emitter) { }); } + void emit(R item) { + if (!isCancelled()) { + emitter.emit(item); + } + } + + void fail(Throwable t) { + if (!isCancelled()) { + emitter.fail(t); + cancel(); + } + } + + void complete() { + if (!isCancelled()) { + emitter.complete(); + cancel(); + } + } + public boolean isCancelled() { return onCancel.get() == CLEARED; } @@ -130,20 +150,20 @@ private void registerForSse(MultiRequest multiRequest, // For now we don't want multi to reconnect SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(), invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS); - // FIXME: deal with cancellation + + multiRequest.onCancel(() -> { + sseSource.close(); + }); sseSource.register(event -> { // DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or // the content-type SSE field - multiRequest.emitter.emit((R) event.readData(responseType)); + multiRequest.emit(event.readData(responseType)); }, error -> { - multiRequest.emitter.fail(error); + multiRequest.fail(error); }, () -> { - multiRequest.emitter.complete(); + multiRequest.complete(); }); // watch for user cancelling - multiRequest.onCancel(() -> { - sseSource.close(); - }); sseSource.registerAfterRequest(vertxResponse); }