Schedulers.from(Executor) used by ObserveOn can block underlying Executor indefinitely. #4630
Comments
What is your source that keeps pinning the Executor? We optimize for throughput so it is possible resources get monopolized in high utilization scenarios. You could use a multi-threaded Executor or use |
Thanks for the response! I've created a commented, runnable gist to hopefully better articulate the issue (using RxJava 1.2.0). https://gist.github.com/devsr/5c6f3e5478a1773391d3eb12daa5789f The Delay operator does not seem to be a viable workaround as the issue is really in the In some respects the Delay operator is actually worse. If an observer is simply computationally slow instead of using reactive pull, delay will consume the upstream source in an unbounded manner, creating an unbounded number of tasks in The fundamental issue is that the Specifically in the case of the |
You are right in your observations. observeOn doesn't need trampolining but the Scheduler API has to work for all kinds of underlying executors where a Guaranteed single-threaded Executors automatically provide this guarantee but You may want to implement your own scheduler with the required fairness guarantees. Here's an example of how to do it. |
I believe the FIFO guarantee could be maintained while deferring to the underlying executor. Most simply, I think, by having the current An alternative would be introducing a max drain value of some sort that could limit the number of scheduler tasks executed per executor task. That would allow preserving the current behavior and performance by setting the default for that max drain value to I'd be happy to create a PR if someone would consider looking at it. |
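The max drain idea above can be sketched without RxJava, using only java.util.concurrent. This is a minimal illustration under stated assumptions, not the commit mentioned later in the thread: BoundedDrainWorker, BoundedDrainDemo and the maxDrain parameter are hypothetical names, and the queue-plus-counter structure is only a simplified model of a serializing worker. The worker runs at most maxDrain queued tasks per executor task, then resubmits itself so that other executor tasks get a turn, while its own tasks stay in FIFO order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical worker: runs at most maxDrain (>= 1) queued tasks per executor task. */
final class BoundedDrainWorker implements Runnable {
    private final Executor executor;
    private final int maxDrain;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // number of pending tasks

    BoundedDrainWorker(Executor executor, int maxDrain) {
        this.executor = executor;
        this.maxDrain = maxDrain;
    }

    void schedule(Runnable task) {
        queue.offer(task);
        // Only the 0 -> 1 transition submits the worker, so a single drain is active at a time.
        if (wip.getAndIncrement() == 0) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        int drained = 0;
        do {
            queue.poll().run(); // non-null: wip > 0 implies a task was offered
            drained++;
            if (wip.decrementAndGet() == 0) {
                return; // queue empty; the next schedule() call resubmits
            }
        } while (drained < maxDrain);
        // Tasks remain: yield the thread by going to the back of the executor's queue.
        executor.execute(this);
    }
}

final class BoundedDrainDemo {
    /** Returns the order in which four worker tasks and one direct executor task ran. */
    static List<String> run(int maxDrain) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(5);
        // Hold the executor thread so everything below is queued before anything runs.
        executor.execute(() -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        BoundedDrainWorker worker = new BoundedDrainWorker(executor, maxDrain);
        for (int i = 0; i < 4; i++) {
            final int n = i;
            worker.schedule(() -> { log.add("w" + n); done.countDown(); });
        }
        executor.execute(() -> { log.add("direct"); done.countDown(); });
        gate.countDown();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown(); // only after all tasks ran, so resubmission is never rejected
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(Integer.MAX_VALUE)); // [w0, w1, w2, w3, direct]
        System.out.println(run(1));                 // [w0, direct, w1, w2, w3]
    }
}
```

In this sketch a very large maxDrain behaves like an unbounded drain loop, and maxDrain = 1 gives the executor's other tasks a turn after every worker task, at the cost of one extra executor submission per task.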
Or you could just write the single-threaded scheduler I pointed at; 2.x already has one and it may be useful in 1.x after all. |
This has less to do with single-thread vs. multi-thread and more with having |
I'm not convinced this change should be done in RxJava. You are free to implement your own |
Understood. I just think that This means for integrating with I've no problem doing what you recommend, I just figured there might be a better way via a contribution. I appreciate your time and the discussion, thank you! |
I created a commit that implements what I was talking about earlier, in case it is helpful to anyone. It keeps the current behavior by default. |
Something I missed. I originally found this behavior in v1.1.2. I didn't notice that the internals of ObserveOn changed with Issue #3795 in v1.1.3. The Any changes to the |
@akarnokd Is It seems like the decision to pin (or not pin) the thread should be left to the |
No, it is an integral part of the operator's stable-fetch behavior. |
Doesn't that make it a source of races for constructions like |
It is possible you chose the wrong structure or operators; overwhelming a single target thread is generally not desired. What does your use case look like? |
I've got a fast-producing observable feeding a state machine that is synchronized by loop thread. The issue is there are other inputs to the system and they can be bursty, so it isn't full throttle all the time, but when the consumer is overwhelmed I need the processing to be more or less fair so that one producer isn't starving all the others. I suppose it is similar to GUI design, I still need the thread to process other sources even if one source is producing lots of data. I think what I might do is create an operator downstream that schedules reactive-pull requests on the same scheduler to relieve the upstream pin. That seems like it would solve both |
I created a fair scheduling operator that protects a given scheduler from pinning by other operators in the chain. I also updated the gist from earlier so that it demonstrates the behavior of the fair scheduler. If you think this is worth cleaning up and writing unit tests for a feature request I'd be happy to do it. Otherwise I'll stop bothering you about this issue and say thanks for the discussion and help. (Enjoyed your blogging about operator implementation pitfalls btw) |
We have the |
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one. |
I'm having a problem with the Schedulers.from(Executor) wrapper when the Executor is single-threaded and the resulting Scheduler is used with the ObserveOn operator.

A fast producer and a slow subscriber (either with backpressure or callstack blocking) create a situation where the ObserveOn operator never yields the Executor's thread, and nothing submitted directly to the Executor can run.

I think the fundamental problem is in the run() method of rx.internal.schedulers.ExecutorScheduler.ExecutorSchedulerWorker. The method keeps looping while more scheduler tasks are available instead of resubmitting itself to the Executor and allowing other executor tasks to run.
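The starvation described here can be reproduced in miniature without RxJava. The sketch below is a simplified model written for illustration, not the actual ExecutorSchedulerWorker source: DrainLoopDemo, the queue, and the wip counter are assumptions that mimic a drain loop which keeps running while its own queue is non-empty. Each "item" task schedules the next one while it runs, standing in for a fast producer, so the counter never reaches zero until the producer stops.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainLoopDemo {
    /** Returns the execution order of producerItems worker tasks and one direct executor task. */
    static List<String> run(int producerItems) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        AtomicInteger wip = new AtomicInteger();      // pending worker tasks
        AtomicInteger emitted = new AtomicInteger();
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // Modeled drain loop: keeps the executor thread until the worker queue is empty.
        Runnable drain = () -> {
            do {
                queue.poll().run();
            } while (wip.decrementAndGet() != 0);
        };

        // A self-rescheduling task, standing in for a producer that always has more data.
        Runnable[] item = new Runnable[1];
        item[0] = () -> {
            int n = emitted.getAndIncrement();
            log.add("item" + n);
            if (n + 1 < producerItems) {
                queue.offer(item[0]);
                wip.getAndIncrement(); // wip is already > 0, so the drain is not resubmitted
            }
        };

        // Start the worker, then submit a task directly to the same executor.
        queue.offer(item[0]);
        if (wip.getAndIncrement() == 0) {
            executor.execute(drain);
        }
        executor.execute(() -> log.add("direct"));

        executor.shutdown(); // already-submitted tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [item0, item1, item2, direct]
    }
}
```

The direct task runs only after the producer has stopped rescheduling itself. With a producer that never stops, it would never run, which is the behavior reported in this issue.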