Skip to content

Commit

Permalink
SizeBound draft for takeRight
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelgruetter committed Nov 4, 2014
1 parent 19c9aa1 commit 38f8319
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import org.scalatest.junit.JUnitSuite
import rx.lang.scala._
import rx.lang.scala.schedulers._

import Observable._

/**
* Demo how the different operators can be used. In Eclipse, you can right-click
* a test and choose "Run As" > "Scala JUnit Test".
Expand Down Expand Up @@ -1248,7 +1250,7 @@ class RxScalaDemo extends JUnitSuite {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.takeRight(2, 300 millis)
.takeRight(SizeBound(2, 300 millis))
println(o.toBlocking.toList)
}

Expand Down
109 changes: 30 additions & 79 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.collection.{Iterable, Traversable, immutable}
import scala.collection.mutable.ArrayBuffer
import scala.language.higherKinds
import scala.reflect.ClassTag
import rx.lang.scala.schedulers.ComputationScheduler


/**
Expand Down Expand Up @@ -1987,86 +1988,22 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.takeWhile(predicate))
}

/**
* Returns an Observable that emits only the last `count` items emitted by the source
* Observable.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/last.png">
*
* @param count
* the number of items to emit from the end of the sequence emitted by the source
* Observable
* @return an Observable that emits only the last `count` items emitted by the source
* Observable
*/
def takeRight(count: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(count))
}

/**
* Return an Observable that emits the items from the source Observable that were emitted in a specified
* window of `time` before the Observable completed.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.t.png">
*
* @param time the length of the time window
* @return an Observable that emits the items from the source Observable that were emitted in the window of
* time before the Observable completed specified by `time`
*/
def takeRight(time: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit))
}

/**
* Return an Observable that emits the items from the source Observable that were emitted in a specified
* window of `time` before the Observable completed, where the timing information is provided by a specified
* Scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.ts.png">
*
* @param time the length of the time window
* @param scheduler the Scheduler that provides the timestamps for the Observed items
* @return an Observable that emits the items from the source Observable that were emitted in the window of
* time before the Observable completed specified by `time`, where the timing information is
* provided by `scheduler`
*/
def takeRight(time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit, scheduler.asJavaScheduler))
}
import Observable._

/**
* Return an Observable that emits at most a specified number of items from the source Observable that were
* emitted in a specified window of time before the Observable completed.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.tn.png">
*
* @param count the maximum number of items to emit
* @param time the length of the time window
* @return an Observable that emits at most `count` items from the source Observable that were emitted
* in a specified window of time before the Observable completed
* @throws IllegalArgumentException if `count` is less than zero
*/
def takeRight(count: Int, time: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit))
}

/**
* Return an Observable that emits at most a specified number of items from the source Observable that were
* emitted in a specified window of `time` before the Observable completed, where the timing information is
* provided by a given Scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/takeLast.tns.png">
*
* @param count the maximum number of items to emit
* @param time the length of the time window
* @param scheduler the Scheduler that provides the timestamps for the observed items
* @return an Observable that emits at most `count` items from the source Observable that were emitted
* in a specified window of time before the Observable completed, where the timing information is
* provided by the given `scheduler`
* @throws IllegalArgumentException if `count` is less than zero
*/
def takeRight(count: Int, time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit, scheduler.asJavaScheduler))
def takeRight(size: SizeBound, scheduler: Scheduler = ComputationScheduler()): Observable[T] = {
if (size.hasCountBound) {
if (size.hasTimeBound) {
toScalaObservable[T](asJavaObservable.takeLast(size.maxCount, size.maxTime.length, size.maxTime.unit, scheduler.asJavaScheduler))
} else {
toScalaObservable[T](asJavaObservable.takeLast(size.maxCount))
}
} else {
if (size.hasTimeBound) {
toScalaObservable[T](asJavaObservable.takeLast(size.maxTime.length, size.maxTime.unit, scheduler.asJavaScheduler))
} else {
throw new IllegalArgumentException("unbounded takeRight")
}
}
}

/**
Expand Down Expand Up @@ -4690,6 +4627,20 @@ object Observable {
}
toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
}

case class SizeBound(val maxCount: Int, val maxTime: Duration) {
def hasCountBound: Boolean = maxCount >= 0
def hasTimeBound: Boolean = maxTime != Duration.Inf
}
object SizeBound {
def apply(maxTime: Duration): SizeBound = SizeBound(-1, maxTime)
def apply(maxCount: Int): SizeBound = SizeBound(maxCount, Duration.Inf)
}

import language.implicitConversions
implicit def intToSizeBound(i: Int): SizeBound = SizeBound(i)
implicit def durationToSizeBound(t: Duration): SizeBound = SizeBound(t)

}


Expand Down

0 comments on commit 38f8319

Please sign in to comment.