
Schedulers.from(Executor) used by ObserveOn can block underlying Executor indefinitely. #4630

Closed
devsr opened this issue Sep 29, 2016 · 19 comments

@devsr

devsr commented Sep 29, 2016

I'm having a problem with the Schedulers.from(Executor) wrapper when the Executor is single-threaded and the resulting Scheduler is used with the ObserveOn operator.

A fast producer and a slow subscriber (either via backpressure or call-stack blocking) create a situation where the ObserveOn operator never yields the Executor's thread, and nothing submitted directly to the Executor can run.

I think the fundamental problem is in the implementation of rx.internal.schedulers.ExecutorScheduler.ExecutorSchedulerWorker, in the run() method. The method keeps looping while more scheduler tasks are available instead of resubmitting itself to the Executor and allowing other Executor tasks to run.
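For illustration, here is a minimal, hedged sketch of the scenario described above (RxJava 1.x); the class name, item counts, and sleep durations are invented for the example and are not taken from the gist referenced later:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class ExecutorPinningDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Scheduler scheduler = Schedulers.from(executor);

        // Fast producer, slow consumer, observed on the wrapped Executor.
        Observable.range(0, Integer.MAX_VALUE)
                .observeOn(scheduler)
                .subscribe(i -> sleepQuietly(1));

        Thread.sleep(100);

        // This task competes with the ObserveOn drain loop for the single
        // thread; per the report above, it can be starved indefinitely.
        executor.execute(() -> System.out.println("direct Executor task ran"));

        Thread.sleep(5000);
        executor.shutdownNow();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```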

@akarnokd
Member

What is your source that keeps pinning the Executor? We optimize for throughput, so it is possible that resources get monopolized in high-utilization scenarios. You could use a multi-threaded Executor, or use delay(0, TimeUnit.NANOSECONDS, scheduler), which schedules tasks individually and gives other tasks an opportunity to interleave.
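As a hedged illustration of that suggestion, a small sketch of the delay-based variant might look like the following (RxJava 1.x; the class name and counts are placeholders):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class DelayInterleaveDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // delay(0, NANOSECONDS, scheduler) hands each emission to the scheduler
        // as its own task, which can give other Executor submissions a chance
        // to interleave.
        Observable.range(0, 1000)
                .delay(0, TimeUnit.NANOSECONDS, Schedulers.from(executor))
                .subscribe(System.out::println);

        executor.execute(() -> System.out.println("direct Executor task ran"));

        Thread.sleep(1000);
        executor.shutdown();
    }
}
```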

@devsr
Author

devsr commented Sep 29, 2016

Thanks for the response!

I've created a commented, runnable gist to hopefully better articulate the issue (using RxJava 1.2.0).

https://gist.github.com/devsr/5c6f3e5478a1773391d3eb12daa5789f

The Delay operator does not seem to be a viable workaround, as the issue is really in the ExecutorScheduler, not the ObserveOn operator; delay exhibits the same behavior.

In some respects the Delay operator is actually worse. If an observer is simply computationally slow instead of using reactive pull, delay will consume the upstream source in an unbounded manner, creating an unbounded number of tasks in the ExecutorScheduler that never yield to the underlying Executor. Contrast this with ObserveOn, which only consumes the upstream source in RxRingBuffer.SIZE chunks using backpressure.

The fundamental issue is that the ExecutorScheduler coalesces multiple task submissions into a single call instead of deferring by resubmitting itself to the underlying Executor.

Specifically, in the case of the ObserveOn operator, the benefit of the ExecutorScheduler's task-coalescing optimization is reduced because ObserveOn already coalesces tasks itself. I don't think that deferring to the underlying Executor would introduce much more overhead, but I couldn't be sure without benchmarking it.
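To make the coalescing point concrete, here is a simplified, hedged sketch of the general queue/drain pattern being described; this is not the actual rx.internal.schedulers.ExecutorScheduler source:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class CoalescingWorkerSketch implements Runnable {
    final Executor executor;
    final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    CoalescingWorkerSketch(Executor executor) {
        this.executor = executor;
    }

    void schedule(Runnable task) {
        tasks.offer(task);
        if (wip.getAndIncrement() == 0) {
            // Only the transition from empty to non-empty submits to the Executor.
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        do {
            tasks.poll().run();
            // As long as more tasks keep arriving, keep looping on the Executor's
            // thread; tasks submitted directly to the Executor cannot run until
            // this loop finally drains to zero.
        } while (wip.decrementAndGet() != 0);
    }
}
```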

@akarnokd
Member

You are right with your observations. observeOn doesn't need trampolining, but the Scheduler API has to work for all kinds of underlying Executors, where a Worker must be strictly FIFO for non-delayed tasks; that is what the queue/drain you see ensures.

Guaranteed single-threaded Executors provide this ordering automatically, but Schedulers.from can't know that.

You may want to implement your own scheduler with the required fairness guarantees. Here's an example of how to do it.
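As a hedged illustration only (this is not the linked example), a minimal per-task-submission scheduler might look like the sketch below: each action is handed to the Executor individually, so with a single-threaded Executor the FIFO order still holds while other Executor clients can interleave. Delayed scheduling and error handling are omitted.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;

class PerTaskExecutorScheduler extends Scheduler {
    private final Executor executor;

    PerTaskExecutorScheduler(Executor executor) {
        this.executor = executor;
    }

    @Override
    public Worker createWorker() {
        return new Worker() {
            private final CompositeSubscription tracking = new CompositeSubscription();

            @Override
            public Subscription schedule(final Action0 action) {
                final BooleanSubscription token = BooleanSubscription.create();
                tracking.add(token);
                // Each action is its own Executor task, so other Executor
                // clients can run in between scheduled actions.
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            if (!token.isUnsubscribed() && !tracking.isUnsubscribed()) {
                                action.call();
                            }
                        } finally {
                            tracking.remove(token);
                        }
                    }
                });
                return token;
            }

            @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
                // Delayed scheduling is omitted from this sketch.
                throw new UnsupportedOperationException("delayed scheduling not sketched");
            }

            @Override
            public void unsubscribe() {
                tracking.unsubscribe();
            }

            @Override
            public boolean isUnsubscribed() {
                return tracking.isUnsubscribed();
            }
        };
    }
}
```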

@devsr
Author

devsr commented Sep 29, 2016

I believe the FIFO guarantee could be maintained while deferring to the underlying Executor. Most simply, I think, having the current run() resubmit itself when there are still outstanding tasks (i.e. wip != 0), instead of looping, would preserve FIFO.

An alternative would be introducing a max drain value of some sort that limits the number of scheduler tasks executed per Executor task. The current behavior and performance could be preserved by defaulting that max drain value to Long.MAX_VALUE. A new static method such as Schedulers.from(Executor, long) could be added to Schedulers to accept that value.

I'd be happy to create a PR if someone would consider looking at it.
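A hedged sketch of these two proposals, written against the simplified queue/drain sketch shown earlier (maxDrain is an illustrative added field, not an actual RxJava parameter):

```java
// Replaces run() in the coalescing sketch above; maxDrain = 1 resubmits after
// every task, while maxDrain = Long.MAX_VALUE matches the current keep-looping
// behavior.
@Override
public void run() {
    long drained = 0;
    for (;;) {
        tasks.poll().run();
        drained++;
        if (wip.decrementAndGet() == 0) {
            return;                      // queue drained; nothing left to do
        }
        if (drained == maxDrain) {
            executor.execute(this);      // yield: defer remaining tasks, FIFO kept
            return;
        }
    }
}
```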

@akarnokd
Member

Or you could just write the single-threaded scheduler I pointed at; 2.x already has one and it may be useful in 1.x after all.

@devsr
Author

devsr commented Sep 29, 2016

This has less to do with single-threaded vs. multi-threaded and more to do with having Schedulers.from(Executor) allow cooperative multitasking with the other users of the underlying Executor. Even in a multi-threaded scenario, if I'm using a fixed thread pool I don't want the Scheduler holding a thread for an indefinite period of time.

@akarnokd
Member

I'm not convinced this change should be done in RxJava. You are free to implement your own Scheduler and use it in your project, or publish it as its own library for those who also need that specific behavior.

@devsr
Author

devsr commented Sep 29, 2016

Understood.

I just think that Schedulers.from(Executor) is being opinionated about multitasking: in order to do cooperative multitasking, all users must use the Scheduler API instead of the Executor API directly.

This means that to integrate with Executor-driven code, I'd have to rewrap Schedulers.from(Executor) again with an Executor facade and use that, or implement a custom scheduler as you say.

I've no problem doing what you recommend, I just figured there might be a better way via a contribution. I appreciate your time and the discussion, thank you!
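A minimal, hedged sketch of the Executor-facade idea mentioned above (the class name is invented for illustration):

```java
import java.util.concurrent.Executor;

import rx.Scheduler;
import rx.functions.Action0;

class SchedulerExecutorFacade implements Executor {
    private final Scheduler.Worker worker;

    SchedulerExecutorFacade(Scheduler scheduler) {
        this.worker = scheduler.createWorker();
    }

    @Override
    public void execute(final Runnable command) {
        // Route the task through the Scheduler so it cooperates with the
        // Scheduler's other users instead of bypassing them.
        worker.schedule(new Action0() {
            @Override
            public void call() {
                command.run();
            }
        });
    }

    /** Releases the underlying Worker when the facade is no longer needed. */
    public void shutdown() {
        worker.unsubscribe();
    }
}
```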

@devsr
Author

devsr commented Sep 30, 2016

I created a commit that implements what I was talking about earlier, in case it is helpful to anyone. It keeps the current behavior by default.

@devsr
Author

devsr commented Sep 30, 2016

Something I missed. I originally found this behavior in v1.1.2. I didn't notice that the internals of ObserveOn changed with Issue #3795 in v1.1.3. The request(int) call was moved inside the emission loop instead of being the last thing the method does before returning.

Any changes to the ExecutorScheduler are moot because ObserveOn itself will not yield in a high throughput situation.

@devsr
Author

devsr commented Sep 30, 2016

@akarnokd Is ObserveOn's thread pinning worth opening a new issue?

It seems like the decision to pin (or not pin) the thread should be left to the Scheduler, which is basically what we discussed in this issue, isn't it?

@akarnokd
Member

No, it is an integral part of the operator's stable-fetch behavior.

@devsr
Author

devsr commented Sep 30, 2016

Doesn't that make it a source of races for constructions like .observeOn(AndroidSchedulers.mainThread()) or basically any other situation where you'd want to use the operator to feed a single (or fixed) threaded environment?

@akarnokd
Member

It is possible you chose the wrong structure or operators; overwhelming a single target thread is generally not desired. What does your use case look like?

@devsr
Author

devsr commented Sep 30, 2016

I've got a fast-producing observable feeding a state machine that is synchronized by a loop thread. The issue is that there are other inputs to the system and they can be bursty, so it isn't full throttle all the time; but when the consumer is overwhelmed I need the processing to be more or less fair, so that one producer isn't starving all the others.

I suppose it is similar to GUI design: I still need the thread to process other sources even if one source is producing lots of data.

I think what I might do is create an operator downstream that schedules reactive-pull requests on the same scheduler to relieve the upstream pin. That seems like it would solve both ExecutorScheduler and ObserveOn pins at the same time.
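One possible, hedged shape for such an operator is sketched below (RxJava 1.x; the class name is invented and this is not the gist's implementation): it reroutes downstream request(n) calls through a Worker of the given Scheduler, so each refill request runs as its own scheduler task rather than inside the current drain loop.

```java
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;

class ScheduledRequestsOperator<T> implements Observable.Operator<T, T> {
    private final Scheduler scheduler;

    ScheduledRequestsOperator(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final Scheduler.Worker worker = scheduler.createWorker();
        child.add(worker);                        // release the worker on unsubscribe
        Subscriber<T> parent = new Subscriber<T>() {
            @Override public void onNext(T t) { child.onNext(t); }
            @Override public void onError(Throwable e) { child.onError(e); }
            @Override public void onCompleted() { child.onCompleted(); }

            @Override
            public void setProducer(final Producer producer) {
                child.setProducer(new Producer() {
                    @Override
                    public void request(final long n) {
                        // Defer the upstream request so it is scheduled as its
                        // own task instead of running inside the caller's loop.
                        worker.schedule(new Action0() {
                            @Override
                            public void call() {
                                producer.request(n);
                            }
                        });
                    }
                });
            }
        };
        child.add(parent);
        return parent;
    }
}

// Usage sketch:
// source.observeOn(scheduler).lift(new ScheduledRequestsOperator<Integer>(scheduler)).subscribe(...);
```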

@devsr
Author

devsr commented Oct 1, 2016

@akarnokd

I created a fair scheduling operator that protects a given scheduler from pinning by other operators in the chain.

I also updated the gist from earlier so that it demonstrates the behavior of the fair scheduler.

If you think this is worth cleaning up and writing unit tests for as a feature request, I'd be happy to do it. Otherwise I'll stop bothering you about this issue and just say thanks for the discussion and help. (I enjoyed your blogging about operator implementation pitfalls, by the way.)

@akarnokd
Member

akarnokd commented Oct 1, 2016

We have rebatchRequests, and you can limit the fetch amount on observeOn.
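As a hedged illustration only: source, scheduler, and subscriber below are placeholders, the batch size of 16 is arbitrary, and the exact observeOn overloads available depend on the 1.x version in use.

```java
// Hypothetical usage; names are placeholders.
source
    .rebatchRequests(16)          // request from upstream in batches of 16
    .observeOn(scheduler)         // or observeOn(scheduler, 16) where that overload exists
    .subscribe(subscriber);
```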

@devsr
Author

devsr commented Oct 1, 2016

rebatchRequests does not appear to relieve pinning.

Demonstration gist

@akarnokd
Member

I'm closing this issue due to inactivity. If you have further input, don't hesitate to reopen this issue or post a new one.
