Skip to content

Commit

Permalink
Merge pull request quarkusio#27559 from cescoffier/sse-client-cancell…
Browse files Browse the repository at this point in the history
…ation

Fix SSE cancellation in the Reactive REST Client
  • Loading branch information
geoand authored Aug 29, 2022
2 parents e9e11b3 + d876224 commit f327d4c
Showing 1 changed file with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ public MultiRequest(MultiEmitter<? super R> emitter) {
});
}

void emit(R item) {
if (!isCancelled()) {
emitter.emit(item);
}
}

void fail(Throwable t) {
if (!isCancelled()) {
emitter.fail(t);
cancel();
}
}

void complete() {
if (!isCancelled()) {
emitter.complete();
cancel();
}
}

public boolean isCancelled() {
return onCancel.get() == CLEARED;
}
Expand Down Expand Up @@ -130,20 +150,20 @@ private <R> void registerForSse(MultiRequest<? super R> multiRequest,
// For now we don't want multi to reconnect
SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(),
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS);
// FIXME: deal with cancellation

multiRequest.onCancel(() -> {
sseSource.close();
});
sseSource.register(event -> {
// DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or
// the content-type SSE field
multiRequest.emitter.emit((R) event.readData(responseType));
multiRequest.emit(event.readData(responseType));
}, error -> {
multiRequest.emitter.fail(error);
multiRequest.fail(error);
}, () -> {
multiRequest.emitter.complete();
multiRequest.complete();
});
// watch for user cancelling
multiRequest.onCancel(() -> {
sseSource.close();
});
sseSource.registerAfterRequest(vertxResponse);
}

Expand Down

0 comments on commit f327d4c

Please sign in to comment.