diff --git a/reactor-netty-http/build.gradle b/reactor-netty-http/build.gradle index 19b93d4312..fc684926c8 100644 --- a/reactor-netty-http/build.gradle +++ b/reactor-netty-http/build.gradle @@ -249,6 +249,9 @@ task japicmp(type: JapicmpTask) { compatibilityChangeExcludes = [ "METHOD_NEW_DEFAULT" ] methodExcludes = [ + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#accept(java.lang.Object)', + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#accept(java.lang.Throwable)', + 'reactor.netty.http.HttpOperations$PostHeadersNettyOutbound#run()' ] } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java index 42eacc8a4b..3722638a0e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java @@ -45,6 +45,7 @@ import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.FutureMono; @@ -469,7 +470,7 @@ protected HttpMessage prepareHttpMessage(ByteBuf buffer) { static final Pattern SCHEME_PATTERN = Pattern.compile("^(https?|wss?)://.*$"); - protected static final class PostHeadersNettyOutbound extends AtomicBoolean implements NettyOutbound, Consumer, Runnable { + protected static final class PostHeadersNettyOutbound extends AtomicBoolean implements NettyOutbound { final Mono source; final HttpOperations parent; @@ -478,8 +479,13 @@ protected static final class PostHeadersNettyOutbound extends AtomicBoolean impl public PostHeadersNettyOutbound(Mono source, HttpOperations parent, @Nullable ByteBuf msg) { this.msg = msg; if (msg != null) { - this.source = source.doOnError(this) - .doOnCancel(this); + this.source = source.doFinally(signalType -> { + if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR) { + if (msg.refCnt() > 0 && compareAndSet(false, true)) { + msg.release(); + } + } + }); } else { this.source = source; @@ -487,20 +493,6 @@ public PostHeadersNettyOutbound(Mono source, HttpOperations parent, this.parent = parent; } - @Override - public void run() { - if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) { - msg.release(); - } - } - - @Override - public void accept(Throwable throwable) { - if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) { - msg.release(); - } - } - @Override public Mono then() { return source;