take() ignored when resuming after Exceptions$ErrorCallbackNotImplemented #858

Closed
yamass opened this issue Jun 2, 2020 · 9 comments
Labels
bug superseded Issue is superseded by another

Comments

@yamass

yamass commented Jun 2, 2020

I am using the resume() functionality and have run into very strange behavior that I consider to be one or more bugs. It might also be related to #857, since it also logs the message mentioned there.

When calling requestStream() and subscribing to the returned flux without an error handler, the connection gets closed with the following error log:

02-06-2020 19:20:31.306 [reactor-tcp-nio-2] ERROR r.n.c.ChannelOperationsHandler - [id: 0x8793d78e, L:/127.0.0.1:55222 - R:localhost/127.0.0.1:7890] Error was received while reading the incoming data. The connection will be closed.
reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Some Exception for this test
Caused by: io.rsocket.exceptions.ApplicationErrorException: Some Exception for this test
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
	at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:646)
	at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:585)

When reusing the client connection for another request, the server does receive the request and starts processing it (as seen in the logs). However, it does not react to cancel signals from the client, nor does the client receive any results (next signals).

See the code below.

Expected Behavior

After reconnect, cancel and next signals should still be honored.

Actual Behavior

They are not. See test code.

Steps to Reproduce

@Test
public void strangeResumeBehavior() throws Exception {
	CloseableChannel server = RSocketServer.create((setup, sendingRSocket) -> Mono.just(new RSocket() {
		@Override
		public Flux<Payload> requestStream(Payload payload) {
			if (payload.getDataUtf8().equals("flux")) {
				return Flux.interval(Duration.ofMillis(1))
						.doOnNext(aLong -> System.out.println("Server-side doOnNext: " + aLong))
						.doOnCancel(() -> System.out.println("Server-side doOnCancel"))
						.map(aLong -> DefaultPayload.create("" + aLong));
			} else {
				return Flux.error(new RuntimeException("Some Exception for this test"));
			}
		}
	}))
			.resume(new Resume()
					.sessionDuration(Duration.ofSeconds(30))
					.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(2))
							.doBeforeRetry(retrySignal -> System.out.println("RETRYING to connect.")))) // never called. why not?
			.bind(TcpServerTransport.create(TcpServer.create().host("localhost").port(7890)))
			.block();

	RSocket client = RSocketConnector.create()
			.resume(new Resume()
					.sessionDuration(Duration.ofSeconds(30))
					.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))))
			.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(7890)))
			.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10)))
			.cache()
			.block();


	client.requestStream(DefaultPayload.create("flux"))
			.take(10) // works
			.subscribe(payload -> System.out.println(payload.getDataUtf8()));

	Thread.sleep(1000);

	client.requestStream(DefaultPayload.create("exception"))
			.subscribe(payload -> System.out.println(payload.getDataUtf8()));  // No error handler, so "The connection will be closed."

	Thread.sleep(1000);

	client.requestStream(DefaultPayload.create("flux"))
			.take(10) // <<< Ignored. Why?
			.subscribe(payload -> System.out.println(payload.getDataUtf8())); // <<< Never arrives...

	Thread.sleep(6_000);
}

Possible Solution

Your Environment

  • RSocket version(s) used: 1.0.0
  • Other relevant libraries versions (eg. netty, ...): default rsocket dependencies
  • Platform (eg. JVM version (java -version) or Node version (node --version)):
    openjdk 14.0.1 2020-04-14
    OpenJDK Runtime Environment AdoptOpenJDK (build 14.0.1+7)
    OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14.0.1+7, mixed mode, sharing)
  • OS and version (eg uname -a): MacOS
@OlegDokuka
Member

The connection close is related to #857. A memory leak may be related to the fact that the resume implementation is currently unstable.

@OlegDokuka
Member

Let me fix #857 first and then we can double-check whether cancel works in that scenario

@OlegDokuka OlegDokuka added the blocked Blocked due to another dependency label Jun 2, 2020
@yamass
Author

yamass commented Jun 2, 2020

Thanks, @OlegDokuka.

In the meantime, I will work around this by registering error handlers everywhere (a good idea anyway).
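
For readers who want to apply the same workaround, here is a minimal sketch of subscribing with an explicit error consumer. It uses only reactor-core and a `Flux.error(...)` stand-in for the failing `requestStream()` call; the class and method names are hypothetical and chosen for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

// Illustrative sketch only: class and method names are hypothetical.
class ErrorHandlerWorkaround {

    // Subscribes with both a value consumer and an error consumer, so the
    // error is delivered to our handler instead of being wrapped in
    // Exceptions$ErrorCallbackNotImplemented and thrown.
    static String subscribeWithErrorHandler(Flux<String> source) {
        AtomicReference<String> seen = new AtomicReference<>();
        source.subscribe(
                value -> System.out.println("next: " + value),
                error -> seen.set(error.getMessage()));
        return seen.get();
    }

    public static void main(String[] args) {
        // Stand-in for client.requestStream(...) failing with an application error.
        Flux<String> failing = Flux.error(new RuntimeException("Some Exception for this test"));
        System.out.println("handled: " + subscribeWithErrorHandler(failing));
    }
}
```

In the reproduction above, the equivalent change would be passing a second lambda to the `.subscribe(...)` call on the "exception" request.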

@OlegDokuka
Member

OlegDokuka commented Jun 2, 2020

I will ping you shortly to check whether a fix for #857 causes a different issue with that one. (You would need to check it with a specific artifact from a PR build)

@OlegDokuka
Member

@yamass can this issue be reproduced when the .subscribe error handler is implemented?

@rstoyanchev
Contributor

rstoyanchev commented Jun 3, 2020

@OlegDokuka and I are discussing this. Here are some findings.

The subscribe without an error callback results in a Reactor ErrorCallbackNotImplemented which propagates through onXxx methods and that is not allowed by Reactive Streams. The error reaches Reactor Netty and the connection is closed.

This is okay when resume is not enabled since the connection closing propagates to RSocketRequester which cleans up and causes further requests to fail as expected.

When resume is enabled, the connection closing is expected and ResumeableDuplexConnection obtains a new one. This is done within a switchMap operator, but the bubbling of the exception leaves it in an inconsistent state: it allows new requests to be made, data to go out and data to be received, but the received messages are never drained.

We are going to look to Reactor to address the root cause of this, e.g. via reactor/reactor-core#2176 and possibly more broadly via reactor/reactor-core#1431, but in the meantime we may need to protect against this and stop the bubbling of the errors in RSocketRequester.

We may need similar protections on the responder side as well.
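
To make "stop the bubbling" concrete, here is a simplified plain-Java model. It is not RSocketRequester code and not the actual fix; every name in it is hypothetical. It contrasts a dispatch loop that lets a subscriber's exception escape (and so stops processing) with one that catches the exception per frame and keeps going.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model with hypothetical names; not the library's implementation.
class BubblingErrorSketch {

    // A subscriber callback with no error handling: it throws on the "boom" frame.
    static Consumer<String> failingSubscriber() {
        return frame -> {
            if (frame.equals("boom")) {
                throw new IllegalStateException("no error callback for: " + frame);
            }
        };
    }

    // Unprotected loop: the first exception escapes to the caller and the
    // remaining frames are never delivered.
    static int dispatchUnprotected(List<String> frames, Consumer<String> subscriber) {
        int delivered = 0;
        for (String frame : frames) {
            subscriber.accept(frame);
            delivered++;
        }
        return delivered;
    }

    // Protected loop: an exception from the callback is caught per frame and
    // recorded in `dropped`, so later frames are still delivered.
    static int dispatchProtected(List<String> frames, Consumer<String> subscriber,
                                 List<Throwable> dropped) {
        int delivered = 0;
        for (String frame : frames) {
            try {
                subscriber.accept(frame);
                delivered++;
            } catch (RuntimeException e) {
                dropped.add(e);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> frames = List.of("a", "boom", "b");
        try {
            dispatchUnprotected(frames, failingSubscriber());
        } catch (IllegalStateException e) {
            System.out.println("unprotected loop aborted: " + e.getMessage());
        }
        List<Throwable> dropped = new ArrayList<>();
        int delivered = dispatchProtected(frames, failingSubscriber(), dropped);
        System.out.println("protected loop delivered " + delivered
                + ", dropped " + dropped.size());
    }
}
```

The real code paths are reactive and asynchronous, so this only illustrates the isolation idea: one failing stream should not be able to break delivery for the others.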

@rstoyanchev rstoyanchev added this to the 1.0.1 milestone Jun 3, 2020
@rstoyanchev rstoyanchev added bug and removed blocked Blocked due to another dependency on-hold labels Jun 3, 2020
@yamass
Author

yamass commented Jun 3, 2020

@OlegDokuka As @rstoyanchev already described, this does not happen when subscribing with an error handler. This is a practical workaround in the short term.

@rstoyanchev Is there any way I can make the connection not be closed on these kinds of errors? In my application, all requests are completely independent of each other, so if one of them fails with a bubbling Exception and causes the connection to close, this is not an optimal behavior for us. Are there configuration options for this?

@OlegDokuka
Member

OlegDokuka commented Jun 3, 2020

@yamass There is no such configuration option. We will provide a short-term workaround for now. As a further step, it will be fixed in Reactor.

@OlegDokuka
Member

Hey @yamass!

#863 should fix it, and this fix is available as part of the 1.0.1 release.

Let us know if anything comes up.

Cheers,
Oleh


@OlegDokuka OlegDokuka added the superseded Issue is superseded by another label Jun 9, 2020
@OlegDokuka OlegDokuka removed this from the 1.0.1 milestone Jun 9, 2020