From 0bd9db9af87e7c014fe843ad2e18f0be3493aca8 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 17 Jul 2024 14:27:51 +0300 Subject: [PATCH] Use doFinally in order to handle cancel and error signals (#3356) --- reactor-netty-http/build.gradle | 3 +++ .../reactor/netty/http/HttpOperations.java | 26 +++++++------------ 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/reactor-netty-http/build.gradle b/reactor-netty-http/build.gradle index 19b93d4312..fc684926c8 100644 --- a/reactor-netty-http/build.gradle +++ b/reactor-netty-http/build.gradle @@ -249,6 +249,9 @@ task japicmp(type: JapicmpTask) { compatibilityChangeExcludes = [ "METHOD_NEW_DEFAULT" ] methodExcludes = [ + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#accept(java.lang.Object)', + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#accept(java.lang.Throwable)', + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#run()' ] } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java index 42eacc8a4b..3722638a0e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java @@ -45,6 +45,7 @@ import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.FutureMono; @@ -469,7 +470,7 @@ protected HttpMessage prepareHttpMessage(ByteBuf buffer) { static final Pattern SCHEME_PATTERN = Pattern.compile("^(https?|wss?)://.*$"); - protected static final class PostHeadersNettyOutbound extends AtomicBoolean implements NettyOutbound, Consumer, Runnable { + protected static final class PostHeadersNettyOutbound extends AtomicBoolean implements NettyOutbound { final Mono source; final HttpOperations parent; @@ -478,8 +479,13 @@ protected static final class PostHeadersNettyOutbound extends AtomicBoolean impl public PostHeadersNettyOutbound(Mono source, HttpOperations parent, @Nullable ByteBuf msg) { this.msg = msg; if (msg != null) { - this.source = source.doOnError(this) - .doOnCancel(this); + this.source = source.doFinally(signalType -> { + if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR) { + if (msg.refCnt() > 0 && compareAndSet(false, true)) { + msg.release(); + } + } + }); } else { this.source = source; @@ -487,20 +493,6 @@ public PostHeadersNettyOutbound(Mono source, HttpOperations parent, this.parent = parent; } - @Override - public void run() { - if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) { - msg.release(); - } - } - - @Override - public void accept(Throwable throwable) { - if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) { - msg.release(); - } - } - @Override public Mono then() { return source;