diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 5c75863d..049a02cd 100755 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -343,18 +343,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.serialize) } - /** - * Wraps each item emitted by a source Observable in a timestamped tuple. - * - * - * - * @return an Observable that emits timestamped items from the source Observable - */ - def timestamp: Observable[(Long, T)] = { - toScalaObservable[rx.schedulers.Timestamped[_ <: T]](asJavaObservable.timestamp()) - .map((t: rx.schedulers.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue)) - } - /** * Wraps each item emitted by a source Observable in a timestamped tuple * with timestamps provided by the given Scheduler. @@ -543,26 +531,6 @@ trait Observable[+T] Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size - * specified by the `count` argument (which ever is reached first). When the source Observable completes - * or encounters an error, the current buffer is emitted and the event is propagated. - * - * @param timespan - * The period of time each buffer is collecting values before it should be emitted, and - * replaced with a new buffer. - * @param count - * The maximum size of each buffer before it should be emitted. - * @return - * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after - * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). - */ - def tumblingBuffer(timespan: Duration, count: Int): Observable[Seq[T]] = { - val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count) - Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) - } - /** * Creates an Observable which produces buffers of collected values. This Observable produces connected * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size @@ -585,28 +553,6 @@ trait Observable[+T] Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } - /** - * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer - * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan - * specified by the `timespan` argument. When the source Observable completes or encounters an error, the - * current buffer is emitted and the event is propagated. - * - * @param timespan - * The period of time each buffer is collecting values before it should be emitted. - * @param timeshift - * The period of time after which a new buffer will be created. - * @return - * An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after - * a fixed timespan has elapsed. - */ - def slidingBuffer(timespan: Duration, timeshift: Duration): Observable[Seq[T]] = { - val span: Long = timespan.length - val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit) - val unit: TimeUnit = timespan.unit - val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(span, shift, unit) - Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) - } - /** * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan @@ -748,22 +694,6 @@ trait Observable[+T] : Observable[Observable[T]] // SI-7818 } - /** - * Creates an Observable which produces windows of collected values. This Observable produces connected - * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source - * Observable completes or encounters an error, the current window is emitted and the event is propagated. - * - * @param timespan - * The period of time each window is collecting values before it should be emitted, and - * replaced with a new window. - * @return - * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration. - */ - def tumbling(timespan: Duration): Observable[Observable[T]] = { - Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit)) - : Observable[Observable[T]] // SI-7818 - } - /** * Creates an Observable which produces windows of collected values. This Observable produces connected * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source @@ -824,28 +754,6 @@ trait Observable[+T] : Observable[Observable[T]] // SI-7818 } - /** - * Creates an Observable which produces windows of collected values. This Observable starts a new window - * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan - * specified by the `timespan` argument. When the source Observable completes or encounters an error, the - * current window is emitted and the event is propagated. - * - * @param timespan - * The period of time each window is collecting values before it should be emitted. - * @param timeshift - * The period of time after which a new window will be created. - * @return - * An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after - * a fixed timespan has elapsed. - */ - def sliding(timespan: Duration, timeshift: Duration): Observable[Observable[T]] = { - val span: Long = timespan.length - val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit) - val unit: TimeUnit = timespan.unit - Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit)) - : Observable[Observable[T]] // SI-7818 - } - /** * Creates an Observable which produces windows of collected values. This Observable starts a new window * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan @@ -1332,28 +1240,6 @@ trait Observable[+T] toScalaObservable[R](thisJava.replay(fJava, bufferSize)) } - /** - * Returns an Observable that emits items that are the results of invoking a specified selector on items - * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, - * replaying no more than `bufferSize` items that were emitted within a specified time window. - *

- * - * - * @param selector a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the Observable - * @param bufferSize the buffer size that limits the number of items the connectable observable can replay - * @param time the duration of the window in which the replayed items must have been emitted - * @return an Observable that emits items that are the results of invoking the selector on items emitted by - * a `ConnectableObservable` that shares a single subscription to the source Observable, and - * replays no more than `bufferSize` items that were emitted within the window defined by `time` - */ - def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration): Observable[R] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit)) - } - /** * Returns an Observable that emits items that are the results of invoking a specified selector on items * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, @@ -1473,27 +1359,6 @@ trait Observable[+T] new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler)) } - /** - * Returns an Observable that emits items that are the results of invoking a specified selector on items - * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, - * replaying all items that were emitted within a specified time window. - *

- * - * - * @param selector a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the Observable - * @param time the duration of the window in which the replayed items must have been emitted - * @return an Observable that emits items that are the results of invoking the selector on items emitted by - * a `ConnectableObservable` that shares a single subscription to the source Observable, - * replaying all items that were emitted within the window defined by `time` - */ - def replay[R](selector: Observable[T] => Observable[R], time: Duration): Observable[R] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit)) - } - /** * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that * replays at most `bufferSize` items emitted by that Observable. @@ -1787,20 +1652,6 @@ trait Observable[+T] })) } - /** - * Returns an Observable that emits the results of sampling the items emitted by the source - * Observable at a specified time interval. - * - * - * - * @param duration the sampling rate - * @return an Observable that emits the results of sampling the items emitted by the source - * Observable at the specified time interval - */ - def sample(duration: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit)) - } - /** * Returns an Observable that emits the results of sampling the items emitted by the source * Observable at a specified time interval. @@ -1913,20 +1764,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.skip(n)) } - /** - * Returns an Observable that drops values emitted by the source Observable before a specified time window - * elapses. - * - * - * - * @param time the length of the time window to drop - * @return an Observable that drops values emitted by the source Observable before the time window defined - * by `time` elapses and emits the remainder - */ - def drop(time: Duration): Observable[T] = { - toScalaObservable(asJavaObservable.skip(time.length, time.unit)) - } - /** * Returns an Observable that drops values emitted by the source Observable before a specified time window * elapses. @@ -1976,22 +1813,6 @@ trait Observable[+T] toScalaObservable(asJavaObservable.skipLast(n)) } - /** - * Returns an Observable that drops items emitted by the source Observable during a specified time window - * before the source completes. - *

- * - * - * Note: this action will cache the latest items arriving in the specified time window. - * - * @param time the length of the time window - * @return an Observable that drops those items emitted by the source Observable in a time window before the - * source completes defined by `time` - */ - def dropRight(time: Duration): Observable[T] = { - toScalaObservable(asJavaObservable.skipLast(time.length, time.unit)) - } - /** * Returns an Observable that drops items emitted by the source Observable during a specified time window * (defined on a specified scheduler) before the source completes. @@ -2045,18 +1866,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.take(n)) } - /** - * Returns an Observable that emits those items emitted by source Observable before a specified time runs out. - *

- * - * - * @param time the length of the time window - * @return an Observable that emits those items emitted by the source Observable before the time runs out - */ - def take(time: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.take(time.length, time.unit)) - } - /** * Returns an Observable that emits those items emitted by source Observable before a specified time (on * specified Scheduler) runs out @@ -2104,20 +1913,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.takeLast(count)) } - /** - * Return an Observable that emits the items from the source Observable that were emitted in a specified - * window of `time` before the Observable completed. - *

- * - * - * @param time the length of the time window - * @return an Observable that emits the items from the source Observable that were emitted in the window of - * time before the Observable completed specified by `time` - */ - def takeRight(time: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit)) - } - /** * Return an Observable that emits the items from the source Observable that were emitted in a specified * window of `time` before the Observable completed, where the timing information is provided by a specified @@ -2517,25 +2312,6 @@ trait Observable[+T] toScalaObservable[R](rx.Observable.combineLatest[T, U, R](this.asJavaObservable, that.asJavaObservable, selector)) } - /** - * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. - * - * NOTE: If events keep firing faster than the timeout then no data will be emitted. - * - * - * - * $debounceVsThrottle - * - * @param timeout - * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped. - * - * @return An [[rx.lang.scala.Observable]] which filters out values which are too quickly followed up with newer values. - * @see `Observable.debounce` - */ - def throttleWithTimeout(timeout: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit)) - } - /** * Return an Observable that mirrors the source Observable, except that it drops items emitted by the source * Observable that are followed by another item within a computed debounce duration. @@ -2553,25 +2329,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.debounce[Any](fJava)) } - /** - * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. - * - * NOTE: If events keep firing faster than the timeout then no data will be emitted. - * - * - * - * $debounceVsThrottle - * - * @param timeout - * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped. - * - * @return An [[rx.lang.scala.Observable]] which filters out values which are too quickly followed up with newer values. - * @see `Observable.throttleWithTimeout` - */ - def debounce(timeout: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit)) - } - /** * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. * @@ -2642,21 +2399,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit)) } - /** - * Throttles by returning the last value of each interval defined by 'intervalDuration'. - * - * This differs from `Observable.throttleFirst` in that this ticks along at a scheduled interval whereas `Observable.throttleFirst` does not tick, it just tracks passage of time. - * - * - * - * @param intervalDuration - * Duration of windows within with the last value will be chosen. - * @return Observable which performs the throttle operation. - */ - def throttleLast(intervalDuration: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit)) - } - /** * Throttles by returning the last value of each interval defined by 'intervalDuration'. * @@ -2672,22 +2414,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler)) } - /** - * Applies a timeout policy for each item emitted by the Observable, using - * the specified scheduler to run timeout timers. If the next item isn't - * observed within the specified timeout duration starting from its - * predecessor, observers are notified of a `TimeoutException`. - *

- * - * - * @param timeout maximum duration between items before a timeout occurs - * @return the source Observable modified to notify observers of a - * `TimeoutException` in case of a timeout - */ - def timeout(timeout: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit)) - } - /** * Applies a timeout policy for each item emitted by the Observable, using * the specified scheduler to run timeout timers. If the next item isn't @@ -3255,36 +2981,6 @@ trait Observable[+T] * *

*
Scheduler:
- *
`retryWhen` operates by default on the `trampoline` [[Scheduler]].
- *
- * - * @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the - * retry - * @return the source Observable modified with retry logic - * @see RxJava Wiki: retryWhen() - * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example - * @since 0.20 - */ - def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] = - (jOt: rx.Observable[_ <: Throwable]) => { - val ot = toScalaObservable[Throwable](jOt) - notificationHandler(ot).asJavaObservable - } - - toScalaObservable[T](asJavaObservable.retryWhen(f)) - } - - /** - * Returns an Observable that emits the same values as the source observable with the exception of an `onError`. - * An onError will emit a [[Throwable]] to the Observable provided as an argument to the notificationHandler - * func. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted` - * or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler. - *

- * - *

- *

- *
Scheduler:
*
you specify which [[Scheduler]] this operator will use
*
* @@ -3334,22 +3030,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.repeat(scheduler)) } - /** - * Returns an Observable that repeats the sequence of items emitted by the source Observable at most `count` times. - *

- * - * - * @param count the number of times the source Observable items are repeated, - * a count of 0 will yield an empty sequence - * @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times - * @throws IllegalArgumentException if `count` is less than zero - * @see RxJava Wiki: repeat() - * @see MSDN: Observable.Repeat - */ - def repeat(count: Long): Observable[T] = { - toScalaObservable[T](asJavaObservable.repeat(count)) - } - /** * Returns an Observable that repeats the sequence of items emitted by the source Observable * at most `count` times, on a particular Scheduler. @@ -3377,37 +3057,6 @@ trait Observable[+T] * resubscribe to the source Observable, on a particular Scheduler. *

* - *

- *
Scheduler:
- *
you specify which [[Scheduler]] this operator will use
- *
- * - * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat. - * @param scheduler the Scheduler to emit the items on - * @return the source Observable modified with repeat logic - * @see RxJava Wiki: repeatWhen() - * @see MSDN: Observable.Repeat - * @since 0.20 - */ - def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] = - (jOv: rx.Observable[_ <: Void]) => { - val ov = toScalaObservable[Void](jOv) - notificationHandler(ov.map( _ => Unit )).asJavaObservable - } - - toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler)) - } - - /** - * Returns an Observable that emits the same values as the source Observable with the exception of an - * `onCompleted`. An `onCompleted` notification from the source will result in the emission of - * a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler` - * function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will - * call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will - * resubscribe to the source observable. - *

- * * * @example * @@ -3443,19 +3092,20 @@ trait Observable[+T] * * * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat. + * @param scheduler the Scheduler to emit the items on * @return the source Observable modified with repeat logic * @see RxJava Wiki: repeatWhen() * @see MSDN: Observable.Repeat * @since 0.20 */ - def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = { + def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = { val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] = (jOv: rx.Observable[_ <: Void]) => { val ov = toScalaObservable[Void](jOv) notificationHandler(ov.map( _ => Unit )).asJavaObservable } - toScalaObservable[T](asJavaObservable.repeatWhen(f)) + toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler)) } /** @@ -3651,19 +3301,6 @@ trait Observable[+T] toScalaObservable[U](rx.Observable.amb(thisJava, thatJava)) } - /** - * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a - * specified delay. Error notifications from the source Observable are not delayed. - * - * - * - * @param delay the delay to shift the source by - * @return the source Observable shifted in time by the specified delay - */ - def delay(delay: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit)) - } - /** * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a * specified delay. Error notifications from the source Observable are not delayed. @@ -3729,18 +3366,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.delay[Any, Any](subscriptionDelayJava, itemDelayJava)) } - /** - * Return an Observable that delays the subscription to the source Observable by a given amount of time. - * - * - * - * @param delay the time to delay the subscription - * @return an Observable that delays the subscription to the source Observable by the given amount - */ - def delaySubscription(delay: Duration): Observable[T] = { - toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit)) - } - /** * Return an Observable that delays the subscription to the source Observable by a given amount of time, * both waiting and subscribing on a given Scheduler. @@ -3888,19 +3513,6 @@ trait Observable[+T] toScalaObservable[java.lang.Boolean](rx.Observable.sequenceEqual[U](thisJava, thatJava, equalityJava)).map(_.booleanValue) } - /** - * Returns an Observable that emits records of the time interval between consecutive items emitted by the - * source Obsegrvable. - *

- * - * - * @return an Observable that emits time interval information items - */ - def timeInterval: Observable[(Duration, T)] = { - toScalaObservable(asJavaObservable.timeInterval()) - .map(inv => (Duration(inv.getIntervalInMilliseconds, MILLISECONDS), inv.getValue)) - } - /** * Returns an Observable that emits records of the time interval between consecutive items emitted by the * source Observable, where this interval is computed on a specified Scheduler. @@ -4626,19 +4238,6 @@ object Observable { toScalaObservable[Seq[T]](o) } - /** - * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers. - * - * - * - * @param duration - * duration between two consecutive numbers - * @return An Observable that emits a number each time interval. - */ - def interval(duration: Duration): Observable[Long] = { - toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit)).map(_.longValue()) - } - /** * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers. * @@ -4690,18 +4289,6 @@ object Observable { toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue()) } - /** - * Returns an Observable that emits `0L` after a specified delay, and then completes. - * - * - * - * @param delay the initial delay before emitting a single `0L` - * @return Observable that emits `0L` after a specified delay, and then completes - */ - def timer(delay: Duration): Observable[Long] = { - toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit)).map(_.longValue()) - } - /** * Returns an Observable that emits `0L` after a specified delay, on a specified Scheduler, and then * completes.