Implicit Schedulers #815

Closed
samuelgruetter opened this issue Feb 5, 2014 · 24 comments

@samuelgruetter
Contributor

This is a proposal to use Scala's implicits to improve the Schedulers part of the RxScala API, by @vjovanov and myself.

Current situation

There are many methods depending on a Scheduler. Since we do not want to always pass the Scheduler argument explicitly, there's a second version of each method which chooses a reasonable default Scheduler, for instance

def window(timespan: Duration): Observable[Observable[T]]
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]]

Advantages of this proposal

Compared to the current situation, this proposal brings the following advantages:

  • For method calls without an explicitly provided Scheduler, it's possible to override the chosen default Scheduler in a per-scope and per-method way.
  • All methods that currently exist in two versions, one with a Scheduler and one without, can be replaced by a single method, so the API size decreases by almost 50%.

Usage examples

There are three ways of using Schedulers:

1) With default implicits: for each method, the framework chooses a reasonable default Scheduler, which can differ from method to method. For instance, ThreadPoolForComputationScheduler() is chosen for buffer, and ImmediateScheduler() is chosen for from.

import rx.lang.scala.schedulers.DefaultImplicits._
Observable.interval(200 millis).take(10).buffer(300 millis)
Observable.interval(200 millis).take(10).window(300 millis).flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))

2) With explicitly specified Schedulers:

Observable.interval(200 millis)(NewThreadScheduler()).take(10).buffer(300 millis)(NewThreadScheduler())
Observable.interval(200 millis)(NewThreadScheduler()).take(10)
          .window(300 millis)(NewThreadScheduler())
          .flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))(ThreadPoolForIOScheduler())

3) With custom default Schedulers:

This feature is the whole point of this proposal.

val testScheduler = TestScheduler()

// specify which default scheduler to use for which method
implicit val myImplicitScheduler1 = new Scheduler(testScheduler) 
  with DefaultBufferScheduler with DefaultWindowScheduler with DefaultIntervalScheduler  
implicit val myImplicitScheduler2 = new Scheduler(ImmediateScheduler())
  with DefaultFromScheduler

Observable.interval(200 millis).take(10).buffer(300 millis)
Observable.interval(200 millis).take(10).window(300 millis).flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))

for (t <- 0 to 2500) {
 testScheduler.advanceTimeBy(1 millis)
}

How it works

There's one marker trait (without any members) for each method in Observable which takes a Scheduler:

trait DefaultBufferScheduler
trait DefaultWindowScheduler
trait DefaultFromScheduler
trait DefaultIntervalScheduler
...

All method pairs of a method with Scheduler and its corresponding method without Scheduler are replaced by one single method with an implicit Scheduler parameter of type Scheduler with DefaultXxxScheduler, for instance

def window(timespan: Duration): Observable[Observable[T]]
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]]

is replaced by

def window(timespan: Duration)(implicit scheduler: Scheduler with DefaultWindowScheduler): Observable[Observable[T]]
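A minimal, self-contained sketch of the replaced signature follows. It assumes a hypothetical JavaScheduler stand-in for rx.Scheduler and reduces window to a method that reports which Scheduler it received instead of returning an Observable. It shows the same single method picking up an implicit Scheduler with DefaultWindowScheduler, or accepting one explicitly in its second argument list.

```scala
object ImplicitWindowSketch {
  // Hypothetical stand-in for rx.Scheduler; the real Java type is not used here.
  final case class JavaScheduler(name: String)

  class Scheduler(val asJavaScheduler: JavaScheduler)

  // Marker trait from the proposal: no members, it only labels a Scheduler.
  trait DefaultWindowScheduler

  // Simplified operator: reports which scheduler it received instead of
  // returning an Observable[Observable[T]].
  def window(timespanMillis: Long)(implicit scheduler: Scheduler with DefaultWindowScheduler): String =
    s"window($timespanMillis ms) on ${scheduler.asJavaScheduler.name}"

  // Caller relies on an implicit value in scope.
  def withImplicit(): String = {
    implicit val s: Scheduler with DefaultWindowScheduler =
      new Scheduler(JavaScheduler("computation")) with DefaultWindowScheduler
    window(300)
  }

  // Caller passes the scheduler explicitly in the second argument list.
  def withExplicit(): String =
    window(300)(new Scheduler(JavaScheduler("newThread")) with DefaultWindowScheduler)
}
```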

Further, the original

trait Scheduler {
  private [scala] val asJavaScheduler: rx.Scheduler
  ...
}

is replaced by

class Scheduler(s: rx.Scheduler) {
  def this(s: Scheduler) = this(s.asJavaScheduler) // alternative constructor
  private [scala] val asJavaScheduler: rx.Scheduler = s
  ...
}

because we need to create new Schedulers with the new keyword in order to mix in marker traits, as in new Scheduler(s) with DefaultXxxScheduler with DefaultYyyScheduler ....
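The following sketch, again using a hypothetical JavaScheduler stand-in and simplified string-returning methods, illustrates why a class with a constructor is needed: one anonymous subclass can mix in several marker traits and thereby satisfy the implicit parameters of several methods, while the alternative constructor rewraps an existing Scala Scheduler around the same underlying scheduler.

```scala
object SchedulerClassSketch {
  // Hypothetical stand-in for rx.Scheduler.
  final case class JavaScheduler(name: String)

  class Scheduler(val asJavaScheduler: JavaScheduler) {
    // Alternative constructor from the proposal: rewrap an existing Scala Scheduler.
    def this(s: Scheduler) = this(s.asJavaScheduler)
  }

  trait DefaultBufferScheduler
  trait DefaultWindowScheduler

  // Simplified operators that only report the scheduler they would use.
  def bufferOn(implicit s: Scheduler with DefaultBufferScheduler): String = s.asJavaScheduler.name
  def windowOn(implicit s: Scheduler with DefaultWindowScheduler): String = s.asJavaScheduler.name

  val base = new Scheduler(JavaScheduler("test"))

  // Possible only because Scheduler is a class: an anonymous subclass mixing in two markers.
  val both = new Scheduler(base) with DefaultBufferScheduler with DefaultWindowScheduler

  // One implicit value satisfies both methods.
  def resolved(): (String, String) = {
    implicit val s: Scheduler with DefaultBufferScheduler with DefaultWindowScheduler = both
    (bufferOn, windowOn)
  }
}
```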

The three ways of using Schedulers work as follows:

1) Default implicits

There's

object DefaultImplicits {
  implicit val computationScheduler = new Scheduler(ThreadPoolForComputationScheduler()) 
                                      with DefaultBufferScheduler 
                                      with DefaultWindowScheduler
                                      with DefaultIntervalScheduler
  implicit val immediateScheduler =   new Scheduler(ImmediateScheduler())
                                      with DefaultFromScheduler
}

which has to be imported by all users who don't want to customize:

import rx.lang.scala.schedulers.DefaultImplicits._

This is similar to Futures requiring all users to

import scala.concurrent.ExecutionContext.Implicits.global

so users should not get too upset about it ;-)
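To see how the imported defaults select a different Scheduler per method without ambiguity, here is a self-contained sketch. DefaultImplicits mirrors the object above, but JavaScheduler, the scheduler names, and the simplified buffer and from methods are hypothetical stand-ins for the RxScala types.

```scala
object DefaultImplicitsSketch {
  // Hypothetical stand-in for rx.Scheduler.
  final case class JavaScheduler(name: String)
  class Scheduler(val asJavaScheduler: JavaScheduler)

  trait DefaultBufferScheduler
  trait DefaultFromScheduler

  // Mirrors the proposal's DefaultImplicits object with stand-in schedulers.
  object DefaultImplicits {
    implicit val computationScheduler: Scheduler with DefaultBufferScheduler =
      new Scheduler(JavaScheduler("computation")) with DefaultBufferScheduler
    implicit val immediateScheduler: Scheduler with DefaultFromScheduler =
      new Scheduler(JavaScheduler("immediate")) with DefaultFromScheduler
  }

  // Simplified operators that only report the scheduler they received.
  def buffer(implicit s: Scheduler with DefaultBufferScheduler): String = s.asJavaScheduler.name
  def from(implicit s: Scheduler with DefaultFromScheduler): String = s.asJavaScheduler.name

  def defaults(): (String, String) = {
    // Both implicits are in scope, but each matches only one marker type.
    import DefaultImplicits._
    (buffer, from)
  }
}
```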

2) With explicitly specified Schedulers

There are implicit conversions to convert a regular Scheduler to a Scheduler with DefaultXxxScheduler:

// one implicit conversion for each DefaultXxxScheduler
implicit def toDefaultBufferScheduler(s: Scheduler): Scheduler with DefaultBufferScheduler = new Scheduler(s) with DefaultBufferScheduler
implicit def toDefaultWindowScheduler(s: Scheduler): Scheduler with DefaultWindowScheduler = new Scheduler(s) with DefaultWindowScheduler
implicit def toDefaultFromScheduler(s: Scheduler): Scheduler with DefaultFromScheduler = new Scheduler(s) with DefaultFromScheduler
implicit def toDefaultIntervalScheduler(s: Scheduler): Scheduler with DefaultIntervalScheduler = new Scheduler(s) with DefaultIntervalScheduler

These implicits just unwrap the underlying Java Scheduler and wrap it again in a differently labeled Scala Scheduler.
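A runnable sketch of two of these conversions follows, with a hypothetical JavaScheduler standing in for rx.Scheduler and buffer reduced to a method that reports which scheduler it received. The explicit result types and the scala.language.implicitConversions import are additions for this sketch; the import keeps the compiler from emitting a feature warning for implicit conversions.

```scala
import scala.language.implicitConversions

object ConversionSketch {
  // Hypothetical stand-in for rx.Scheduler.
  final case class JavaScheduler(name: String)

  class Scheduler(val asJavaScheduler: JavaScheduler) {
    def this(s: Scheduler) = this(s.asJavaScheduler)
  }

  trait DefaultBufferScheduler
  trait DefaultWindowScheduler

  // Unwrap the underlying scheduler and rewrap it with a different marker.
  implicit def toDefaultBufferScheduler(s: Scheduler): Scheduler with DefaultBufferScheduler =
    new Scheduler(s) with DefaultBufferScheduler
  implicit def toDefaultWindowScheduler(s: Scheduler): Scheduler with DefaultWindowScheduler =
    new Scheduler(s) with DefaultWindowScheduler

  def buffer(ms: Long)(implicit s: Scheduler with DefaultBufferScheduler): String =
    s"buffer($ms ms) on ${s.asJavaScheduler.name}"

  val newThread = new Scheduler(JavaScheduler("newThread"))

  // A plain Scheduler is accepted: only toDefaultBufferScheduler yields the expected type.
  def explicitCall(): String = buffer(300)(newThread)
}
```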

3) With custom default Schedulers

Whenever a method xxx is called without a Scheduler argument, the Scala compiler looks for an implicit value of type Scheduler with DefaultXxxScheduler. If none is found, it reports a "could not find implicit value" error; if several match, it reports an "ambiguous implicit values" error.
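A minimal sketch of these resolution rules with the same kind of hypothetical stand-ins (JavaScheduler and the string-returning from and interval). The commented-out lines show the two failure modes; uncommenting them is expected to produce compile-time errors, not runtime ones.

```scala
object ResolutionSketch {
  // Hypothetical stand-in for rx.Scheduler.
  final case class JavaScheduler(name: String)
  class Scheduler(val asJavaScheduler: JavaScheduler)

  trait DefaultFromScheduler
  trait DefaultIntervalScheduler

  def from(implicit s: Scheduler with DefaultFromScheduler): String = s.asJavaScheduler.name
  def interval(implicit s: Scheduler with DefaultIntervalScheduler): String = s.asJavaScheduler.name

  def resolved(): String = {
    implicit val a: Scheduler with DefaultFromScheduler =
      new Scheduler(JavaScheduler("immediate")) with DefaultFromScheduler

    // interval  // would not compile: no implicit Scheduler with DefaultIntervalScheduler in scope

    // implicit val b: Scheduler with DefaultFromScheduler =
    //   new Scheduler(JavaScheduler("other")) with DefaultFromScheduler
    // With `b` uncommented, the call below would not compile: ambiguous implicit values `a` and `b`.

    from
  }
}
```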

Advanced usage

By defining new Scheduler marker traits, users can define their own method groups which should use the same Scheduler, and new groups can also be defined in terms of smaller groups:

trait SchedulerForMethodsGroup1 extends DefaultIntervalScheduler with DefaultWindowScheduler
trait SchedulerForMethodsGroup2 extends DefaultBufferScheduler with DefaultFromScheduler
trait SchedulerForBothGroups extends SchedulerForMethodsGroup1 with SchedulerForMethodsGroup2

These could then be used as follows:

implicit val group1Scheduler = new Scheduler(ThreadPoolForComputationScheduler()) with SchedulerForMethodsGroup1
implicit val group2Scheduler = new Scheduler(NewThreadScheduler()) with SchedulerForMethodsGroup2

but also like this:

implicit val theScheduler = new Scheduler(ThreadPoolForComputationScheduler()) with SchedulerForBothGroups
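A self-contained sketch of both groupings, reusing the marker and group traits above together with a hypothetical JavaScheduler stand-in; each operator is reduced to a method that returns the name of the scheduler it would receive.

```scala
object GroupSketch {
  // Hypothetical stand-in for rx.Scheduler.
  final case class JavaScheduler(name: String)
  class Scheduler(val asJavaScheduler: JavaScheduler)

  trait DefaultIntervalScheduler
  trait DefaultWindowScheduler
  trait DefaultBufferScheduler
  trait DefaultFromScheduler

  // User-defined groups, as in the proposal.
  trait SchedulerForMethodsGroup1 extends DefaultIntervalScheduler with DefaultWindowScheduler
  trait SchedulerForMethodsGroup2 extends DefaultBufferScheduler with DefaultFromScheduler
  trait SchedulerForBothGroups extends SchedulerForMethodsGroup1 with SchedulerForMethodsGroup2

  def interval(implicit s: Scheduler with DefaultIntervalScheduler): String = s.asJavaScheduler.name
  def window(implicit s: Scheduler with DefaultWindowScheduler): String = s.asJavaScheduler.name
  def buffer(implicit s: Scheduler with DefaultBufferScheduler): String = s.asJavaScheduler.name
  def from(implicit s: Scheduler with DefaultFromScheduler): String = s.asJavaScheduler.name

  // Two group schedulers: each method matches exactly one of them.
  def twoGroups(): List[String] = {
    implicit val group1Scheduler: Scheduler with SchedulerForMethodsGroup1 =
      new Scheduler(JavaScheduler("computation")) with SchedulerForMethodsGroup1
    implicit val group2Scheduler: Scheduler with SchedulerForMethodsGroup2 =
      new Scheduler(JavaScheduler("newThread")) with SchedulerForMethodsGroup2
    List(interval, window, buffer, from)
  }

  // One scheduler covering both groups.
  def oneForAll(): List[String] = {
    implicit val theScheduler: Scheduler with SchedulerForBothGroups =
      new Scheduler(JavaScheduler("computation")) with SchedulerForBothGroups
    List(interval, window, buffer, from)
  }
}
```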

Preview

An incomplete preview can be found in this branch.

@samuelgruetter
Contributor Author

@headinthebox and RxScala users your opinion and comments are welcome ;-)

@headinthebox
Contributor

Need some time to digest this ...

@benjchristensen
Member

Considering that the Scala bindings are broken right now (after the recent Scheduler changes), 0.17 is waiting until the Scala bindings are either fixed to match the current design or move forward on this. 0.17 is a good time for changing the design, since the Scheduler redesign already broke everything.

@vigdorchik
Contributor

I must be missing something, but given the possibility of explicitly specifying the required scheduler, I don't see why a per-method scheduler type is needed. It brings too much complexity for no obvious win...

@vigdorchik
Contributor

Also, implicit defs that repackage schedulers into new instances bring runtime overhead, right?

@vigdorchik
Contributor

And if ImmediateScheduler is currently the default for all overloads without a Scheduler parameter, why not include an implicit val in the Observable companion object?

@samuelgruetter
Contributor Author

In the current version, the defaults for overloads without a Scheduler parameter already differ per method. For instance, the default for repeat is Schedulers.currentThread(), but the default for buffer is Schedulers.computation().
And yes, rewrapping Schedulers brings some runtime overhead, but only during the "setup phase" where the operator chain is created; during the performance-critical "push data phase", there is no overhead.

@vjovanov

vjovanov commented Feb 5, 2014

As for the complexity, I think that having 50% fewer methods in Observable justifies a bit of complexity in the implicit parameters. Furthermore, the Scala community is used to passing ExecutionContexts in the same manner. The additional marker trait, with quite a descriptive name, should not cause much confusion.

@vigdorchik
Contributor

Ok, but why is the proposed change superior to just supplying default arguments for schedulers for all methods?

@samuelgruetter
Contributor Author

@vigdorchik look at the "Usage examples" I posted above. Just supplying default arguments for schedulers for all methods would only allow case 1) and 2) to work, but not 3).
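The difference can be seen in a minimal sketch with a hypothetical, simplified Scheduler case class: a default argument is fixed where the method is defined, so an implicit value in the caller's scope cannot change it, whereas an implicit parameter is resolved at each call site.

```scala
object DefaultArgVsImplicitSketch {
  // Hypothetical, simplified Scheduler for illustration only.
  final case class Scheduler(name: String)

  val computation = Scheduler("computation")

  // Default-argument variant: the default is chosen at the definition site.
  def bufferWithDefaultArg(ms: Long, scheduler: Scheduler = computation): String = scheduler.name

  // Implicit-parameter variant: the caller's scope decides.
  def bufferWithImplicit(ms: Long)(implicit scheduler: Scheduler): String = scheduler.name

  def inTestScope(): (String, String) = {
    implicit val test: Scheduler = Scheduler("test")
    // Only the implicit-parameter variant picks up the scoped scheduler (case 3).
    (bufferWithDefaultArg(300), bufferWithImplicit(300))
  }
}
```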

@samuelgruetter
Contributor Author

I did some analysis on Observable.java and found that most methods which take a Scheduler have Schedulers.computation() as default Scheduler:

  • buffer
  • debounce
  • delay
  • delaySubscription
  • parallel
  • repeat
  • replay
  • sample
  • skip
  • skipLast
  • take
  • takeLast
  • takeLastBuffer
  • throttleFirst
  • throttleLast
  • throttleWithTimeout
  • timeout
  • window
  • interval
  • timer
  • timer

However, there are also methods with a different default Scheduler:

  • repeat: Schedulers.currentThread()
  • startWith: no scheduler
  • timeInterval: Schedulers.immediate()
  • timestamp: Schedulers.immediate()
  • empty: no scheduler
  • error: no scheduler
  • from: no scheduler
  • just: no scheduler
  • merge: no scheduler
  • parallelMerge: Schedulers.currentThread()
  • range: no scheduler

As we can see, the default is always either Schedulers.computation() or "don't introduce additional concurrency". So maybe there's a much simpler solution: For the methods in the first list, have an implicit parameter of type Scheduler, and leave the methods of the second list unchanged.
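The simpler variant can be sketched as follows. Obs is a hypothetical stand-in for Observable, and ComputationDefault is a hypothetical object holding the single default; operators from the first list take a plain implicit Scheduler, while startWith, from the second list, keeps its scheduler-free signature.

```scala
object SimplerVariantSketch {
  // Hypothetical, simplified Scheduler for illustration only.
  final case class Scheduler(name: String)

  // Hypothetical home for the single computation default.
  object ComputationDefault {
    implicit val computation: Scheduler = Scheduler("computation")
  }

  final case class Obs[T](items: List[T]) {
    // First list: operators that default to computation take an implicit Scheduler.
    def buffer(ms: Long)(implicit s: Scheduler): String = s"buffer($ms) on ${s.name}"

    // Second list: no additional concurrency, signature unchanged.
    def startWith(x: T): Obs[T] = Obs(x :: items)
  }

  def run(): (String, List[Int]) = {
    import ComputationDefault._
    val o = Obs(List(2, 3))
    (o.buffer(300), o.startWith(1).items)
  }
}
```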

@vigdorchik
Contributor

Like the simplified variant very much!
How about also including Schedulers.computation() as an implicit val in object Observable?
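The suggestion above puts the default in object Observable. A closely related variant, swapped in here because its Scala rules are clear-cut, places the default in the companion object of Scheduler: that companion belongs to the implicit scope of a Scheduler parameter, so the default is found without any import, while an implicit defined in the caller's lexical scope takes precedence. A minimal sketch with a hypothetical case-class Scheduler:

```scala
object CompanionDefaultSketch {
  // Hypothetical, simplified Scheduler for illustration only.
  final case class Scheduler(name: String)

  object Scheduler {
    // Part of the implicit scope of Scheduler: used when nothing closer is found.
    implicit val computation: Scheduler = Scheduler("computation")
  }

  def buffer(ms: Long)(implicit s: Scheduler): String = s.name

  // No import needed: the companion's implicit is the fallback.
  def withoutImport(): String = buffer(300)

  // A local implicit is found first, so it overrides the companion default.
  def withLocalOverride(): String = {
    implicit val test: Scheduler = Scheduler("test")
    buffer(300)
  }
}
```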

@samuelgruetter
Contributor Author

@headinthebox how's your digestion going ;-) Your opinion would be appreciated

@headinthebox
Contributor

Actually, I am no longer so convinced that schedulers should be implicit, since the typical scenario is that you have several different schedulers in the same query:

xs.subscribeOn(s0).groupBy(...).flatMap(g => g.observeOn(s2)).observeOn(s3)

How would that work out?

If there is a default scheduler, you don't have to pass one.

@tnine
Contributor

tnine commented Feb 19, 2014

My 2 cents as a user: specifying a default scheduler would be amazing. We've implemented our own scheduler, which is essentially an I/O scheduler with an upper limit, and we've had to add a lot of .subscribeOn calls to all our observables. It would be nice to configure it once on init and have everything use that scheduler.

@benjchristensen
Member

Something I've been wanting for a while is the ability to use the RxJavaPlugins mechanism to assign the default Scheduler factories for Schedulers.computation(), Schedulers.io() and Schedulers.newThread() so that a system wanting to change this behavior can do so in one place without needing to use the overloads everywhere.

This could be useful to use lightweight threads for example (https://github.com/puniverse/pulsar) instead of normal threads everywhere in the system. It could allow specific customization of thread behavior when an app knows a better way to do things.

Here is some code I wrote (but have not yet tested) that would allow this behavior: #905

Obviously if someone is using a specific implementation directly it would not help, but generally the Schedulers factory methods should be used, and if they are then this plugin can help.

Perhaps we can deprecate the actual Scheduler implementation constructors so only Schedulers factory methods can be used for built-in schedulers?
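The plugin idea can be sketched in Scala as a replaceable hook that the factory methods consult on every call. All names here (SchedulersHook, registerHook, the string-named Scheduler) are hypothetical; this is not the RxJavaPlugins API and not the code in #905, only an illustration of configuring defaults in one place.

```scala
object PluggableDefaultsSketch {
  // Hypothetical, simplified Scheduler for illustration only.
  final case class Scheduler(name: String)

  // Hypothetical hook supplying the default Scheduler factories.
  trait SchedulersHook {
    def computation(): Scheduler = Scheduler("computation")
    def io(): Scheduler = Scheduler("io")
    def newThread(): Scheduler = Scheduler("newThread")
  }

  @volatile private var hook: SchedulersHook = new SchedulersHook {}

  def registerHook(h: SchedulersHook): Unit = hook = h

  // Factory methods always delegate to the currently registered hook.
  object Schedulers {
    def computation(): Scheduler = hook.computation()
    def io(): Scheduler = hook.io()
    def newThread(): Scheduler = hook.newThread()
  }

  def demo(): List[String] = {
    val before = Schedulers.io().name
    registerHook(new SchedulersHook {
      // e.g. an I/O scheduler with an upper limit, configured once at startup
      override def io(): Scheduler = Scheduler("boundedIo")
    })
    List(before, Schedulers.io().name, Schedulers.computation().name)
  }
}
```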

@samuelgruetter
Copy link
Contributor Author

@headinthebox when I wrote this proposal I thought that observeOn and subscribeOn would still have an explicit Scheduler argument, but all the other operators would get implicit Schedulers.

@lJoublanc

This would make rx consistent with futures and actors in scala/akka, which accept an implicit execution context. And that seems to have worked out ok for them!
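For comparison, this is the standard-library pattern being referred to: Future's factory and combinators take an implicit ExecutionContext that can also be passed explicitly. The sketch uses only scala.concurrent APIs; the analogy to Schedulers is the point, not a claim about how RxScala is implemented.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureAnalogySketch {
  // Explicit context, analogous to passing a Scheduler explicitly.
  def explicitSum(ec: ExecutionContext): Future[Int] = Future(1 + 2)(ec)

  // Context taken from the caller's scope, analogous to an implicit Scheduler.
  def implicitSum(implicit ec: ExecutionContext): Future[Int] = Future(1 + 2).map(_ * 10)

  def run(): (Int, Int) = {
    import ExecutionContext.Implicits.global
    (Await.result(explicitSum(global), 5.seconds), Await.result(implicitSum, 5.seconds))
  }
}
```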

@benjchristensen
Member

@headinthebox @samuelgruetter Anything to do here?

@headinthebox
Contributor

I'd love to discuss this in person this summer when I am at EPFL. Can we leave it open for now?

@benjchristensen
Member

Yup, it's fine leaving open. Assigned it to you for tracking purposes.

@samuelgruetter
Contributor Author

@headinthebox that sounds great

@benjchristensen
Member

Moved to ReactiveX/RxScala#16
