Filtering Observables

DavidMGross edited this page Sep 18, 2013 · 27 revisions

This section explains operators you can use to filter and select items emitted by Observables.

filter( ) or where( )

filter items emitted by an Observable

You can filter an Observable, discarding any items that do not meet some test, by passing a filtering function into the filter( ) method. For example, the following code filters a list of integers, emitting only those that are even (that is, where the remainder from dividing the number by two is zero):

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

Observable.filter(numbers, { 0 == (it % 2) }).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
2
4
6
8
Sequence complete

In addition to calling filter( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.filter(numbers, { 0 == (it % 2) }) ...

you could instead write

numbers.filter({ 0 == (it % 2) }) ...

The where( ) method has the same purpose as filter( ) but accepts a Func1 evaluator function instead of a closure. Here is the same sample, but implemented with where( ) instead of filter( ):

class isEven implements rx.util.functions.Func1 {
  Boolean call( Object it ) { return(0 == (it % 2)); }
}

myisEven = new isEven();

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.where(myisEven).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);

takeLast( )

only emit the last n items emitted by an Observable

To convert an Observable that emits several items into one that only emits the last n of these items before completing, use the takeLast( ) method. For instance, in the following code, takeLast( ) emits only the last integer in the list of integers represented by numbers:

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

Observable.takeLast(numbers,1).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
9
Sequence complete

In addition to calling takeLast( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.takeLast(numbers,1) ...

you could instead write

numbers.takeLast(1) ...

skip( )

ignore the first n items emitted by an Observable

You can ignore the first n items emitted by an Observable and attend only to those items that come after, by modifying the Observable with the skip(n) method.

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

Observable.skip(numbers, 3).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
4
5
6
7
8
9
Sequence complete

In addition to calling skip( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.skip(numbers, 3) ...

you could instead write

numbers.skip(3) ...

skipWhile( ) and skipWhileWithIndex( )

discard items emitted by an Observable until a specified condition is false, then emit the remainder

The skipWhile( ) method returns an Observable that discards items emitted by the source Observable until such time as a function applied to an item emitted by that Observable returns false, whereupon the new Observable emits that item and the remainder of the items emitted by the source Observable.

numbers = Observable.from( [1, 2, 3, 4, 5, 6, 7, 8, 9] );

numbers.skipWhile({ (0 != (it % 5)) }).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
5
6
7
8
9
Sequence complete

The skipWhileWithIndex( ) method is similar, but your function takes an additional parameter: the (zero-based) index of the item being emitted by the source Observable.

numbers = Observable.from( [1, 2, 3, 4, 5, 6, 7, 8, 9] );

numbers.skipWhileWithIndex({ it, index -> ((it < 6) || (index < 5)) }).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
6
7
8
9
Sequence complete

take( )

emit only the first n items emitted by an Observable

You can choose to pay attention only to the first n items emitted by an Observable by calling its take(n) method. That method returns an Observable that will invoke an Observer’s onNext method a maximum of n times before invoking onCompleted. For example,

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

Observable.take(numbers, 3).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
1
2
3
Sequence complete

In addition to calling take( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.take(numbers, 3) ...

you could instead write

numbers.take(3) ...

If you call take(n) on an Observable, and that Observable emits fewer than n items before completing, the new, take-modified Observable will not throw an exception or invoke onError( ), but will merely emit those items and then complete.

takeWhile( ) and takeWhileWithIndex( )

emit items emitted by an Observable as long as a specified condition is true, then skip the remainder

The takeWhile( ) method returns an Observable that mirrors the behavior of the source Observable until such time as a function applied to an item emitted by that Observable returns false, whereupon the new Observable invokes onCompleted( ).

numbers = Observable.from( [1, 2, 3, 4, 5, 6, 7, 8, 9] );

numbers.takeWhile({ ((it < 6) || (0 == (it % 2))) }).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
1
2
3
4
5
6
Sequence complete

The takeWhileWithIndex( ) method is similar, but your function takes an additional parameter: the (zero-based) index of the item being emitted by the source Observable.

numbers = Observable.from( [1, 2, 3, 4, 5, 6, 7, 8, 9] );

numbers.takeWhileWithIndex({ it, index -> ((it < 6) || (index < 5)) }).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
1
2
3
4
5
Sequence complete

first( )

emit only the first item emitted by an Observable, or the first item that meets some condition

To create an Observable that emits only the first item emitted by a source Observable (if any), use the first( ) method.

You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case first( ) will create an Observable that emits the first such item for which your function returns true (if any).

firstOrDefault( )

emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty

To create an Observable that emits only the first item emitted by a source Observable (or a default value if the source Observable is empty), use the firstOrDefault( ) method.

You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case firstOrDefault( ) will create an Observable that emits the first such item for which your function returns true (or the supplied default value if no such item is emitted).
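The selection rule shared by first( ) and firstOrDefault( ) can be sketched in plain Java. This is only a model of the behavior described above, run over a List instead of an Observable; FirstSketch and its method signatures are hypothetical names invented for the illustration and are not part of RxJava.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Plain-Java model of the selection rule only; FirstSketch is a
// hypothetical name and does not use or mirror the RxJava API.
class FirstSketch {
    // first( ): the first item for which the test returns true, if any.
    static <T> Optional<T> first(List<T> items, Predicate<T> test) {
        for (T item : items) {
            if (test.test(item)) {
                return Optional.of(item);
            }
        }
        return Optional.empty();
    }

    // firstOrDefault( ): same rule, but falls back to the supplied default
    // when no item passes the test (including when the list is empty).
    static <T> T firstOrDefault(List<T> items, Predicate<T> test, T defaultValue) {
        return first(items, test).orElse(defaultValue);
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<Integer> empty = List.of();

        Optional<Integer> firstEven = first(numbers, n -> n % 2 == 0);
        Integer noneMatch = firstOrDefault(numbers, n -> n > 100, -1);
        Integer fromEmpty = firstOrDefault(empty, n -> true, -1);

        System.out.println(firstEven);  // Optional[2]
        System.out.println(noneMatch);  // -1
        System.out.println(fromEmpty);  // -1
    }
}
```

The Optional return type stands in for "emits nothing"; an Observable would simply complete without calling onNext.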

elementAt( )

emit item n emitted by the source Observable

Pass elementAt( ) a zero-based index value and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAt( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, or if the source Observable emits fewer than index value + 1 items, elementAt( ) will throw an IndexOutOfBoundsException.

elementAtOrDefault( )

emit the item at index n emitted by the source Observable, or a default item if the source Observable emits too few items to reach that index

Pass elementAtOrDefault( ) a zero-based index value and a default value, and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAtOrDefault( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, elementAtOrDefault( ) will throw an IndexOutOfBoundsException. If the source Observable emits fewer than index value + 1 items, elementAtOrDefault( ) will emit the default value instead, so that default must be of a type your Observers expect to observe.
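The index rules for elementAt( ) and elementAtOrDefault( ) described above can be summarized in a small plain-Java sketch. It operates on a List and does not call RxJava; ElementAtSketch and its methods are hypothetical names used only to make the three cases (valid index, index past the end, negative index) concrete.

```java
import java.util.List;

// Plain-Java model of the index rules only; ElementAtSketch is a
// hypothetical name and does not use or mirror the RxJava API.
class ElementAtSketch {
    // elementAt( ): negative or too-large indexes are both errors.
    static <T> T elementAt(List<T> items, int index) {
        if (index < 0 || index >= items.size()) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        return items.get(index);
    }

    // elementAtOrDefault( ): a negative index is still an error, but an
    // index past the end yields the default value instead.
    static <T> T elementAtOrDefault(List<T> items, int index, T defaultValue) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        return index < items.size() ? items.get(index) : defaultValue;
    }

    public static void main(String[] args) {
        List<String> letters = List.of("a", "b", "c");

        String third = elementAt(letters, 2);                   // "c"
        String fallback = elementAtOrDefault(letters, 5, "?");  // "?"
        System.out.println(third + " " + fallback);             // prints: c ?

        try {
            elementAt(letters, 5);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("out of bounds");                // index 5 is past the end
        }
    }
}
```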

sample( ) or throttleLast( )

emit the most recent items emitted by an Observable within periodic time intervals

Use the sample( ) method to periodically look at an Observable to see what item it is emitting at a particular time.

The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment. Because sampling depends on timing, the numbers printed will vary from run to run.

def numbers = Observable.range( 1, 1000000 );
 
numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
339707
547810
891282
Sequence complete

throttleFirst( )

emit the first items emitted by an Observable within periodic time intervals

Use the throttleFirst( ) method to periodically look at an Observable to see what item it emitted first during a particular time span. The following code shows how an Observable can be modified by throttleFirst( ):

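// s is a TestScheduler, which lets the example advance time manually
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(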
    { println(it); },                  // onNext
    { println("Error encountered"); }, // onError
    { println("Sequence complete"); }  // onCompleted
);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();
1
3
7
Sequence complete
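
To see why items 1, 3, and 7 are the ones delivered in the timeline above, here is a rough plain-Java model of the rule. It assumes that a new window opens each time an item is delivered and that an item arriving exactly one window length later is delivered; both are assumptions of this sketch rather than statements about the library's internals. ThrottleFirstSketch, its method, and the timesMs array are hypothetical names, and no RxJava classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the throttling rule only; ThrottleFirstSketch is a
// hypothetical name and does not use or mirror the RxJava API.
class ThrottleFirstSketch {
    // items.get(i) is assumed to arrive at timesMs[i] (ascending order).
    static <T> List<T> throttleFirst(List<T> items, long[] timesMs, long windowMs) {
        List<T> delivered = new ArrayList<>();
        boolean hasDelivered = false;
        long lastDeliveryMs = 0;
        for (int i = 0; i < items.size(); i++) {
            // Deliver the first item, then only items that arrive at least
            // one full window after the previous delivery.
            if (!hasDelivered || timesMs[i] - lastDeliveryMs >= windowMs) {
                delivered.add(items.get(i));
                lastDeliveryMs = timesMs[i];
                hasDelivered = true;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Same items and simulated times as the example above.
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7);
        long[] timesMs = {0, 0, 501, 600, 700, 700, 1001};

        List<Integer> delivered = throttleFirst(items, timesMs, 500);
        System.out.println(delivered);  // [1, 3, 7]
    }
}
```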

throttleWithTimeout( ) or debounce( )

only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items

Use the throttleWithTimeout( ) method to select only those items emitted by a source Observable that are not quickly superseded by other items.
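
What "not quickly superseded" means can be illustrated with a plain-Java model over a list of items and their arrival times. The sketch assumes that an item survives when the next item arrives at least one timeout later, and that the last item always survives because nothing follows it; the exact boundary handling is an assumption of this model, not a documented property of the library. DebounceSketch, its method, and the timesMs array are hypothetical names, and no RxJava classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the "quiet period" rule only; DebounceSketch is a
// hypothetical name and does not use or mirror the RxJava API.
class DebounceSketch {
    // items.get(i) is assumed to arrive at timesMs[i] (ascending order).
    static <T> List<T> debounce(List<T> items, long[] timesMs, long timeoutMs) {
        List<T> emitted = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            boolean isLast = (i == items.size() - 1);
            // Keep an item only if nothing else arrives within the timeout.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                emitted.add(items.get(i));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);
        long[] timesMs = {0, 100, 700, 750, 1500};

        // 1 is replaced by 2 after 100 ms, and 3 by 4 after 50 ms.
        List<Integer> emitted = debounce(items, timesMs, 500);
        System.out.println(emitted);  // [2, 4, 5]
    }
}
```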

distinct( )

suppress duplicate items emitted by the source Observable

Use the distinct( ) method to remove duplicate items from a source Observable and only emit single examples of those items.

You can also pass a function or a comparator into distinct( ) that customizes how it distinguishes between distinct and non-distinct items.
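
The effect of supplying a function can be sketched in plain Java. In this model the function maps each item to a key, and an item counts as a duplicate when its key has been seen before; that reading of "customizes how it distinguishes" is an assumption of the sketch. DistinctSketch and its method are hypothetical names, and the code works on a List and does not call RxJava.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of duplicate suppression only; DistinctSketch is a
// hypothetical name and does not use or mirror the RxJava API.
class DistinctSketch {
    // Keeps the first item seen for each key and drops later ones.
    static <T, K> List<T> distinct(List<T> items, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> result = new ArrayList<>();
        for (T item : items) {
            // Set.add returns false when the key was already present.
            if (seenKeys.add(keySelector.apply(item))) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 1, 3, 2, 4);
        List<String> words = List.of("apple", "Avocado", "banana", "blueberry", "cherry");

        // Identity key: ordinary duplicate removal.
        List<Integer> uniqueNumbers = distinct(numbers, n -> n);
        // Custom key: words are "the same" if they share a first letter.
        List<String> byInitial = distinct(words, w -> Character.toLowerCase(w.charAt(0)));

        System.out.println(uniqueNumbers);  // [1, 2, 3, 4]
        System.out.println(byInitial);      // [apple, banana, cherry]
    }
}
```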

distinctUntilChanged( )

suppress duplicate consecutive items emitted by the source Observable

Use the distinctUntilChanged( ) method to remove duplicate consecutive items from a source Observable and only emit single examples of such items.

You can also pass a function or a comparator into distinctUntilChanged( ) that customizes how it distinguishes between distinct and non-distinct items.
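
The word "consecutive" is what separates this operator from distinct( ): an item that reappears after a different item is emitted again. A plain-Java model of that rule, using the hypothetical name DistinctUntilChangedSketch and working on a List without any RxJava calls, might look like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Plain-Java model of consecutive-duplicate suppression only;
// DistinctUntilChangedSketch is a hypothetical name, not RxJava API.
class DistinctUntilChangedSketch {
    // Drops an item only when it equals the item immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> result = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : items) {
            if (!hasPrevious || !Objects.equals(previous, item)) {
                result.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 1, 2, 2, 1, 3, 3);

        // The second run of 1 is kept because a 2 came in between.
        List<Integer> changes = distinctUntilChanged(numbers);
        System.out.println(changes);  // [1, 2, 1, 3]
    }
}
```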
