You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am a relatively new user of RxJava 2 (v2.1.12), and really enjoy using it! Thanks!
I have started to go through the typical "what can go wrong" in a feature I am currently implementing in a server application. I want to be somewhat conservative with the available threads and give a "resource busy"-response back to the client in case all threads are currently in use.
I have created a Scheduler based on a ThreadPoolExecutor with a sparse amount of threads, and not much room for queuing tasks, for subscribing on the Flowable I use for streaming. When testing threadpool exhaustion I see that the call to Flowable.subscribe prints out a RejectedExecutionException, pretty much as expected.
I have read about the error handling, and as well seen a comment about RxJava only passing fatal exceptions upwards, which make very much sense.
I was initially hoping to somehow be able to catch the RejectedExecutionException from trying to schedule a subscription, and use it to return a "resource busy"-response to the client. From inspecting the source code of ExecutorScheduler, I see that this exception from trying to submit to the underlying ExecutorService are hardcoded to be passed to RxJavaPlugins.onError(..).
So my question is really, am I thinking wrong with wanting to actually use the semantics of my ThreadPoolExecutor for controlling if I should handle the request from the client? Hooking on to the global RxJavaPlugins.setErrorHandler does not seem like something I want to do for per-request error handling, so that's why I am thinking that I have got something wrong with my design. I could do some heuristics on my ExecutorService/thread pool/queue to pre-emptively decide if I should acquire and subscribe on the Flowable, and if not then return an appropriate response to the client, but it seem a bit less error-prone to just configure my ExecutorService to handle the maximum of threads I would like, and followingly lean on not being able to schedule a subscription to be able to return the appropriate resource busy-response. As I understand, I will not be able to detect i subscribing fails because of the ExecutorService rejected it.
Anyone have any thoughts on this matter? Is it a deliberate choice not to offer any callback or other means for catching RejectedExecutionException on a per-subscription basis? How do you cope with ExecutorService exhaustion and task rejection? Should I instead inspect the ExecutorService before subscribing, and abort the request myself instead of relying on the ExecutorService rejecting tasks when it is exhausted?
Thank you in advance for your input! And please tell me if anything is unclear and I need to include more context. I would really like to get some more insight on the design around this particular kind of error handling 😃
The text was updated successfully, but these errors were encountered:
Limiting the use of threads is usually done by relying on backpressure and setting maxConcurrency parameters in flows. The Scheduler API is an abstraction over an asynchronous boundary and often there is no good way to notify the client flow about a rejected task without violating sequential requirements of of the Reactive Streams protocol. There existScheduler implementations that lets you control the number of total worker threads and thus limit flows that way.
I understand. I need to read up on backpressure. As I understand it then, I should more or less treat Schedulers/ExecutorServices as more or less unbounded with RxJava2, and be sure to properly configure backpressure to restrict concurrency.
I think maybe my problem is that I create oneFlowable per client request, and using FlowableOnSubscribe to push database results to a FlowableEmitter, and using a cursor-based ResultSet. So the maxConcurrency does not apply to me as I understand it, as I have one subscription per Flowable, per request. The subscription is to stream the elements to the client.
Hello,
I am a relatively new user of RxJava 2 (v2.1.12), and really enjoy using it! Thanks!
I have started to go through the typical "what can go wrong" in a feature I am currently implementing in a server application. I want to be somewhat conservative with the available threads and give a "resource busy"-response back to the client in case all threads are currently in use.
I have created a
Scheduler
based on aThreadPoolExecutor
with a sparse amount of threads, and not much room for queuing tasks, for subscribing on the Flowable I use for streaming. When testing threadpool exhaustion I see that the call toFlowable.subscribe
prints out aRejectedExecutionException
, pretty much as expected.I have read about the error handling, and as well seen a comment about RxJava only passing fatal exceptions upwards, which make very much sense.
I was initially hoping to somehow be able to catch the
RejectedExecutionException
from trying to schedule a subscription, and use it to return a "resource busy"-response to the client. From inspecting the source code of ExecutorScheduler, I see that this exception from trying to submit to the underlyingExecutorService
are hardcoded to be passed toRxJavaPlugins.onError(..)
.So my question is really, am I thinking wrong with wanting to actually use the semantics of my
ThreadPoolExecutor
for controlling if I should handle the request from the client? Hooking on to the globalRxJavaPlugins.setErrorHandler
does not seem like something I want to do for per-request error handling, so that's why I am thinking that I have got something wrong with my design. I could do some heuristics on my ExecutorService/thread pool/queue to pre-emptively decide if I should acquire and subscribe on the Flowable, and if not then return an appropriate response to the client, but it seem a bit less error-prone to just configure my ExecutorService to handle the maximum of threads I would like, and followingly lean on not being able to schedule a subscription to be able to return the appropriate resource busy-response. As I understand, I will not be able to detect i subscribing fails because of the ExecutorService rejected it.Anyone have any thoughts on this matter? Is it a deliberate choice not to offer any callback or other means for catching
RejectedExecutionException
on a per-subscription basis? How do you cope with ExecutorService exhaustion and task rejection? Should I instead inspect the ExecutorService before subscribing, and abort the request myself instead of relying on the ExecutorService rejecting tasks when it is exhausted?Thank you in advance for your input! And please tell me if anything is unclear and I need to include more context. I would really like to get some more insight on the design around this particular kind of error handling 😃
The text was updated successfully, but these errors were encountered: