Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implicit Schedulers #16

Closed
benjchristensen opened this issue Sep 23, 2014 · 2 comments
Closed

Implicit Schedulers #16

benjchristensen opened this issue Sep 23, 2014 · 2 comments

Comments

@benjchristensen
Copy link
Member

Originally filed by @samuelgruetter and ReactiveX/RxJava#815


This is a proposal to use Scala's implicits to improve the Schedulers part of the RxScala API, by @vjovanov and myself.

Current situtation

There are many methods depending on a Scheduler. Since we do not want to always pass the Scheduler argument explicitly, there's a second version of each method which chooses a reasonable default Scheduler, for instance

def window(timespan: Duration): Observable[Observable[T]]
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]]

Advantages of this proposal

Compared to the current situation, this proposal brings the following adavantages:

  • For method calls without an explicitly provided Scheduler, it's possible to override the chosen default Scheduler in a per-scope and per-method way.
  • All methods which existed in a version with Scheduler and one without can be replaced by one, so the API size decreases by almost 50%.

Usage examples

There are three ways of using Schedulers:

1) With default implicits: For each method, the framework chooses a reasonable default scheduler. It can be a different Scheduler for each method: For instance, for buffer, ThreadPoolForComputation() is chosen, and for from, ImmediateScheduler() is chosen.

import rx.lang.scala.schedulers.DefaultImplicits._
Observable.interval(200 millis).take(10).buffer(300 millis)
Observable.interval(200 millis).take(10).window(300 millis).flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))

2) With explicitly specified Schedulers:

Observable.interval(200 millis)(NewThreadScheduler()).take(10).buffer(300 millis)(NewThreadScheduler())
Observable.interval(200 millis)(NewThreadScheduler()).take(10)
          .window(300 millis)(NewThreadScheduler())
          .flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))(ThreadPoolForIOScheduler())

3) With custom default Schedulers:

This feature is the whole point of this proposal.

val testScheduler = TestScheduler()

// specify which default scheduler to use for which method
implicit val myImplicitScheduler1 = new Scheduler(testScheduler) 
  with DefaultBufferScheduler with DefaultWindowScheduler with DefaultIntervalScheduler  
implicit val myImplicitScheduler2 = new Scheduler(ImmediateScheduler())
  with DefaultFromScheduler

Observable.interval(200 millis).take(10).buffer(300 millis)
Observable.interval(200 millis).take(10).window(300 millis).flatMap(obs => obs.sum)
Observable.from(List(1, 2, 3))

for (t <- 0 to 2500) {
 testScheduler.advanceTimeBy(1 millis)
}

How it works

There's one marker trait (without any members) for each method in Observable which takes a Scheduler:

trait DefaultBufferScheduler
trait DefaultWindowScheduler
trait DefaultFromScheduler
trait DefaultIntervalScheduler
...

All method pairs of a method with Scheduler and its corresponding method without Scheduler are replaced by one single method with an implicit Scheduler parameter of type Scheduler with DefaultXxxScheduler, for instance

def window(timespan: Duration): Observable[Observable[T]]
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]]

is replaced by

def window(timespan: Duration)(implicit scheduler: Scheduler with DefaultWindowScheduler): Observable[Observable[T]]

Further, the original

trait Scheduler {
  private [scala] val asJavaScheduler: rx.Scheduler
  ...
}

is replaced by

class Scheduler(s: rx.Scheduler) {
  def this(s: Scheduler) = this(s.asJavaScheduler) // alternative constructor
  private [scala] val asJavaScheduler: rx.Scheduler = s
  ...
}

because we need to create new Schedulers using the new keyword, such that we can specify with DefaultXxxScheduler with DefaultYyyScheduler ....

The three ways of using Schedulers work as follows:

1) Default implicits

There's

object DefaultImplicits {
  implicit val computationScheduler = new Scheduler(ThreadPoolForComputationScheduler()) 
                                      with DefaultBufferScheduler 
                                      with DefaultWindowScheduler
                                      with DefaultIntervalScheduler
  implicit val immediateScheduler =   new Scheduler(ImmediateScheduler())
                                      with DefaultFromScheduler
}

which has to be imported by all users who don't want to customize:

import rx.lang.scala.schedulers.DefaultImplicits._

This is similar to Futures requiring all users to

import scala.concurrent.ExecutionContext.Implicits.global

so users should not get too upset about it ;-)

2) With explicitly specified Schedulers

There are implicit conversions to convert a regular Scheduler to a Scheduler with DefaultXxxScheduler:

// one implicit conversion for each DefaultXxxScheduler
implicit def toDefaultBufferScheduler(s: Scheduler) = new Scheduler(s) with DefaultBufferScheduler
implicit def toDefaultWindowScheduler(s: Scheduler) = new Scheduler(s) with DefaultWindowScheduler
implicit def toDefaultFromScheduler(s: Scheduler) = new Scheduler(s) with DefaultFromScheduler
implicit def toDefaultIntervalScheduler(s: Scheduler) = new Scheduler(s) with DefaultIntervalScheduler

These implicits just unwrap the underlying Java Scheduler and wrap it again in a differently labeled Scala Scheduler.

3) With custom default Schedulers

Whenever a method xxx is called without giving a Scheduler, the Scala compiler will look for an implicit value of type DefaultXxxScheduler, issue a "could not find implicit value" error if none was found or an "ambiguous implicit values" error if multiple implicit values matched.

Advanced usage

By defining new Scheduler marker traits, users can define their own method groups which should use the same Scheduler, and new groups can also be defined in terms of smaller groups:

trait SchedulerForMethodsGroup1 extends DefaultIntervalScheduler with DefaultWindowScheduler
trait SchedulerForMethodsGroup2 extends DefaultBufferScheduler with DefaultFromScheduler
trait SchedulerForBothGroups extends SchedulerForMethodsGroup1 with SchedulerForMethodsGroup2

These could then be used as follows:

implicit val group1Scheduler = new Scheduler(ThreadPoolForComputationScheduler()) with SchedulerForMethodsGroup1
implicit val group2Scheduler = new Scheduler(NewThreadScheduler()) with SchedulerForMethodsGroup2

but also as like this:

implicit val theScheduler = new Scheduler(ThreadPoolForComputationScheduler()) with SchedulerForBothGroups

Preview

An incomplete preview can be found in this branch.

@benjchristensen
Copy link
Member Author

See discussion history at ReactiveX/RxJava#815 prior to RxScala project.

@samuelgruetter
Copy link
Collaborator

I think this solution is too complicated to be adopted, and some customization of schedulers is already allowed by RxJava's RxJavaSchedulersHook, so I'm closing this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants