Skip to content

Commit

Permalink
Merge pull request #876 from benjchristensen/751-merge
Browse files Browse the repository at this point in the history
Manual Merge of #750
  • Loading branch information
benjchristensen committed Feb 14, 2014
2 parents b9fe278 + 840f721 commit 6eb7162
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ trait Observable[+T]
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
}

/**
* Wraps each item emitted by a source Observable in a timestamped tuple
* with timestamps provided by the given Scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timestamp.s.png">
*
* @param scheduler [[rx.lang.scala.Scheduler]] to use as a time source.
* @return an Observable that emits timestamped items from the source
* Observable with timestamps provided by the given Scheduler
*/
def timestamp(scheduler: Scheduler): Observable[(Long, T)] = {
toScalaObservable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler))
.map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
}

/**
* Returns an Observable formed from this Observable and another Observable by combining
* corresponding elements in pairs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.lang.scala

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Await}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -130,6 +131,21 @@ class ObservableTests extends JUnitSuite {
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
}

@Test def testTimestampWithScheduler() {
val c = 10
val s = TestScheduler()
val o1 = Observable interval (1.milliseconds, s) map (_ + 1)
val o2 = o1 timestamp s
val l = ListBuffer[(Long, Long)]()
o2.subscribe (
onNext = (l += _)
)
s advanceTimeTo c.milliseconds
val (l1, l2) = l.toList.unzip
assertTrue(l1.size == c)
assertEquals(l2, l1)
}

/*
@Test def testHead() {
val observer = mock(classOf[Observer[Int]])
Expand Down

0 comments on commit 6eb7162

Please sign in to comment.