Skip to content

Commit

Permalink
Add default Scheduler parameter to cover up removed RxJava defined me…
Browse files Browse the repository at this point in the history
…thods

Did not remove some Scheduler parameter counterpart method candidates or add a default Scheduler value, due to various reasons for these methods:

```
Error:(106, 7) in trait Observable, multiple overloaded alternatives of method tumblingBuffer define default arguments.
trait Observable[+T]

def tumblingBuffer(Duration)
def tumbling(Duration, Int)
def replay(Int, Duration)
def replay(Int)
def replay(Duration)
def takeRight(Int, Duration)
def timeout[U >: T](Duration, Observable[U])
def repeat
def timer(Duration, Duration)
```

```
No default Scheduler defined in RxJava:

def sliding(Duration, Duration, Int, Scheduler)
def replay[R](Observable[T] => Observable[R], Duration, Scheduler)
```

```
Does not operate on a specific Scheduler by default:

def replay[R](Observable[T] => Observable[R], Int)
def replay[R](Observable[T] => Observable[R])
```
  • Loading branch information
jbripley committed Oct 11, 2014
1 parent c822f6f commit 362db2a
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rx.lang.scala

import rx.functions.FuncN
import rx.lang.scala.observables.ConnectableObservable
import rx.lang.scala.schedulers.{ImmediateScheduler, TrampolineScheduler, ComputationScheduler}
import scala.concurrent.duration
import java.util
import collection.JavaConversions._
Expand Down Expand Up @@ -353,7 +354,7 @@ trait Observable[+T]
* @return an Observable that emits timestamped items from the source
* Observable with timestamps provided by the given Scheduler
*/
def timestamp(scheduler: Scheduler): Observable[(Long, T)] = {
def timestamp(scheduler: Scheduler = ImmediateScheduler()): Observable[(Long, T)] = {
toScalaObservable[rx.schedulers.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler))
.map((t: rx.schedulers.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
}
Expand Down Expand Up @@ -548,7 +549,7 @@ trait Observable[+T]
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after
* a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
*/
def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = {
def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler = ComputationScheduler()): Observable[Seq[T]] = {
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count, scheduler)
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}
Expand All @@ -569,7 +570,7 @@ trait Observable[+T]
* An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Seq[T]] = {
val span: Long = timespan.length
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
val unit: TimeUnit = timespan.unit
Expand Down Expand Up @@ -707,7 +708,7 @@ trait Observable[+T]
* @return
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration.
*/
def tumbling(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
def tumbling(timespan: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Observable[T]] = {
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, scheduler))
: Observable[Observable[T]] // SI-7818
}
Expand Down Expand Up @@ -770,7 +771,7 @@ trait Observable[+T]
* An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Observable[T]] = {
val span: Long = timespan.length
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
val unit: TimeUnit = timespan.unit
Expand Down Expand Up @@ -1257,7 +1258,7 @@ trait Observable[+T]
* replays no more than `bufferSize` items that were emitted within the window defined by `time`
* @throws IllegalArgumentException if `bufferSize` is less than zero
*/
def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = {
def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[R] = {
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
(jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
Expand Down Expand Up @@ -1664,7 +1665,7 @@ trait Observable[+T]
* @return an Observable that emits the results of sampling the items emitted by the source
* Observable at the specified time interval
*/
def sample(duration: Duration, scheduler: Scheduler): Observable[T] = {
def sample(duration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler))
}

Expand Down Expand Up @@ -1775,7 +1776,7 @@ trait Observable[+T]
* @return an Observable that drops values emitted by the source Observable before the time window defined
* by `time` elapses and emits the remainder
*/
def drop(time: Duration, scheduler: Scheduler): Observable[T] = {
def drop(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler))
}

Expand Down Expand Up @@ -1826,7 +1827,7 @@ trait Observable[+T]
* @return an Observable that drops those items emitted by the source Observable in a time window before the
* source completes defined by `time` and `scheduler`
*/
def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = {
def dropRight(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler))
}

Expand Down Expand Up @@ -1877,7 +1878,7 @@ trait Observable[+T]
* @return an Observable that emits those items emitted by the source Observable before the time runs out,
* according to the specified Scheduler
*/
def take(time: Duration, scheduler: Scheduler) {
def take(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.take(time.length, time.unit, scheduler.asJavaScheduler))
}

Expand Down Expand Up @@ -1926,7 +1927,7 @@ trait Observable[+T]
* time before the Observable completed specified by `time`, where the timing information is
* provided by `scheduler`
*/
def takeRight(time: Duration, scheduler: Scheduler): Observable[T] = {
def takeRight(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit, scheduler.asJavaScheduler))
}

Expand Down Expand Up @@ -2345,7 +2346,7 @@ trait Observable[+T]
* @return Observable which performs the throttle operation.
* @see `Observable.throttleWithTimeout`
*/
def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = {
def debounce(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit, scheduler))
}

Expand All @@ -2363,7 +2364,7 @@ trait Observable[+T]
* @return Observable which performs the throttle operation.
* @see `Observable.debounce`
*/
def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
def throttleWithTimeout(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit, scheduler))
}

Expand All @@ -2380,7 +2381,7 @@ trait Observable[+T]
* The [[rx.lang.scala.Scheduler]] to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = {
def throttleFirst(skipDuration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit, scheduler))
}

Expand Down Expand Up @@ -2410,7 +2411,7 @@ trait Observable[+T]
* Duration of windows within with the last value will be chosen.
* @return Observable which performs the throttle operation.
*/
def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = {
def throttleLast(intervalDuration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
}

Expand Down Expand Up @@ -2447,7 +2448,7 @@ trait Observable[+T]
* @return the source Observable modified to notify observers of a
* `TimeoutException` in case of a timeout
*/
def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
def timeout(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
}

Expand Down Expand Up @@ -2992,7 +2993,7 @@ trait Observable[+T]
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler = TrampolineScheduler()): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
(jOt: rx.Observable[_ <: Throwable]) => {
val ot = toScalaObservable[Throwable](jOt)
Expand Down Expand Up @@ -3044,7 +3045,7 @@ trait Observable[+T]
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
*/
def repeat(count: Long, scheduler: Scheduler): Observable[T] = {
def repeat(count: Long, scheduler: Scheduler = TrampolineScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
}

Expand Down Expand Up @@ -3098,7 +3099,7 @@ trait Observable[+T]
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler = TrampolineScheduler()): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
(jOv: rx.Observable[_ <: Void]) => {
val ov = toScalaObservable[Void](jOv)
Expand Down Expand Up @@ -3311,7 +3312,7 @@ trait Observable[+T]
* @param scheduler the Scheduler to use for delaying
* @return the source Observable shifted in time by the specified delay
*/
def delay(delay: Duration, scheduler: Scheduler): Observable[T] = {
def delay(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler))
}

Expand Down Expand Up @@ -3377,7 +3378,7 @@ trait Observable[+T]
* @return an Observable that delays the subscription to the source Observable by a given
* amount, waiting and subscribing on the given Scheduler
*/
def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
def delaySubscription(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
}

Expand Down Expand Up @@ -3522,7 +3523,7 @@ trait Observable[+T]
* @param scheduler the [[Scheduler]] used to compute time intervals
* @return an Observable that emits time interval information items
*/
def timeInterval(scheduler: Scheduler): Observable[(Duration, T)] = {
def timeInterval(scheduler: Scheduler = ImmediateScheduler()): Observable[(Duration, T)] = {
toScalaObservable(asJavaObservable.timeInterval(scheduler.asJavaScheduler))
.map(inv => (Duration(inv.getIntervalInMilliseconds, MILLISECONDS), inv.getValue))
}
Expand Down Expand Up @@ -4249,7 +4250,7 @@ object Observable {
* the scheduler to use
* @return An Observable that emits a number each time interval.
*/
def interval(period: Duration, scheduler: Scheduler): Observable[Long] = {
def interval(period: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue())
}

Expand Down Expand Up @@ -4299,7 +4300,7 @@ object Observable {
* @param scheduler the Scheduler to use for scheduling the item
* @return Observable that emits `0L` after a specified delay, on a specified Scheduler, and then completes
*/
def timer(delay: Duration, scheduler: Scheduler): Observable[Long] = {
def timer(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit, scheduler)).map(_.longValue())
}

Expand Down

0 comments on commit 362db2a

Please sign in to comment.