Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Handling of RejectedExecutionException in ExecutorScheduler #5964

Closed
runeflobakk opened this issue Apr 17, 2018 · 3 comments
Closed

2.x: Handling of RejectedExecutionException in ExecutorScheduler #5964

runeflobakk opened this issue Apr 17, 2018 · 3 comments

Comments

@runeflobakk
Copy link

runeflobakk commented Apr 17, 2018

Hello,

I am a relatively new user of RxJava 2 (v2.1.12), and really enjoy using it! Thanks!

I have started to go through the typical "what can go wrong" in a feature I am currently implementing in a server application. I want to be somewhat conservative with the available threads and give a "resource busy"-response back to the client in case all threads are currently in use.

I have created a Scheduler based on a ThreadPoolExecutor with a sparse amount of threads, and not much room for queuing tasks, for subscribing on the Flowable I use for streaming. When testing threadpool exhaustion I see that the call to Flowable.subscribe prints out a RejectedExecutionException, pretty much as expected.

I have read about the error handling, and as well seen a comment about RxJava only passing fatal exceptions upwards, which make very much sense.

I was initially hoping to somehow be able to catch the RejectedExecutionException from trying to schedule a subscription, and use it to return a "resource busy"-response to the client. From inspecting the source code of ExecutorScheduler, I see that this exception from trying to submit to the underlying ExecutorService are hardcoded to be passed to RxJavaPlugins.onError(..).

So my question is really, am I thinking wrong with wanting to actually use the semantics of my ThreadPoolExecutor for controlling if I should handle the request from the client? Hooking on to the global RxJavaPlugins.setErrorHandler does not seem like something I want to do for per-request error handling, so that's why I am thinking that I have got something wrong with my design. I could do some heuristics on my ExecutorService/thread pool/queue to pre-emptively decide if I should acquire and subscribe on the Flowable, and if not then return an appropriate response to the client, but it seem a bit less error-prone to just configure my ExecutorService to handle the maximum of threads I would like, and followingly lean on not being able to schedule a subscription to be able to return the appropriate resource busy-response. As I understand, I will not be able to detect i subscribing fails because of the ExecutorService rejected it.

Anyone have any thoughts on this matter? Is it a deliberate choice not to offer any callback or other means for catching RejectedExecutionException on a per-subscription basis? How do you cope with ExecutorService exhaustion and task rejection? Should I instead inspect the ExecutorService before subscribing, and abort the request myself instead of relying on the ExecutorService rejecting tasks when it is exhausted?

Thank you in advance for your input! And please tell me if anything is unclear and I need to include more context. I would really like to get some more insight on the design around this particular kind of error handling 😃

@akarnokd
Copy link
Member

Limiting the use of threads is usually done by relying on backpressure and setting maxConcurrency parameters in flows. The Scheduler API is an abstraction over an asynchronous boundary and often there is no good way to notify the client flow about a rejected task without violating sequential requirements of of the Reactive Streams protocol. There exist Scheduler implementations that lets you control the number of total worker threads and thus limit flows that way.

@runeflobakk
Copy link
Author

I understand. I need to read up on backpressure. As I understand it then, I should more or less treat Schedulers/ExecutorServices as more or less unbounded with RxJava2, and be sure to properly configure backpressure to restrict concurrency.

I think maybe my problem is that I create one Flowable per client request, and using FlowableOnSubscribe to push database results to a FlowableEmitter, and using a cursor-based ResultSet. So the maxConcurrency does not apply to me as I understand it, as I have one subscription per Flowable, per request. The subscription is to stream the elements to the client.

@akarnokd
Copy link
Member

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants