-
Notifications
You must be signed in to change notification settings - Fork 0
Filtering Observables
This section explains operators you can use to filter and select items emitted by Observables.
-
filter( )
orwhere( )
— filter items emitted by an Observable -
takeLast( )
— only emit the last n items emitted by an Observable -
skip( )
— ignore the first n items emitted by an Observable -
take( )
— emit only the first n items emitted by an Observable -
sample( )
— emit items emitted by an Observable at a particular time interval -
takeWhile( )
andtakeWhileWithIndex( )
— emit items emitted by an Observable as long as a specified condition is true, then skip the remainder -
skipWhile( )
andskipWhileWithIndex( )
— discard items emitted by an Observable until a specified condition is false, then emit the remainder
You can filter an Observable, discarding any items that do not meet some test, by passing a filtering function into the filter( )
method. For example, the following code filters a list of integers, emitting only those that are even (that is, where the remainder from dividing the number by two is zero):
numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
Observable.filter(numbers, { 0 == (it % 2) }).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
2
4
6
8
Sequence complete
In addition to calling filter( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of
Observable.filter(numbers, { 0 == (it %2) }) ...
you could instead write
numbers.filter({ 0 == (it % 2) }) ...
The where( )
method has the same purpose as filter( )
but accepts a Func1
evaluator function instead of a closure. Here is the same sample, but implemented with where( )
instead of filter( )
:
class isEven implements rx.util.functions.Func1 {
Boolean call( Object it ) { return(0 == (it % 2)); }
}
myisEven = new isEven();
numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.where(myisEven).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
To convert an Observable that emits several items into one that only emits the last n of these itemsbefore completing, use the takeLast( )
method. For instance, in the following code, takeLast( )
emits only the last integer in the list of integers represented by numbers
:
numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
Observable.takeLast(numbers,1).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
9
Sequence complete
In addition to calling takeLast( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of
Observable.takeLast(numbers,1) ...
you could instead write
numbers.takeLast(1) ...
You can ignore the first n items emitted by an Observable and attend only to those items that come after, by modifying the Observable with the skip(n)
method.
numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
Observable.skip(numbers, 3).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
4
5
6
7
8
9
Sequence complete
In addition to calling skip( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of
Observable.skip(numbers, 3) ...
you could instead write
numbers.skip(3) ...
You can choose to pay attention only to the first n items emitted by an Observable by calling its take(n)
method. That method returns an Observable that will invoke an Observer’s onNext
method a maximum of n times before invoking onCompleted
. For example,
numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
Observable.take(numbers, 3).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
1
2
3
Sequence complete
In addition to calling take( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of
Observable.take(numbers, 3) ...
you could instead write
numbers.take(3) ...
If you call take(n)
on an Observable, and that Observable emits fewer than n items before completing, the new, take
-modified Observable will not throw an exception or invoke onError()
, but will merely emit this same fewer number of items before it completes.
Use the sample( )
method to periodically look at an Observable to see what item it is emitting at a particular time.
The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.
def numbers = Observable.range( 1, 1000000 );
numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
339707
547810
891282
Sequence complete
emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
The takeWhile( )
method returns an Observable that mirrors the behavior of the source Observable until such time as a function applied to an item emitted by that Observable returns false
, whereupon the new Observable invokes onCompleted( )
.
numbers = Observable.toObservable( [1, 2, 3, 4, 5, 6, 7, 8, 9] );
numbers.takeWhile({ ((it < 6) || (0 == (it % 2))) }).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
1
2
3
4
5
6
Sequence complete
The takeWhileWithIndex( )
method is similar, but your function takes an additional parameter: the (zero-based) index of the item being emitted by the source Observable.
numbers = Observable.toObservable( [1, 2, 3, 4, 5, 6, 7, 8, 9] );
numbers.takeWhileWithIndex({ it, index -> ((it < 6) || (index < 5)) }).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
1
2
3
4
5
Sequence complete
discard items emitted by an Observable until a specified condition is false, then emit the remainder
(diagram TBD)
The skipWhile( )
method returns an Observable that discards items emitted by the source Observable until such time as a function applied to an item emitted by that Observable returns false
, whereupon the new Observable emits that item and the remainder of the items emitted by the source Observable.
numbers = Observable.toObservable( [1, 2, 3, 4, 5, 6, 7, 8, 9] );
numbers.skipWhile({ (0 == (it % 5)) }).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
5
6
7
8
9
Sequence complete
The skipWhileWithIndex( )
method is similar, but your function takes an additional parameter: the (zero-based) index of the item being emitted by the source Observable.
numbers = Observable.toObservable( [1, 2, 3, 4, 5, 6, 7, 8, 9] );
numbers.skipWhileWithIndex({ it, index -> ((it < 6) || (index < 5)) }).subscribe(
[ onNext:{ myWriter.println(it); },
onCompleted:{ myWriter.println("Sequence complete"); },
onError:{ myWriter.println("Error encountered"); } ]
);
6
7
8
9
Sequence complete
A Netflix Original Production
Tech Blog | Twitter @NetflixOSS | Twitter @RxJava | Jobs