diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 7b5b7fe79c..018eea37fd 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -282,6 +282,21 @@ trait Observable[+T] .map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue)) } + /** + * Wraps each item emitted by a source Observable in a timestamped tuple + * with timestamps provided by the given Scheduler. + *
+ * + * + * @param scheduler [[rx.lang.scala.Scheduler]] to use as a time source. + * @return an Observable that emits timestamped items from the source + * Observable with timestamps provided by the given Scheduler + */ + def timestamp(scheduler: Scheduler): Observable[(Long, T)] = { + toScalaObservable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler)) + .map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue)) + } + /** * Returns an Observable formed from this Observable and another Observable by combining * corresponding elements in pairs. diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index c91cbd4e70..397907a6cb 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -15,6 +15,7 @@ */ package rx.lang.scala +import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Await} import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global @@ -130,6 +131,21 @@ class ObservableTests extends JUnitSuite { assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList) } + @Test def testTimestampWithScheduler() { + val c = 10 + val s = TestScheduler() + val o1 = Observable interval (1.milliseconds, s) map (_ + 1) + val o2 = o1 timestamp s + val l = ListBuffer[(Long, Long)]() + o2.subscribe ( + onNext = (l += _) + ) + s advanceTimeTo c.milliseconds + val (l1, l2) = l.toList.unzip + assertTrue(l1.size == c) + assertEquals(l2, l1) + } + /* @Test def testHead() { val observer = mock(classOf[Observer[Int]])