NPE in websocket server if client has keepAlive #462

Closed
tsachev opened this issue Jan 24, 2018 · 4 comments

tsachev commented Jan 24, 2018

If I create my websocket client with keepAlive like this

RSocketFactory.connect().keepAlive().transport(WebsocketClientTransport.create(7878)).start();

the server throws an NPE (see below) when it receives the keep-alive frame.
The strange thing is that I do not see this with the TCP transport, at least in ping-pong tests.

I am only guessing, but the problem may be somewhere in WebsocketDuplexConnection, or perhaps in FragmentationDuplexConnection if the websocket frames are not sized the same as the TCP ones.
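
For reference, here is roughly how I run it (a sketch assuming the 0.x RSocketFactory API; the no-op acceptor is just a placeholder responder):

```java
import io.rsocket.AbstractRSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import reactor.core.publisher.Mono;

public class KeepAliveNpeRepro {
  public static void main(String[] args) throws InterruptedException {
    // Plain websocket server with a no-op responder.
    RSocketFactory.receive()
        .acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {}))
        .transport(WebsocketServerTransport.create(7878))
        .start()
        .block();

    // Client with keepAlive enabled; the server throws the NPE below
    // when the first KEEPALIVE frame arrives.
    RSocketFactory.connect()
        .keepAlive()
        .transport(WebsocketClientTransport.create(7878))
        .start()
        .block();

    Thread.sleep(60_000); // keep the JVM alive long enough for keep-alive frames to flow
  }
}
```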

java.lang.NullPointerException
	at io.rsocket.frame.FrameHeaderFlyweight.frameType(FrameHeaderFlyweight.java:191)
	at io.rsocket.Frame.getType(Frame.java:219)
	at io.rsocket.Frame.ensureFrameType(Frame.java:595)
	at io.rsocket.Frame$Keepalive.hasRespondFlag(Frame.java:586)
	at io.rsocket.RSocketServer.lambda$handleKeepAliveFrame$17(RSocketServer.java:394)
	at reactor.core.publisher.MonoRunnable.subscribe(MonoRunnable.java:40)
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3008)
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.onNext(FluxMergeSequential.java:230)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:555)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:631)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:671)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108)
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211)
	at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326)
	at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
	at reactor.ipc.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:381)
	at reactor.ipc.netty.http.server.HttpServerWSOperations.onInboundNext(HttpServerWSOperations.java:107)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:132)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at java.lang.Thread.run(Thread.java:748)
java.lang.NullPointerException
	at io.rsocket.frame.FrameHeaderFlyweight.frameType(FrameHeaderFlyweight.java:191)
	at io.rsocket.Frame.getType(Frame.java:219)
	at io.rsocket.Frame.ensureFrameType(Frame.java:595)
	at io.rsocket.Frame$Keepalive.hasRespondFlag(Frame.java:586)
	at io.rsocket.RSocketServer.lambda$handleKeepAliveFrame$17(RSocketServer.java:394)
	at reactor.core.publisher.MonoRunnable.subscribe(MonoRunnable.java:40)
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3008)
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.onNext(FluxMergeSequential.java:230)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:555)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:631)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:671)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108)
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211)
	at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326)
	at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
	at reactor.ipc.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:381)
	at reactor.ipc.netty.http.server.HttpServerWSOperations.onInboundNext(HttpServerWSOperations.java:107)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:132)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at java.lang.Thread.run(Thread.java:748)


mostroverkhov commented Jan 24, 2018

@tsachev The problem is likely in Core: the Frame is already released by the time the keep-alive handling code tries to decode it (one, two, three). This can happen when the Mono's lambda inside handleKeepAliveFrame is executed on a non-synchronous scheduler.
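
Roughly, the race looks like this (a simplified sketch, not the actual RSocketServer code; the class name and buffer handling here are only for illustration):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Mono;

class ReleaseBeforeSubscribe {
  // Simplified stand-in for the keep-alive handling path: the frame's buffer
  // is released in a finally block, but the Mono that reads it is only
  // subscribed later (possibly on another scheduler).
  static Mono<Void> handleKeepAliveFrame(ByteBuf frame) {
    try {
      return Mono.fromRunnable(() -> {
        // By the time this runs, refCnt may already be 0, so reading the
        // header blows up (here an IllegalReferenceCountException,
        // in rsocket-java an NPE from the frame flyweight).
        frame.getByte(0);
      });
    } finally {
      frame.release();
    }
  }

  public static void main(String[] args) {
    ByteBuf frame = Unpooled.buffer().writeByte(0x0);
    handleKeepAliveFrame(frame).subscribe(); // subscription happens after release()
  }
}
```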


tsachev commented Jan 24, 2018

The release in the finally block is called before anyone has a chance to subscribe to the returned Mono. So even on a synchronous scheduler it won't work if the ref count of the frame's content was 1. With TCP it is 2, but I don't know exactly why.
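
To illustrate the ref-count arithmetic (a plain Netty ByteBuf sketch, not rsocket-java code; the extra retain() stands in for whatever the TCP path does):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

class RefCountDemo {
  public static void main(String[] args) {
    // refCnt == 1: the single release() in the finally block deallocates the
    // buffer, so the later subscriber reads freed memory (the NPE case).
    ByteBuf single = Unpooled.buffer().writeByte(0x0);
    single.release();
    System.out.println(single.refCnt()); // 0 -> any read now fails

    // refCnt == 2 (e.g. an extra retain() somewhere on the TCP path): the
    // same release() leaves one live reference, so the subscriber still works.
    ByteBuf doubled = Unpooled.buffer().writeByte(0x0).retain();
    doubled.release();
    System.out.println(doubled.refCnt()); // 1 -> the frame is still readable
  }
}
```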

@tsachev tsachev changed the title NEP in websocket server if client has keepAlive NPE in websocket server if client has keepAlive Jan 25, 2018
@mostroverkhov

@tsachev One refCount is held by reactor-netty, which pools ByteBufs; the other one is held by RSocket-java. Also, I think this issue was fixed in #467; can you confirm?

@yschimke

Closing, as I'm using this in rsocket-cli.
