davidmoten/rxjava-extras

rxjava-extras

Utilities for use with rxjava:

  • Functions.identity, alwaysTrue, alwaysFalse, constant, not
  • Checked provides lambda helpers for dealing with checked exceptions in functions and actions
  • TestingHelper
  • RetryWhen builder for use with .retryWhen(Func1) operator
  • Transformers.toOperator
  • Transformers.mapWithIndex
  • Transformers.stateMachine
  • Transformers.orderedMerge
  • Transformers.collectWhile
  • Transformers.toListWhile
  • Transformers.toListUntilChanged
  • Transformers.collectStats
  • Serialized.read/write
  • PublishSubjectSingleSubscriber
  • OperatorUnsubscribeEagerly
  • Bytes.from
  • Bytes.unzip unzips zip archives
  • Strings.from
  • Strings.lines
  • Strings.split

Status: released to Maven Central

Maven site reports are here including javadoc.

Getting started

Add this to your pom.xml:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>rxjava-extras</artifactId>
  <version>0.6.2</version>
</dependency>

Checked

Checked exceptions can be annoying. If you are happy to wrap a checked exception with a RuntimeException then the function and action helpers in Checked are great:

Instead of

OutputStream os =  ...;
Observable<String> source = ...;
source.doOnNext(s -> {
        try {
            os.write(s.getBytes());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    })
    .subscribe();

you can write:

source.doOnNext(Checked.a1(s -> os.write(s.getBytes())))
      .subscribe();
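
To make the mechanism concrete, here is a minimal sketch of how a wrapper in the spirit of Checked.a1 could be written using only the JDK. It is not the library's implementation: CheckedSketch, ThrowingConsumer and wrap are hypothetical names, and the real helpers work with RxJava's action and function types rather than java.util.function.Consumer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.function.Consumer;

final class CheckedSketch {

    // Hypothetical interface: like Consumer, but allowed to throw a checked exception.
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Wraps a throwing consumer so that checked exceptions surface as RuntimeException.
    static <T> Consumer<T> wrap(ThrowingConsumer<T> action) {
        return t -> {
            try {
                action.accept(t);
            } catch (RuntimeException e) {
                throw e; // already unchecked, rethrow unchanged
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) {
        OutputStream os = new ByteArrayOutputStream();
        // OutputStream.write(byte[]) declares IOException, yet no try/catch is needed here
        Consumer<String> writer = wrap(s -> os.write(s.getBytes()));
        Arrays.asList("a", "b", "c").forEach(writer);
        System.out.println(os); // prints abc
    }
}
```

The trade-off is the usual one: the call site gets shorter, but a caller who wants the original IOException has to look for it in getCause().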

Serialized

To read serialized objects from a file:

Observable<Item> items = Serialized.read(file);

To write an Observable to a file:

Serialized.write(observable, file).subscribe();

Kryo

Serialized also supports the very fast serialization library Kryo. Unlike standard Java serialization, Kryo can also serialize and deserialize objects that don't implement Serializable.

Add this to your pom.xml:

<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>

For example,

To read:

Observable<Item> items = Serialized.kryo().read(file);

To write:

Serialized.kryo().write(observable, file).subscribe();

You can also call Serialized.kryo(kryo) to use an instance of Kryo that you have configured specially.

Transformers.toListWhile

You may want to group emissions from an Observable into lists of variable size. This can be achieved safely using toListWhile.

As an example, from a sequence of temperatures let's group the sub-zero temperatures and the zero-or-above temperatures into contiguous lists:

Observable.just(10, 5, 2, -1, -2, -5, -1, 2, 5, 6)
    .compose(Transformers.toListWhile( 
        (list, t) -> list.isEmpty() 
            || Math.signum(list.get(0)) < 0 && Math.signum(t) < 0
            || Math.signum(list.get(0)) >= 0 && Math.signum(t) >= 0))
    .forEach(System.out::println);

produces

[10, 5, 2]
[-1, -2, -5, -1]
[2, 5, 6]
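
The grouping rule can be illustrated without RxJava. The following is a minimal sketch over a plain Iterable, not the library's implementation; ToListWhileSketch and its toListWhile method are hypothetical names, and the sketch assumes the condition returns true for an empty list, as it does in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

final class ToListWhileSketch {

    // Appends each item to the current list while the condition holds;
    // otherwise closes the current list and starts a new one with the item.
    static <T> List<List<T>> toListWhile(Iterable<T> source,
            BiPredicate<List<T>, T> condition) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T t : source) {
            if (!condition.test(current, t)) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        // emit whatever is left once the source is exhausted
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> temperatures = Arrays.asList(10, 5, 2, -1, -2, -5, -1, 2, 5, 6);
        // keep adding to the list while the sign matches that of its first element
        toListWhile(temperatures,
                (list, t) -> list.isEmpty() || (list.get(0) < 0) == (t < 0))
            .forEach(System.out::println);
    }
}
```

Run as is, the sketch prints the same three lists as the example above.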

Transformers.collectWhile

Behaves as per toListWhile but allows control over the data structure used.
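
As a rough illustration of what control over the data structure means, the sketch below collects contiguous same-sign values into a TreeSet, so each group comes out sorted and free of duplicates. It uses only the JDK and is not the library's implementation; CollectWhileSketch and the parameter names factory, collect and condition are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

final class CollectWhileSketch {

    // factory creates an empty collection, collect adds an item to it, and
    // condition decides whether the next item still belongs to the current one.
    static <T, R> List<R> collectWhile(Iterable<T> source, Supplier<R> factory,
            BiConsumer<R, T> collect, BiPredicate<R, T> condition) {
        List<R> result = new ArrayList<>();
        R current = factory.get();
        boolean hasItems = false;
        for (T t : source) {
            if (!condition.test(current, t)) {
                result.add(current);
                current = factory.get();
            }
            collect.accept(current, t);
            hasItems = true;
        }
        if (hasItems) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<TreeSet<Integer>> groups = collectWhile(
                Arrays.asList(3, 1, 3, -2, -2, -7, 4),
                () -> new TreeSet<>(),
                (set, t) -> set.add(t),
                (set, t) -> set.isEmpty() || (set.first() < 0) == (t < 0));
        System.out.println(groups); // prints [[1, 3], [-7, -2], [4]]
    }
}
```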

RetryWhen

A common use case for .retry() is to attempt a sequence of actions and, if it fails, to retry after a delay. RxJava does not provide first-class support for this use case, but the building blocks are there in the .retryWhen() method. RetryWhen offers static methods that build a Func1 for use with Observable.retryWhen().

Retry after a constant delay

observable.retryWhen(
    RetryWhen.delay(10, TimeUnit.SECONDS).build());

Retry after a constant delay with a maximum number of retries

observable.retryWhen(
    RetryWhen.delay(10, TimeUnit.SECONDS)
        .maxRetries(10).build());

Retry after custom delays

// the number of delays determines the number of retries
Observable<Long> delays = Observable.just(10L,20L,30L,30L,30L);
observable.retryWhen(
    RetryWhen.delays(delays, TimeUnit.SECONDS).build());

Retry only for a particular exception

observable.retryWhen(
    RetryWhen.retryWhenInstanceOf(IOException.class)
        .build());
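
The custom-delays behaviour (one retry per delay, then give up) can be sketched in blocking form with only the JDK. This is an illustration of the semantics described above and not how RetryWhen is implemented; RetryDelaysSketch and callWithRetries are hypothetical names.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class RetryDelaysSketch {

    // Calls the action; after each failure waits for the next delay and tries again.
    // Once the delays are used up, the last failure is rethrown.
    static <T> T callWithRetries(Callable<T> action, List<Long> delays, TimeUnit unit)
            throws Exception {
        int retries = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                if (retries == delays.size()) {
                    throw e; // no delays left, so no more retries
                }
                unit.sleep(delays.get(retries));
                retries++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // fails twice with an IOException, then succeeds on the third attempt
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IOException("transient failure");
            }
            return "ok after " + calls[0] + " attempts";
        }, Arrays.asList(10L, 20L, 30L), TimeUnit.MILLISECONDS);
        System.out.println(result); // prints ok after 3 attempts
    }
}
```

With three delays the action is attempted at most four times: the initial call plus one retry per delay.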

Unzipping

Suppose you have a zip file file.zip and you want to stream the lines of the file doc.txt extracted from the archive:

Observable<String> lines = 
    Bytes.unzip(new File("file.zip"))
       .filter(entry -> entry.getName().equals("doc.txt"))
       .concatMap(entry -> Strings.from(entry.getInputStream()))
       .compose(o-> Strings.split(o, "\n"));

Note that above you don't need to worry about closing entry.getInputStream() because it is handled in the unsubscribe of the Bytes.unzip source.

You must process the emissions of ZippedEntry synchronously: for instance, don't replace the concatMap() with a flatMap(... .subscribeOn(Schedulers.computation())). This is because the InputStream of each ZippedEntry must be processed fully (which could mean ignoring it, of course) before moving on to the next one.
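
The reason for the sequential constraint is easier to see with the JDK's own ZipInputStream, where all entries share one underlying stream. The sketch below is only an illustration and assumes nothing about how Bytes.unzip is implemented; UnzipSketch, linesOf and sampleZip are hypothetical names, and the archive is built in memory so that the example is self-contained.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class UnzipSketch {

    // Reads the lines of one named entry, visiting the entries in archive order.
    static List<String> linesOf(InputStream zipped, String entryName) throws IOException {
        List<String> lines = new ArrayList<>();
        try (ZipInputStream zis = new ZipInputStream(zipped)) {
            ZipEntry entry;
            // getNextEntry() skips whatever is left of the current entry, so an
            // entry must be read (or ignored) before moving on to the next one
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.getName().equals(entryName)) {
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(zis, StandardCharsets.UTF_8));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lines.add(line);
                    }
                }
            }
        }
        return lines;
    }

    // Builds a two-entry archive in memory.
    static byte[] sampleZip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry("other.txt"));
            zos.write("ignored".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("doc.txt"));
            zos.write("first\nsecond\n".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // prints [first, second]
        System.out.println(linesOf(new ByteArrayInputStream(sampleZip()), "doc.txt"));
    }
}
```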

TestingHelper

For a given named test the following variations are tested:

  • without backpressure
  • initial request maximum, no further request
  • initial request maximum, keep requesting single
  • backpressure, initial request 1, then by 0 and 1
  • backpressure, initial request 1, then by 1
  • backpressure, initial request 2, then by 2
  • backpressure, initial request 5, then by 5
  • backpressure, initial request 100, then by 100
  • backpressure, initial request 1000, then by 1000
  • backpressure, initial request 2, then Long.MAX_VALUE-1 (checks for request overflow)

Note that the above list no longer contains a check for negative request because that situation is covered by Subscriber.request throwing an IllegalArgumentException.
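
The last variation in the list above targets operators that keep a running total of requested items. A plain addition of 2 and Long.MAX_VALUE-1 wraps around to a negative number, so the total needs to be capped. The following is a minimal sketch of such a capped addition, assuming a non-negative running total; RequestCapSketch and addCap are hypothetical names and the code is not taken from RxJava or this library.

```java
final class RequestCapSketch {

    // Adds a new request to the outstanding total, capping at Long.MAX_VALUE
    // instead of wrapping around to a negative number.
    static long addCap(long current, long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        long sum = current + n;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        long first = 2;
        long second = Long.MAX_VALUE - 1;
        long naive = first + second;          // silently overflows
        long capped = addCap(first, second);
        System.out.println(naive < 0);                // prints true
        System.out.println(capped == Long.MAX_VALUE); // prints true
    }
}
```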

For each variation the following aspects are tested:

  • expected onNext items received
  • unsubscribe from source occurs (for completion, error or explicit downstream unsubscription (optional))
  • unsubscribe from downstream subscriber occurs
  • onCompleted called (if unsubscribe not requested before completion and no errors expected)
  • if onCompleted is expected, it is called only once
  • onError not called unless error expected
  • if error expected onCompleted not called after onError
  • should not deliver more than requested

An example that tests all of the above variations and aspects for the Observable.count() method:

import junit.framework.TestCase;
import junit.framework.TestSuite;
import rx.Observable;

import com.github.davidmoten.rx.testing.TestingHelper;

public class CountTest extends TestCase {

    public static TestSuite suite() {

        return TestingHelper
                .function(o -> o.count())
                // test empty
                .name("testCountOfEmptyReturnsZero")
                .fromEmpty()
                .expect(0)
                // test error
                .name("testCountErrorReturnsError")
                .fromError()
                .expectError()
                // test error after some emission
                .name("testCountErrorAfterTwoEmissionsReturnsError")
                .fromErrorAfter(5, 6)
                .expectError()
                // test non-empty count
                .name("testCountOfTwoReturnsTwo")
                .from(5, 6)
                .expect(2)
                // test single input
                .name("testCountOfOneReturnsOne")
                .from(5)
                .expect(1)
                // count many
                .name("testCountOfManyDoesNotGiveStackOverflow")
                .from(Observable.range(1, 1000000))
                .expect(1000000)
                // get test suites
                .testSuite(CountTest.class);
    }

    public void testDummy() {
        // just here to fool eclipse
    }

}

When you run CountTest as a JUnit test in Eclipse, each of the test variations is listed by name in the results.

An asynchronous example with OperatorMerge is below. Note the specific control of the wait times. For synchronous transformations the wait times can be left at their defaults:

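import static java.util.Arrays.asList;

import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;
import junit.framework.TestSuite;
import rx.Observable;
import rx.schedulers.Schedulers;

import com.github.davidmoten.rx.testing.TestingHelper;

public class TestingHelperMergeTest extends TestCase {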

    private static final Observable<Integer> MERGE_WITH = 
        Observable.from(asList(7, 8, 9)).subscribeOn(Schedulers.computation());

    public static TestSuite suite() {

        return TestingHelper
                .function(o -> o.mergeWith(MERGE_WITH).subscribeOn(Schedulers.computation()))
                .waitForUnsubscribe(100, TimeUnit.MILLISECONDS)
                .waitForTerminalEvent(10, TimeUnit.SECONDS)
                .waitForMoreTerminalEvents(100, TimeUnit.MILLISECONDS)
                // test empty
                .name("testEmptyWithOtherReturnsOther")
                .fromEmpty()
                .expect(7, 8, 9)
                // test error
                .name("testMergeErrorReturnsError")
                .fromError()
                .expectError()
                // test error after items
                .name("testMergeErrorAfter2ReturnsError")
                .fromErrorAfter(1, 2)
                .expectError()
                // test non-empty count
                .name("testTwoWithOtherReturnsTwoAndOtherInAnyOrder")
                .from(1, 2)
                .expectAnyOrder(1, 7, 8, 9, 2)
                // test single input
                .name("testOneWithOtherReturnsOneAndOtherInAnyOrder").from(1)
                .expectAnyOrder(7, 1, 8, 9)
                // unsub before completion
                .name("testTwoWithOtherUnsubscribedAfterOneReturnsOneItemOnly").from(1, 2)
                .unsubscribeAfter(1).expectSize(1)
                // get test suites
                .testSuite(TestingHelperMergeTest.class);
    }

    public void testDummy() {
        // just here to fool eclipse
    }
}

How to use OperatorUnsubscribeEagerly

Observable<T> o = ...;

Observable<T> stream = 
      o.lift(OperatorUnsubscribeEagerly.<T>instance());