diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3a548773e1..d90158c736 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -156,6 +156,14 @@ class RxScalaDemo extends JUnitSuite { ).subscribe(output(_)) } + @Test def windowExample2() { + val windowObservable = Observable.interval(500 millis) + val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1) + (for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o) + yield s"Observable#$i emits $n" + ).toBlocking.foreach(println) + } + @Test def testReduce() { assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single) } @@ -731,6 +739,23 @@ class RxScalaDemo extends JUnitSuite { println(result) } + @Test def ambWithVarargsExample(): Unit = { + val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds) + val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds) + val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds) + val result = Observable.amb(o1, o2, o3).toBlocking.toList + println(result) + } + + @Test def ambWithSeqExample(): Unit = { + val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds) + val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds) + val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds) + val o = Seq(o1, o2, o3) + val result = Observable.amb(o: _*).toBlocking.toList + println(result) + } + @Test def delayExample(): Unit = { val o = List(100L, 200L, 300L).toObservable.delay(2 seconds) val result = o.toBlockingObservable.toList @@ -1012,4 +1037,75 @@ class RxScalaDemo extends JUnitSuite { subscription.unsubscribe() } + def createAHotObservable: Observable[String] = { + var first = true + Observable[String] { + subscriber => + if (first) { + subscriber.onNext("1st: First") + subscriber.onNext("1st: Last") + first = false + } + else { + subscriber.onNext("2nd: First") + subscriber.onNext("2nd: Last") + } + subscriber.onCompleted() + } + } + + @Test def withoutPublishLastExample() { + val hot = createAHotObservable + hot.takeRight(1).subscribe(n => println(s"subscriber 1 gets $n")) + hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n")) + } + + @Test def publishLastExample() { + val hot = createAHotObservable + val o = hot.publishLast + o.subscribe(n => println(s"subscriber 1 gets $n")) + o.subscribe(n => println(s"subscriber 2 gets $n")) + o.connect + } + + @Test def publishLastExample2() { + val hot = createAHotObservable + val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice + o.subscribe(n => println(s"subscriber gets $n")) + } + + @Test def unsubscribeOnExample() { + val o = Observable[String] { + subscriber => + subscriber.add(Subscription { + println("unsubscribe on " + Thread.currentThread().getName()) + }) + subscriber.onNext("RxScala") + subscriber.onCompleted() + } + o.unsubscribeOn(NewThreadScheduler()).subscribe(println(_)) + } + + @Test def parallelMergeExample() { + val o: Observable[Observable[Int]] = (1 to 100).toObservable.map(_ => (1 to 10).toObservable) + assertEquals(100, o.size.toBlockingObservable.single) + assertEquals(1000, o.flatten.size.toBlockingObservable.single) + + val o2: Observable[Observable[Int]] = o.parallelMerge(10, ComputationScheduler()) + assertEquals(10, o2.size.toBlockingObservable.single) + assertEquals(1000, o2.flatten.size.toBlockingObservable.single) + } + + @Test def debounceExample() { + val o = Observable.interval(100 millis).take(20).debounce { + n => + if (n % 2 == 0) { + Observable.interval(50 millis) + } + else { + Observable.interval(150 millis) + } + } + o.toBlockingObservable.foreach(println(_)) + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index a3647cac7c..585da1da25 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -148,6 +148,22 @@ trait Observable[+T] thisJava.subscribe(subscriber.asJavaSubscriber) } + /** + * Subscribe to Observable and invoke `OnSubscribe` function without any + * contract protection, error handling, unsubscribe, or execution hooks. + * + * This should only be used for implementing an `Operator` that requires nested subscriptions. + * + * Normal use should use [[Observable.subscribe]] which ensures the Rx contract and other functionality. + * + * @param subscriber + * @return [[Subscription]] which is the Subscriber passed in + * @since 0.17 + */ + def unsafeSubscribe(subscriber: Subscriber[T]): Subscription = { + asJavaObservable.unsafeSubscribe(subscriber.asJavaSubscriber) + } + /** * $subscribeSubscriberMain * @@ -392,26 +408,6 @@ trait Observable[+T] zip(0 until Int.MaxValue) } - /** - * Creates an Observable which produces buffers of collected values. - * - * This Observable produces connected non-overlapping buffers. The current buffer is - * emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then - * be used to create a new Observable to listen for the end of the next buffer. - * - * @param closings - * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created. - * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer - * is emitted and replaced with a new one. - * @return - * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. - */ - def buffer(closings: () => Observable[Any]) : Observable[Seq[T]] = { - val f: Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable - val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Any](f) - Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) - } /** * Creates an Observable which produces buffers of collected values. * @@ -610,13 +606,16 @@ trait Observable[+T] * Completion of either the source or the boundary Observable causes the returned Observable to emit the * latest buffer and complete. * - * @param boundary the boundary Observable + * @param boundary the boundary Observable. Note: This is a by-name parameter, + * so it is only evaluated when someone subscribes to the returned Observable. * @return an Observable that emits buffered items from the source Observable when the boundary Observable * emits an item */ - def buffer(boundary: Observable[Any]): Observable[Seq[T]] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - toScalaObservable(thisJava.buffer(boundary.asJavaObservable)).map(_.asScala) + def buffer(boundary: => Observable[Any]): Observable[Seq[T]] = { + val f = new Func0[rx.Observable[_ <: Any]]() { + override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable + } + toScalaObservable(asJavaObservable.buffer[Any](f)).map(_.asScala) } /** @@ -640,27 +639,22 @@ trait Observable[+T] /** * Creates an Observable which produces windows of collected values. This Observable produces connected - * non-overlapping windows. The current window is emitted and replaced with a new window when the - * Observable produced by the specified function produces an object. - * The function will then be used to create a new Observable to listen for the end of the next - * window. + * non-overlapping windows. The boundary of each window is determined by the items emitted from a specified + * boundary-governing Observable. * - * @param closings - * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created. - * When this [[rx.lang.scala.Observable]] produces an object, the associated window - * is emitted and replaced with a new one. - * @return - * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. + * + * + * @param boundary an Observable whose emitted items close and open windows. Note: This is a by-name parameter, + * so it is only evaluated when someone subscribes to the returned Observable. + * @return An Observable which produces connected non-overlapping windows. The boundary of each window is + * determined by the items emitted from a specified boundary-governing Observable. */ - def window(closings: () => Observable[Any]): Observable[Observable[T]] = { - val func : Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable - val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func) - val o2 = Observable.items(o1).map((x: rx.Observable[_]) => { - val x2 = x.asInstanceOf[rx.Observable[_ <: T]] - toScalaObservable[T](x2) - }) - o2 + def window(boundary: => Observable[Any]): Observable[Observable[T]] = { + val func = new Func0[rx.Observable[_ <: Any]]() { + override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable + } + val jo: rx.Observable[_ <: rx.Observable[_ <: T]] = asJavaObservable.window[Any](func) + toScalaObservable(jo).map(toScalaObservable[T](_)) } /** @@ -933,6 +927,17 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.subscribeOn(scheduler)) } + /** + * Asynchronously unsubscribes on the specified [[Scheduler]]. + * + * @param scheduler the [[Scheduler]] to perform subscription and unsubscription actions on + * @return the source Observable modified so that its unsubscriptions happen on the specified [[Scheduler]] + * @since 0.17 + */ + def unsubscribeOn(scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler)) + } + /** * Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]]. * @@ -1430,6 +1435,22 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.cache()) } + /** + * Returns a new [[Observable]] that multicasts (shares) the original [[Observable]]. As long a + * there is more than 1 [[Subscriber]], this [[Observable]] will be subscribed and emitting data. + * When all subscribers have unsubscribed it will unsubscribe from the source [[Observable]]. + * + * This is an alias for `publish().refCount()` + * + * + * + * @return a [[Observable]] that upon connection causes the source Observable to emit items to its [[Subscriber]]s + * @since 0.19 + */ + def share: Observable[T] = { + toScalaObservable[T](asJavaObservable.share()) + } + /** * Returns an Observable that emits a Boolean that indicates whether the source Observable emitted a * specified item. @@ -1512,6 +1533,40 @@ trait Observable[+T] toScalaObservable[R](thisJava.publish(fJava, initialValue)) } + /** + * Returns a [[ConnectableObservable]] that emits only the last item emitted by the source Observable. + * A [[ConnectableObservable]] resembles an ordinary Observable, except that it does not begin emitting items + * when it is subscribed to, but only when its `connect` method is called. + *

+ * + * + * @return a [[ConnectableObservable]] that emits only the last item emitted by the source Observable + */ + def publishLast: ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.publishLast()) + } + + /** + * Returns an Observable that emits an item that results from invoking a specified selector on the last item + * emitted by a [[ConnectableObservable]] that shares a single subscription to the source Observable. + *

+ * + * + * @param selector a function that can use the multicasted source sequence as many times as needed, without + * causing multiple subscriptions to the source Observable. Subscribers to the source will only + * receive the last item emitted by the source. + * @return an Observable that emits an item that is the result of invoking the selector on a [[ConnectableObservable]] + * that shares a single subscription to the source Observable + */ + def publishLast[R](selector: Observable[T] => Observable[R]): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] + val fJava = new rx.functions.Func1[rx.Observable[T], rx.Observable[R]]() { + override def call(jo: rx.Observable[T]): rx.Observable[R] = + selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + } + toScalaObservable[R](thisJava.publishLast(fJava)) + } + // TODO add Scala-like aggregate function /** @@ -2222,6 +2277,23 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit)) } + /** + * Return an Observable that mirrors the source Observable, except that it drops items emitted by the source + * Observable that are followed by another item within a computed debounce duration. + * + * + * + * @param debounceSelector function to retrieve a sequence that indicates the throttle duration for each item + * @return an Observable that omits items emitted by the source Observable that are followed by another item + * within a computed debounce duration + */ + def debounce(debounceSelector: T => Observable[Any]): Observable[T] = { + val fJava = new rx.functions.Func1[T, rx.Observable[Any]] { + override def call(t: T) = debounceSelector(t).asJavaObservable.asInstanceOf[rx.Observable[Any]] + } + toScalaObservable[T](asJavaObservable.debounce[Any](fJava)) + } + /** * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. * @@ -2557,10 +2629,20 @@ trait Observable[+T] * if the source Observable completes without emitting any item. */ def firstOrElse[U >: T](default: => U): Observable[U] = { - this.take(1).foldLeft[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({ - case Some(element) => element - case None => default - }) + take(1).singleOrElse(default) + } + + /** + * Returns an Observable that emits only an `Option` with the very first item emitted by the source Observable, + * or `None` if the source Observable is empty. + * + * + * + * @return an Observable that emits only an `Option` with the very first item from the source, or `None` + * if the source Observable completes without emitting any item. + */ + def headOption: Observable[Option[T]] = { + take(1).singleOption } /** @@ -2642,6 +2724,34 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.last) } + /** + * Returns an Observable that emits only an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable completes without emitting any items. + * + * + * + * @return an Observable that emits only an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable is empty + */ + def lastOption: Observable[Option[T]] = { + takeRight(1).singleOption + } + + /** + * Returns an Observable that emits only the last item emitted by the source Observable, or a default item + * if the source Observable completes without emitting any items. + * + * + * + * @param default the default item to emit if the source Observable is empty. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return an Observable that emits only the last item emitted by the source Observable, or a default item + * if the source Observable is empty + */ + def lastOrElse[U >: T](default: => U): Observable[U] = { + takeRight(1).singleOrElse(default) + } + /** * If the source Observable completes after emitting a single item, return an Observable that emits that * item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`. @@ -2658,6 +2768,42 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.single) } + /** + * If the source Observable completes after emitting a single item, return an Observable that emits an `Option` + * with that item; if the source Observable is empty, return an Observable that emits `None`. + * If the source Observable emits more than one item, throw an `IllegalArgumentException`. + * + * + * + * @return an Observable that emits an `Option` with the single item emitted by the source Observable, or + * `None` if the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item + */ + def singleOption: Observable[Option[T]] = { + val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]] + toScalaObservable[Option[T]](jObservableOption.singleOrDefault(None)) + } + + /** + * If the source Observable completes after emitting a single item, return an Observable that emits that + * item; if the source Observable is empty, return an Observable that emits a default item. If the source + * Observable emits more than one item, throw an `IllegalArgumentException`. + * + * + * + * @param default a default value to emit if the source Observable emits no item. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return an Observable that emits the single item emitted by the source Observable, or a default item if + * the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item + */ + def singleOrElse[U >: T](default: => U): Observable[U] = { + singleOption.map { + case Some(element) => element + case None => default + } + } + /** * Returns an Observable that forwards all sequentially distinct items emitted from the source Observable. * @@ -2832,13 +2978,26 @@ trait Observable[+T] } /** - * Converts an Observable into a [[rx.lang.scala.observables.BlockingObservable]] (an Observable with blocking - * operators). + * Converts an Observable into a [[BlockingObservable]] (an Observable with blocking operators). * + * @return a [[BlockingObservable]] version of this Observable * @see Blocking Observable Operators */ + @deprecated("Use `toBlocking` instead", "0.19") def toBlockingObservable: BlockingObservable[T] = { - new BlockingObservable[T](asJavaObservable.toBlockingObservable) + new BlockingObservable[T](asJavaObservable.toBlocking) + } + + /** + * Converts an Observable into a [[BlockingObservable]] (an Observable with blocking + * operators). + * + * @return a [[BlockingObservable]] version of this Observable + * @see Blocking Observable Operators + * @since 0.19 + */ + def toBlocking: BlockingObservable[T] = { + new BlockingObservable[T](asJavaObservable.toBlocking) } /** @@ -2870,6 +3029,53 @@ trait Observable[+T] toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler)) } + /** + * Converts an `Observable[Observable[T]]` into another `Observable[Observable[T]]` whose + * emitted Observables emit the same items, but the number of such Observables is restricted by `parallelObservables`. + * + * For example, if the original `Observable[Observable[T]]` emits 100 Observables and `parallelObservables` is 8, + * the items emitted by the 100 original Observables will be distributed among 8 Observables emitted by the resulting Observable. + * + * + * + * This is a mechanism for efficiently processing `n` number of Observables on a smaller `m` number of resources (typically CPU cores). + * + * @param parallelObservables the number of Observables to merge into + * @return an Observable of Observables constrained in number by `parallelObservables` + */ + def parallelMerge[U](parallelObservables: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[Observable[U]] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[U]] = o2.map(_.asJavaObservable.asInstanceOf[rx.Observable[U]]) + val o4: rx.Observable[rx.Observable[U]] = o3.asJavaObservable.asInstanceOf[rx.Observable[rx.Observable[U]]] + val o5: rx.Observable[rx.Observable[U]] = rx.Observable.parallelMerge[U](o4, parallelObservables) + toScalaObservable(o5).map(toScalaObservable[U](_)) + } + + /** + * Converts an `Observable[Observable[T]]` into another `Observable[Observable[T]]` whose + * emitted Observables emit the same items, but the number of such Observables is restricted by `parallelObservables`, + * and each runs on a defined Scheduler. + * + * For example, if the original Observable[Observable[T]]` emits 100 Observables and `parallelObservables` is 8, + * the items emitted by the 100 original Observables will be distributed among 8 Observables emitted by the resulting Observable. + * + * + * + * This is a mechanism for efficiently processing n` number of Observables on a smaller `m` + * number of resources (typically CPU cores). + * + * @param parallelObservables the number of Observables to merge into + * @param scheduler the [[Scheduler]] to run each Observable on + * @return an Observable of Observables constrained in number by `parallelObservables` + */ + def parallelMerge[U](parallelObservables: Int, scheduler: Scheduler)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[Observable[U]] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[U]] = o2.map(_.asJavaObservable.asInstanceOf[rx.Observable[U]]) + val o4: rx.Observable[rx.Observable[U]] = o3.asJavaObservable.asInstanceOf[rx.Observable[rx.Observable[U]]] + val o5: rx.Observable[rx.Observable[U]] = rx.Observable.parallelMerge[U](o4, parallelObservables, scheduler) + toScalaObservable(o5).map(toScalaObservable[U](_)) + } + /** Tests whether a predicate holds for some of the elements of this `Observable`. * * @param p the predicate used to test elements. @@ -3664,6 +3870,32 @@ object Observable { toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue()) } + /** + * Returns an Observable that emits `0L` after a specified delay, and then completes. + * + * + * + * @param delay the initial delay before emitting a single `0L` + * @return Observable that emits `0L` after a specified delay, and then completes + */ + def timer(delay: Duration): Observable[Long] = { + toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit)).map(_.longValue()) + } + + /** + * Returns an Observable that emits `0L` after a specified delay, on a specified Scheduler, and then + * completes. + * + * + * + * @param delay the initial delay before emitting a single `0L` + * @param scheduler the Scheduler to use for scheduling the item + * @return Observable that emits `0L` after a specified delay, on a specified Scheduler, and then completes + */ + def timer(delay: Duration, scheduler: Scheduler): Observable[Long] = { + toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit, scheduler)).map(_.longValue()) + } + /** * Constructs an Observable that creates a dependent resource object. *

@@ -3686,6 +3918,18 @@ object Observable { )) } + /** + * Mirror the one Observable in an Iterable of several Observables that first emits an item. + * + * + * + * @param sources an Iterable of Observable sources competing to react first + * @return an Observable that emits the same sequence of items as whichever of the source Observables + * first emitted an item + */ + def amb[T](sources: Observable[T]*): Observable[T] = { + toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava)) + } } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 611569493b..fa31de9919 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -72,12 +72,13 @@ class CompletenessTest extends JUnitSuite { "all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)", "buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)", "buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)", - "buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(() => Observable[Any])", - "buffer(Observable[B])" -> "buffer(Observable[Any])", + "buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(=> Observable[Any])", + "buffer(Observable[B])" -> "buffer(=> Observable[Any])", "buffer(Observable[B], Int)" -> "buffer(Observable[Any], Int)", "buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "buffer(Observable[Opening], Opening => Observable[Any])", "contains(Any)" -> "contains(U)", "count()" -> "length", + "debounce(Func1[_ >: T, _ <: Observable[U]])" -> "debounce(T => Observable[Any])", "delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])", "delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", @@ -88,6 +89,9 @@ class CompletenessTest extends JUnitSuite { "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", "ignoreElements()" -> "[use `filter(_ => false)`]", + "last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]", + "lastOrDefault(T)" -> "lastOrElse(=> U)", + "lastOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).lastOrElse(default)`]", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", "limit(Int)" -> "take(Int)", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", @@ -102,6 +106,7 @@ class CompletenessTest extends JUnitSuite { "publish(T)" -> "publish(U)", "publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[U] => Observable[R])", "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)", + "publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "replay(Observable[U] => Observable[R])", @@ -112,15 +117,18 @@ class CompletenessTest extends JUnitSuite { "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)", + "sample(Observable[U])" -> "sample(Observable[Any])", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", + "single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]", + "singleOrDefault(T)" -> "singleOrElse(=> U)", + "singleOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).singleOrElse(default)`]", "skip(Int)" -> "drop(Int)", "skip(Long, TimeUnit)" -> "drop(Duration)", "skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)", "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, "skipUntil(Observable[U])" -> "dropUntil(Observable[E])", - "single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]", "startWith(T)" -> "[use `item +: o`]", "startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]", "startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]", @@ -152,10 +160,11 @@ class CompletenessTest extends JUnitSuite { "toList()" -> "toSeq", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", + "window(Observable[U])" -> "window(=> Observable[Any])", + "window(Func0[_ <: Observable[_ <: TClosing]])" -> "window(=> Observable[Any])", + "window(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "window(Observable[Opening], Opening => Observable[Any])", "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", - "zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)", - "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)", // manually added entries for Java static methods "average(Observable[Integer])" -> averageProblem, @@ -178,6 +187,8 @@ class CompletenessTest extends JUnitSuite { "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", + "parallelMerge(Observable[Observable[T]], Int)" -> "parallelMerge(Int)(<:<[Observable[T], Observable[Observable[U]]])", + "parallelMerge(Observable[Observable[T]], Int, Scheduler)" -> "parallelMerge(Int, Scheduler)(<:<[Observable[T], Observable[Observable[U]]])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqual(Observable[U], (U, U) => Boolean)", "range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]", @@ -190,7 +201,9 @@ class CompletenessTest extends JUnitSuite { "switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])", "zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]", "zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]", - "zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]" + "zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]", + "zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)", + "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)" ) ++ List.iterate("T, T", 8)(s => s + ", T").map( // all 9 overloads of startWith: "startWith(" + _ + ")" -> "[use `Observable.items(...) ++ o`]" @@ -216,7 +229,10 @@ class CompletenessTest extends JUnitSuite { val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("") ("combineLatest(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", "[If C# doesn't need it, Scala doesn't need it either ;-)]") - }).toMap + }).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( + // amb 2-9 + "amb(" + _ + ")" -> "[unnecessary because we can use `o1 amb o2` instead or `amb(List(o1, o2, o3, ...)`]" + ).drop(1).toMap def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2") diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index d8ef40590e..cda1395b27 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -176,4 +176,44 @@ class ObservableTests extends JUnitSuite { assertEquals(List(0, 1, 2), o.toBlockingObservable.toList) } + @Test + def testSingleOrElse() { + val o = Observable.items(1).singleOrElse(2) + assertEquals(1, o.toBlocking.single) + } + + @Test + def testSingleOrElseWithEmptyObservable() { + val o: Observable[Int] = Observable.empty.singleOrElse(1) + assertEquals(1, o.toBlocking.single) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testSingleOrElseWithTooManyItems() { + Observable.items(1, 2).singleOrElse(1).toBlocking.single + } + + @Test + def testSingleOrElseWithCallByName() { + var called = false + val o: Observable[Int] = Observable.empty.singleOrElse { + called = true + 1 + } + assertFalse(called) + o.subscribe() + assertTrue(called) + } + + @Test + def testSingleOrElseWithCallByName2() { + var called = false + val o = Observable.items(1).singleOrElse { + called = true + 2 + } + assertFalse(called) + o.subscribe() + assertFalse(called) + } }