"IllegalReferenceCountException Occurs with HTTP/2 Due to Race Condition Between HttpServerOperation and SimpleCompressionHandler" #3366

Closed
ojh3636 opened this issue Jul 24, 2024 · 17 comments · Fixed by #3369

ojh3636 commented Jul 24, 2024

I use Spring Cloud Gateway. After changing the inbound protocol from HTTP/1.1 to HTTP/2, we occasionally encounter IllegalReferenceCountException.
I observed that the error occurs only in two specific locations: the finally release block in SimpleCompressionHandler#decode and the request.release() block in HttpServerOperations#onInboundNext.

This issue happens only when an HTTP/2 frame is decoded into a FullHttpRequest (a HEADERS frame with the END_STREAM flag, which a simple GET request can produce).

In the code below, the stateChange hook triggers Spring Cloud Gateway to proxy the downstream request to upstream on another thread. Subsequently, request.release() is called.

@Override
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        try {
            listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
        } catch (Exception e) {
            onInboundError(e);
            ReferenceCountUtil.release(msg);
            return;
        }
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            if (request.content().readableBytes() > 0) {
                super.onInboundNext(ctx, msg);
            } else {
                request.release();
            }
            if (isHttp2()) {
                // Force auto read to enable more accurate close selection now that inbound is done
                channel().config().setAutoRead(true);
                onInboundComplete();
            }
        }
        return;
    }
}

However, if the upstream response arrives and meets the compression condition before request.release() is called, the decode function in SimpleCompressionHandler is invoked. Because HttpServerOperations has not yet called request.release(), decode does not create a new DefaultHttpRequest; it proceeds with the FullHttpRequest and tries to retain it. Meanwhile, HttpServerOperations releases the same content, which frees the ByteBuf. The retain operation in the decoder then throws an IllegalReferenceCountException, and the finally block attempts to release the ByteBuf again, which also fails.

void decode(ChannelHandlerContext ctx, HttpRequest msg) {
    List<Object> out = new ArrayList<>();
    HttpRequest request = msg;
    try {
        if (msg instanceof FullHttpRequest && ((FullHttpRequest) msg).content().refCnt() == 0) {
            // This can happen only in HTTP2 use case and delayed response
            // When the incoming FullHttpRequest content has 0 readableBytes, it is released immediately
            // decode(...) will observe a freed content
            request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers());
        }
        super.decode(ctx, request, out);
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        // ReferenceCountUtil.retain(...) is invoked in decode(...) so release(...) here
        ReferenceCountUtil.release(request);
        out.clear();
    }
}
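
The interleaving described above can be replayed deterministically on a single thread. The following is a minimal sketch that uses only the JDK; `RefCountRaceDemo`, `Counted`, and `replay` are hypothetical names, and `Counted` is a toy stand-in that only mimics retain/release semantics, not Netty's actual reference counting. It assumes the order "check refCnt, inbound release, retain, finally release".

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountRaceDemo {

    /** Hypothetical toy counter that mimics retain()/release(); NOT Netty's implementation. */
    public static final class Counted {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        public int refCnt() {
            return refCnt.get();
        }

        public void retain() {
            int current;
            do {
                current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("refCnt: 0, increment: 1");
                }
            } while (!refCnt.compareAndSet(current, current + 1));
        }

        public void release() {
            int current;
            do {
                current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("refCnt: 0, decrement: 1");
                }
            } while (!refCnt.compareAndSet(current, current - 1));
        }
    }

    /** Replays the problematic order and returns the message of the error that surfaces. */
    public static String replay() {
        Counted content = new Counted();
        // Compression path: the buffer still looks alive, so the full request is kept.
        boolean looksAlive = content.refCnt() != 0;
        // Inbound path: the empty request is released in between.
        content.release();
        try {
            try {
                if (looksAlive) {
                    content.retain(); // fails: the buffer is already freed
                }
            } finally {
                content.release(); // fails again and replaces the first exception
            }
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Because the exception thrown in the finally block replaces the one thrown by retain(), only the "decrement" failure surfaces in this toy model.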

Expected Behavior

There should be no IllegalReferenceCountException inside Reactor Netty logic.

Actual Behavior

IllegalReferenceCountException can occur.

Steps to Reproduce

Possible Solution

Move the hook timing of listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED); after request.release() or use some other external synchronization to avoid double release.

Your Environment

  • Reactor version(s) used: 3.4.39
  • Other relevant libraries versions (e.g., netty, ...):
    • org.springframework.cloud:spring-cloud-gateway-server:3.1.9
    • reactor-netty: 1.0.46
    • netty: 4.1.111.FINAL
  • JVM version (java -version): OpenJDK Runtime Environment Temurin-21.0.1+12
  • OS and version (e.g., uname -a): Linux 6.5.3-1.el8.elrepo.x86_64
@ojh3636 ojh3636 added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Jul 24, 2024

ojh3636 commented Jul 24, 2024

The full stack trace is here.

	

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:141)
	at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
	at reactor.netty.http.server.SimpleCompressionHandler.decode(SimpleCompressionHandler.java:76)
	at reactor.netty.http.server.HttpServerOperations.compression(HttpServerOperations.java:611)
	at reactor.netty.http.server.HttpServerOperations.afterMarkSentHeaders(HttpServerOperations.java:676)
	at reactor.netty.http.HttpOperations.lambda$then$3(HttpOperations.java:199)
	at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:100)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:236)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
	at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:197)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at im.toss.server.apigateway.internal.reactor.TossChannelSendOperator$ConditionalCancelSubscriber.onNext(TossChannelSendOperator.java:95)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at reactor.netty.channel.FluxReceive.request(FluxReceive.java:133)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at im.toss.server.apigateway.internal.reactor.TossChannelSendOperator$ConditionalCancelSubscriber.request(TossChannelSendOperator.java:127)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onSubscribe(ChannelSendOperator.java:166)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	at im.toss.server.apigateway.internal.reactor.TossChannelSendOperator$ConditionalCancelSubscriber.onSubscribe(TossChannelSendOperator.java:80)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:172)
	at reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:150)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:141)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:639)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at reactor.netty.http.server.Http2StreamBridgeServerHandler.channelRead(Http2StreamBridgeServerHandler.java:163)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:981)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.fireChildRead(AbstractHttp2StreamChannel.java:601)
	at io.netty.handler.codec.http2.Http2MultiplexHandler.channelRead(Http2MultiplexHandler.java:193)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.http2.Http2FrameCodec.onHttp2Frame(Http2FrameCodec.java:717)
	at io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onHeadersRead(Http2FrameCodec.java:649)
	at io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onHeadersRead(Http2FrameCodec.java:643)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:46)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:46)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onHeadersRead(Http2EmptyDataFrameListener.java:63)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:435)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:350)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:475)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:483)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:247)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:164)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:186)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:61)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:61)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:391)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:451)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)

@violetagg violetagg self-assigned this Jul 24, 2024
@violetagg violetagg removed the status/need-triage A new issue that still need to be evaluated as a whole label Jul 24, 2024

ojh3636 commented Jul 24, 2024

I think the second trace is not related to this issue but to other code that releases nettyRequest (maybe another request decoder; I'll investigate further).

@violetagg
Member

@ojh3636 If I prepare a change, will you be able to test the SNAPSHOT version?


ojh3636 commented Jul 24, 2024

Thanks for your attention.
Yes, I can test with the SNAPSHOT version.

violetagg added a commit that referenced this issue Jul 25, 2024
@violetagg violetagg linked a pull request Jul 25, 2024 that will close this issue
@violetagg violetagg reopened this Jul 25, 2024
@violetagg
Member

@ojh3636 Depending on the version that you use, can you try 2020.0.47-SNAPSHOT, 2022.0.22-SNAPSHOT or 2023.0.9-SNAPSHOT?
You can change it using Spring Boot properties:

<properties>
	...
	<reactor-bom.version>version</reactor-bom.version>
</properties>

@violetagg violetagg added this to the 1.0.48 milestone Jul 25, 2024

ojh3636 commented Jul 25, 2024

@violetagg Wow! Thank you for the quick follow-up!
I'll test it in the development and production environments.
I should have results in about 24 hours. Is that okay?

@violetagg
Member

thanks


ojh3636 commented Jul 25, 2024

The error still occurs 😢
Here is the stack trace.

Original Stack Trace:
		at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
		at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
		at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
		at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:141)
		at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
		at reactor.netty.http.server.SimpleCompressionHandler.decode(SimpleCompressionHandler.java:94)
		at reactor.netty.http.server.SimpleCompressionHandler.write(SimpleCompressionHandler.java:64)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:956)
		at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1263)
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
		at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:840)

The second one is here.

		at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
		at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
		at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
		at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:141)
		at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
		at reactor.netty.http.server.SimpleCompressionHandler.decode(SimpleCompressionHandler.java:94)
		at reactor.netty.http.server.SimpleCompressionHandler.write(SimpleCompressionHandler.java:64)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:956)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:982)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:950)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:1000)
		at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1024)
		at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.writeAndFlush(AbstractHttp2StreamChannel.java:535)
		at reactor.netty.http.HttpOperations.lambda$then$3(HttpOperations.java:206)
		at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:100)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:236)
		at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
		at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:197)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
		at reactor.netty.channel.FluxReceive.request(FluxReceive.java:133)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at im.toss.server.apigateway.internal.reactor.TossChannelSendOperator$ConditionalCancelSubscriber.request(TossChannelSendOperator.java:127)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
		at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onSubscribe(ChannelSendOperator.java:166)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
		at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
		at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
		at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:172)
		at reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:150)
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
		at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:840)


ojh3636 commented Jul 25, 2024

I don't know if this helps, but when I patched SimpleCompressionHandler as shown below, the error did not occur for about 2 days (approximately 800 million requests).

    void decode(ChannelHandlerContext ctx, HttpRequest msg) {
        List<Object> out = new ArrayList<>();
        HttpRequest request = msg;
        try {
            // CUSTOM: when a FullHttpRequest comes in, always create a DefaultHttpRequest
            if (msg instanceof FullHttpRequest) {
                request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers());
            }
            super.decode(ctx, request, out);
        }
        catch (DecoderException e) {
            throw e;
        }
        catch (Exception e) {
            throw new DecoderException(e);
        }
        finally {
            // ReferenceCountUtil.retain(...) is invoked in decode(...) so release(...) here
            ReferenceCountUtil.release(request);
            out.clear();
        }
    }
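
Why a patch of this shape sidesteps the double release can be illustrated with a toy model. The sketch below uses only the JDK; `HeaderOnlyCopyDemo`, `Counted`, `FullRequest`, `HeaderOnlyRequest`, `releaseIfCounted`, and `decodeWithCopy` are all hypothetical names, not Netty or Reactor Netty types. It assumes that releasing an object without a reference count is a no-op, so a decode path that works on a header-only copy never touches the original request's count.

```java
public class HeaderOnlyCopyDemo {

    /** Hypothetical marker for objects that carry a reference count. */
    public interface Counted {
        void release();
    }

    /** Toy full request: a URI plus a counted (possibly empty) body. */
    public static final class FullRequest implements Counted {
        public final String uri;
        private int refCnt = 1;

        public FullRequest(String uri) {
            this.uri = uri;
        }

        @Override
        public void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0, decrement: 1");
            }
            refCnt--;
        }
    }

    /** Toy header-only request: no body, hence nothing to retain or release. */
    public static final class HeaderOnlyRequest {
        public final String uri;

        public HeaderOnlyRequest(String uri) {
            this.uri = uri;
        }
    }

    /** Releases only counted objects and reports whether it released anything. */
    public static boolean releaseIfCounted(Object msg) {
        if (msg instanceof Counted) {
            ((Counted) msg).release();
            return true;
        }
        return false;
    }

    /** Decode path that always works on a header-only copy of the incoming request. */
    public static boolean decodeWithCopy(FullRequest incoming) {
        HeaderOnlyRequest copy = new HeaderOnlyRequest(incoming.uri);
        try {
            // Content negotiation would inspect only the headers of 'copy' here.
            return true;
        } finally {
            releaseIfCounted(copy); // no-op: the copy is not counted
        }
    }

    public static void main(String[] args) {
        FullRequest request = new FullRequest("/ping");
        request.release();                           // inbound path frees the empty request
        System.out.println(decodeWithCopy(request)); // completes without touching the count
    }
}
```

The trade-off in this model is one extra small allocation per decode in exchange for not depending on the state of the original request.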


violetagg commented Jul 25, 2024

@ojh3636 The problem might be different now. The change ensures that SimpleCompressionHandler.decode/HttpServerOperations.onInboundNext are always invoked in an event loop - this means they cannot happen in parallel.
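
The reasoning that work confined to one event loop thread cannot interleave can be sketched with a plain JDK executor. This is an illustration only, not the actual change: `SingleThreadConfinementDemo` and `run` are hypothetical names, and a single-thread `ExecutorService` stands in for the event loop. Several caller threads hand the same unsynchronized check-then-release task to the loop, and the release still happens exactly once.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadConfinementDemo {

    /** Submits the same check-then-release task from several threads; returns the release count. */
    public static int run(int callers) {
        // Hypothetical stand-in for an event loop: one thread, tasks run one at a time.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        int[] refCnt = {1};
        int[] releases = {0};
        // Deliberately unsynchronized; safe only because a single thread ever runs it.
        Runnable releaseIfAlive = () -> {
            if (refCnt[0] > 0) {
                refCnt[0]--;
                releases[0]++;
            }
        };
        try {
            Thread[] threads = new Thread[callers];
            for (int i = 0; i < callers; i++) {
                threads[i] = new Thread(() -> eventLoop.execute(releaseIfAlive));
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            // Read the result on the loop thread, after all submitted tasks have run.
            Callable<Integer> read = () -> releases[0];
            return eventLoop.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8));
    }
}
```

Confinement removes the need for locks around the check and the release, but it only helps if every path that touches the object is routed through the same thread.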


ojh3636 commented Jul 25, 2024

@violetagg
Yes, it might be a consequence of several different issues being mixed. As far as I can see from the code, there is no way for the application code to directly reference the nettyRequest (contained in HttpServerOperations) to release that content.

Do you have any ideas on how this can be resolved?

@violetagg
Member

@ojh3636 Actually it can (do you have any caching in the gateway?):




ojh3636 commented Jul 25, 2024

@violetagg
Ah, I see. So the exchange.request.getBody might actually return the fullHttpRequest's content. But we don't use any caching behavior in our code, especially for this gateway (we operate several kinds of gateways).

Below is our Reactor checkpoint log. The logging filter only logs the path and headers, and the AddContext filter only mutates headers or writes to the Reactor context.

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ [CatchEnvoyErrorsFilter]
	*__checkpoint ⇢ [AddContextFilter] 
	*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ ...StatusLoggingExtraDataFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ ...StatusLoggingWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ ...HandlerMDCInfoWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ ...InternalHeaderWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ [AddEventIdFilter] CQ3V3GQVQL
	|_ checkpoint ⇢ AddEventIdWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP GET "{url}" [ExceptionHandlingWebHand


ojh3636 commented Jul 25, 2024

@violetagg
Oh, I found a clue. Unlike with the previous version, every request that hits this error was triggered by a downstream connection close (meaning the client closed the connection first).
Below is the Envoy access log. The public-gateway is our application.
Is it possible that FluxReceive's doCancel and SimpleCompressionHandler have a race condition?
Or could it be elsewhere, in the doOnDiscard of ChannelOperations#send? Spring Cloud Gateway relays the downstream request body from HttpServerOperations to the HttpClient's request body.

	@Override
	public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) {
		requireNonNull(predicate, "predicate");
		if (!channel().isActive()) {
			return then(Mono.error(AbortedException.beforeSend()));
		}
		if (dataStream instanceof Mono) {
			return then(((Mono<?>) dataStream).flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
			                                 .doOnDiscard(ByteBuf.class, ByteBuf::release));
		}
		return then(MonoSendMany.byteBufSource(dataStream, channel(), predicate));
	}
[Image attachment: Envoy access log (not preserved)]


ojh3636 commented Jul 25, 2024

  1. doCancel also runs on the event loop -> there is no race condition.
  2. Spring Cloud Gateway obtains the body through retain() -> there is no issue when doOnDiscard is called.

I'll investigate our application code... thank you

@violetagg violetagg added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Jul 25, 2024

github-actions bot commented Aug 2, 2024

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.


Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Aug 10, 2024
@violetagg violetagg removed for/user-attention This issue needs user attention (feedback, rework, etc...) status/need-feedback labels Aug 10, 2024