From c6b6ccdc890f980ac910f0ea01b828eb6720a15b Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 13 May 2024 16:25:46 +0200 Subject: [PATCH] Close ResponseBodyEmitter in case of write errors Prior to this commit, the `ReactiveTypeHandler` would handle `Flux`-like return types from controller methods and adapt them to SSE streams using the `SseEmitter`/`ResponseBodyEmitter` APIs. In case an `IOException` is thrown while writing to the HTTP response stream, the `ReactiveTypeHandler` would rely on the Servlet container to call `AsyncListener#onError` - this would be the signal for Spring MVC to complete the async exchange. To prevent racing issues between this signal and the actual handling of the exception, changes like gh-20173 were applied. Since then, robust checks were added with gh-32340 in `StandardServletAsyncWebRequest.LifecycleHttpServletResponse`. With Jetty 12, `AsyncListener#onError` would not be called as the error would happen while writing in blocking mode to the response (so, not using the Servlet WriteListener contract). But still, such `IOException` would still result in the closing of the HTTP connection. As of Jetty 12.0.4, this is no longer the case and the party managing the async lifecycle is in charge of completing the exchange, as it should. This means that the current behavior leaks HTTP connections for these cases and causes memory issues. This commit ensures that such exceptions happening during response writes are caught and result in the completion of the `SSEEmitter` and the closing of the exchange. Even if other Servlet containers still propagate the error `AsyncListener#onError`, competing signals are still managed with gh-32340. Closes gh-32629 --- .../annotation/ReactiveTypeHandler.java | 1 + .../annotation/ResponseBodyEmitter.java | 20 ------- .../annotation/ReactiveTypeHandlerTests.java | 54 ++++++++++++++++++- .../annotation/ResponseBodyEmitterTests.java | 17 ------ 4 files changed, 53 insertions(+), 39 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index 9bf2345f37ef..4d22dd957d95 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -337,6 +337,7 @@ public void run() { logger.trace("Send for " + this.emitter + " failed: " + ex); } terminate(); + this.emitter.completeWithError(ex); return; } } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 055e5906df46..b801457c0ac5 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -79,16 +79,6 @@ public class ResponseBodyEmitter { @Nullable private Throwable failure; - /** - * After an I/O error, we don't call {@link #completeWithError} directly but - * wait for the Servlet container to call us via {@code AsyncListener#onError} - * on a container thread at which point we call completeWithError. - * This flag is used to ignore further calls to complete or completeWithError - * that may come for example from an application try-catch block on the - * thread of the I/O error. - */ - private boolean ioErrorOnSend; - private final DefaultCallback timeoutCallback = new DefaultCallback(); private final ErrorCallback errorCallback = new ErrorCallback(); @@ -198,7 +188,6 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro this.handler.send(object, mediaType); } catch (IOException ex) { - this.ioErrorOnSend = true; throw ex; } catch (Throwable ex) { @@ -234,7 +223,6 @@ private void sendInternal(Set items) throws IOException { this.handler.send(items); } catch (IOException ex) { - this.ioErrorOnSend = true; throw ex; } catch (Throwable ex) { @@ -255,10 +243,6 @@ private void sendInternal(Set items) throws IOException { * related events such as an error while {@link #send(Object) sending}. */ public synchronized void complete() { - // Ignore complete after IO failure on send - if (this.ioErrorOnSend) { - return; - } this.complete = true; if (this.handler != null) { this.handler.complete(); @@ -277,10 +261,6 @@ public synchronized void complete() { * {@link #send(Object) sending}. */ public synchronized void completeWithError(Throwable ex) { - // Ignore complete after IO failure on send - if (this.ioErrorOnSend) { - return; - } this.complete = true; this.failure = ex; if (this.handler != null) { diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index ab0b41a5e2f9..e5422c7b8e81 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -16,6 +16,7 @@ package org.springframework.web.servlet.mvc.method.annotation; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -372,6 +373,24 @@ void writeText() throws Exception { assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog"); } + @Test + void failOnWriteShouldCompleteEmitter() throws Exception { + + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class)); + + ErroringEmitterHandler emitterHandler = new ErroringEmitterHandler(); + emitter.initialize(emitterHandler); + + sink.tryEmitNext("The quick"); + sink.tryEmitNext(" brown fox jumps over "); + sink.tryEmitNext("the lazy dog"); + sink.tryEmitComplete(); + + assertThat(emitterHandler.getHandlingStatus()).isEqualTo(HandlingStatus.ERROR); + assertThat(emitterHandler.getFailure()).isInstanceOf(IOException.class); + } + @Test void writeFluxOfString() throws Exception { @@ -451,6 +470,10 @@ private static class EmitterHandler implements ResponseBodyEmitter.Handler { private final List values = new ArrayList<>(); + private HandlingStatus handlingStatus; + + private Throwable failure; + public List getValues() { return this.values; @@ -460,22 +483,33 @@ public String getValuesAsText() { return this.values.stream().map(Object::toString).collect(Collectors.joining()); } + public HandlingStatus getHandlingStatus() { + return this.handlingStatus; + } + + public Throwable getFailure() { + return this.failure; + } + @Override - public void send(Object data, MediaType mediaType) { + public void send(Object data, MediaType mediaType) throws IOException { this.values.add(data); } @Override - public void send(Set items) { + public void send(Set items) throws IOException { items.forEach(item -> this.values.add(item.getData())); } @Override public void complete() { + this.handlingStatus = HandlingStatus.SUCCESS; } @Override public void completeWithError(Throwable failure) { + this.handlingStatus = HandlingStatus.ERROR; + this.failure = failure; } @Override @@ -491,6 +525,22 @@ public void onCompletion(Runnable callback) { } } + private enum HandlingStatus { + SUCCESS,ERROR + } + + private static class ErroringEmitterHandler extends EmitterHandler { + @Override + public void send(Object data, MediaType mediaType) throws IOException { + throw new IOException(); + } + + @Override + public void send(Set items) throws IOException { + throw new IOException(); + } + } + private static class Bar { private final String value; diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java index 2b74cde402c2..10f7bc639af6 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java @@ -148,23 +148,6 @@ void sendWithError() throws Exception { verifyNoMoreInteractions(this.handler); } - @Test // gh-30687 - void completeIgnoredAfterIOException() throws Exception { - this.emitter.initialize(this.handler); - verify(this.handler).onTimeout(any()); - verify(this.handler).onError(any()); - verify(this.handler).onCompletion(any()); - verifyNoMoreInteractions(this.handler); - - willThrow(new IOException()).given(this.handler).send("foo", MediaType.TEXT_PLAIN); - assertThatIOException().isThrownBy(() -> this.emitter.send("foo", MediaType.TEXT_PLAIN)); - verify(this.handler).send("foo", MediaType.TEXT_PLAIN); - verifyNoMoreInteractions(this.handler); - - this.emitter.complete(); - verifyNoMoreInteractions(this.handler); - } - @Test // gh-30687 void completeAfterNonIOException() throws Exception { this.emitter.initialize(this.handler);