Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxScala: Add the rest missing methods to BlockingObservable #1336

Merged
merged 7 commits into from
Jun 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -226,6 +227,28 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900))
}

@Test def nextExample() {
val o = Observable.interval(100 millis).take(20)
for(i <- o.toBlocking.next) {
println(i)
Thread.sleep(200)
}
}

@Test def latestExample() {
val o = Observable.interval(100 millis).take(20)
for(i <- o.toBlocking.latest) {
println(i)
Thread.sleep(200)
}
}

@Test def toFutureExample() {
val o = Observable.interval(500 millis).take(1)
val r = Await.result(o.toBlocking.toFuture, 2 seconds)
println(r)
}

@Test def testTwoSubscriptionsToOneInterval() {
val o = Observable.interval(100 millis).take(8)
o.subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2985,13 +2985,14 @@ trait Observable[+T]

/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
* item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
* or `NoSuchElementException` respectively.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return an Observable that emits the single item emitted by the source Observable
* @throws NoSuchElementException
* if the source emits more than one item or no items
* @throws IllegalArgumentException if the source emits more than one item
* @throws NoSuchElementException if the source emits no items
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
* @see "MSDN: Observable.singleAsync()"
*/
Expand Down Expand Up @@ -3252,7 +3253,7 @@ trait Observable[+T]
*/
@deprecated("Use `toBlocking` instead", "0.19")
def toBlockingObservable: BlockingObservable[T] = {
new BlockingObservable[T](asJavaObservable.toBlocking)
new BlockingObservable[T](this)
}

/**
Expand All @@ -3264,7 +3265,7 @@ trait Observable[+T]
* @since 0.19
*/
def toBlocking: BlockingObservable[T] = {
new BlockingObservable[T](asJavaObservable.toBlocking)
new BlockingObservable[T](this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
package rx.lang.scala.observables

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import rx.lang.scala.ImplicitFunctionConversions._
import rx.lang.scala.Observable
import rx.observables.{BlockingObservable => JBlockingObservable}


/**
* An Observable that provides blocking operators.
*
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]]
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
*/
// constructor is private because users should use Observable.toBlockingObservable
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
extends AnyVal
// constructor is private because users should use Observable.toBlocking
class BlockingObservable[+T] private[scala] (val o: Observable[T])
extends AnyVal
{

// This is def because "field definition is not allowed in value class"
private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
* completes.
Expand Down Expand Up @@ -69,6 +73,31 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.last : T
}

/**
* Returns an `Option` with the last item emitted by the source Observable,
* or `None` if the source Observable completes without emitting any items.
*
* @return an `Option` with the last item emitted by the source Observable,
* or `None` if the source Observable is empty
*/
def lastOption: Option[T] = {
o.lastOption.toBlocking.single
}

/**
* Returns the last item emitted by the source Observable, or a default item
* if the source Observable completes without emitting any items.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
*
* @param default the default item to emit if the source Observable is empty.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the last item emitted by the source Observable, or a default item if the source Observable is empty
*/
def lastOrElse[U >: T](default: => U): U = {
lastOption getOrElse default
}

/**
* Returns the first item emitted by a specified [[Observable]], or
* `NoSuchElementException` if source contains no elements.
Expand Down Expand Up @@ -96,12 +125,29 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
*/
def head : T = first

// last -> use toIterable.last
// lastOrDefault -> use toIterable.lastOption
// first -> use toIterable.head
// firstOrDefault -> use toIterable.headOption
// single(predicate) -> use filter and single
// singleOrDefault -> use singleOption
/**
* Returns an `Option` with the very first item emitted by the source Observable,
* or `None` if the source Observable is empty.
*
* @return an `Option` with the very first item from the source,
* or `None` if the source Observable completes without emitting any item.
*/
def headOption: Option[T] = {
o.headOption.toBlocking.single
}

/**
* Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
*
* @param default The default value to emit if the source Observable doesn't emit anything.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the very first item from the source, or a default value if the source Observable completes without emitting any item.
*/
def headOrElse[U >: T](default: => U): U = {
headOption getOrElse default
}

/**
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
Expand Down Expand Up @@ -130,32 +176,48 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
}

/**
* If this {@link Observable} completes after emitting a single item, return that item,
* otherwise throw an exception.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
* If the source Observable completes after emitting a single item, return that item. If the source Observable
* emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return the single item emitted by the {@link Observable}
* @return an Observable that emits the single item emitted by the source Observable
* @throws IllegalArgumentException if the source emits more than one item
* @throws NoSuchElementException if the source emits no items
*/
def single: T = {
asJava.single(): T // useless ascription because of compiler bug
}

/**
* If this {@link Observable} completes after emitting a single item, return an Option containing
* this item, otherwise return {@code None}.
* If the source Observable completes after emitting a single item, return an `Option` with that item;
* if the source Observable is empty, return `None`. If the source Observable emits more than one item,
* throw an `IllegalArgumentException`.
*
* @return an `Option` with the single item emitted by the source Observable, or
* `None` if the source Observable is empty
* @throws IllegalArgumentException if the source Observable emits more than one item
*/
def singleOption: Option[T] = {
var size: Int = 0
var last: Option[T] = None
for (t <- toIterable) {
size += 1
last = Some(t)
}
if (size == 1) last else None
o.singleOption.toBlocking.single
}

// TODO toFuture()
/**
* If the source Observable completes after emitting a single item, return that item;
* if the source Observable is empty, return a default item. If the source Observable
* emits more than one item, throw an `IllegalArgumentException`.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
*
* @param default a default value to emit if the source Observable emits no item.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the single item emitted by the source Observable, or a default item if
* the source Observable is empty
* @throws IllegalArgumentException if the source Observable emits more than one item
*/
def singleOrElse[U >: T](default: => U): U = {
singleOption getOrElse default
}

/**
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
Expand All @@ -171,6 +233,38 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug
}

/**
* Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`,
* waiting if necessary for one to become available.
*
* If this `BlockingObservable` produces items faster than `Iterator.next` takes them,
* `onNext` events might be skipped, but `onError` or `onCompleted` events are not.
*
* Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event.
*
* @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable`
*/
def latest: Iterable[T] = {
asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug
}

/**
* Returns a `Future` representing the single value emitted by this `BlockingObservable`.
*
* The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable`
* emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable`
* is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable`
* and do not want to handle these `Exception`s.
*
* <img width="640" height="395" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.toFuture.png">
*
* @return a `Future` that expects a single item to be emitted by this `BlockingObservable`.
*/
def toFuture: Future[T] = {
val p = Promise[T]()
o.single.subscribe(t => p.success(t), e => p.failure(e))
p.future
}
}

// Cannot yet have inner class because of this error message:
Expand Down
Loading