Utilities for use with rxjava:
- Functions.identity, alwaysTrue, alwaysFalse, constant, not
- Checked provides lambda helpers for dealing with checked exceptions in functions and actions
- TestingHelper
- RetryWhen builder for use with the .retryWhen(Func1) operator
- Transformers.toOperator
- Transformers.mapWithIndex
- Transformers.stateMachine
- Transformers.orderedMerge
- Transformers.collectWhile
- Transformers.toListWhile
- Transformers.toListUntilChanged
- Transformers.collectStats
- Serialized.read/write
- PublishSubjectSingleSubscriber
- OperatorUnsubscribeEagerly
- Bytes.from
- Bytes.unzip unzips zip archives
- Strings.from
- Strings.lines
- Strings.split
Status: released to Maven Central
Maven site reports are here including javadoc.
Add this to your pom.xml:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.6.2</version>
</dependency>
Checked exceptions can be annoying. If you are happy to wrap a checked exception with a RuntimeException then the function and action helpers in Checked are great:
Instead of
OutputStream os = ...;
Observable<String> source = ...;
source.doOnNext(s -> {
        try {
            os.write(s.getBytes());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    })
    .subscribe();
you can write:
source.doOnNext(Checked.a1(s -> os.write(s.getBytes())))
.subscribe();
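To make the idea concrete, here is a minimal plain-Java sketch of how such a wrapper could work. The names ThrowingConsumer, unchecked and writeAll are hypothetical and chosen for illustration only; this is not the source of Checked.a1, just one plausible way to build the same kind of helper.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class CheckedSketch {

    /** Hypothetical interface: like Consumer, but allowed to throw a checked exception. */
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    /** Wraps a throwing consumer so that checked exceptions surface as RuntimeException. */
    static <T> Consumer<T> unchecked(ThrowingConsumer<T> action) {
        return t -> {
            try {
                action.accept(t);
            } catch (RuntimeException e) {
                throw e; // already unchecked, rethrow untouched
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    /** Writes each string to an in-memory stream via the wrapper and returns the bytes. */
    static byte[] writeAll(Iterable<String> items) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        // OutputStream.write(byte[]) declares IOException, so a bare
        // Consumer lambda would not compile here without the wrapper
        items.forEach(unchecked(s -> os.write(s.getBytes(StandardCharsets.UTF_8))));
        return os.toByteArray();
    }
}
```

The wrapper keeps the call site to a single expression, at the cost of hiding the original exception type behind RuntimeException (it remains available through getCause()).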
To read serialized objects from a file:
Observable<Item> items = Serialized.read(file);
To write an Observable to a file:
Serialized.write(observable, file).subscribe();
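For readers unfamiliar with what reading and writing a stream of serialized objects involves, the following is a rough plain-Java sketch using only java.io. The class SerializedSketch and its methods are hypothetical; the sketch assumes one writeObject call per item and reads until end of file, which may differ from how Serialized is actually implemented.

```java
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class SerializedSketch {

    /** Writes each item to the file as a separate serialized object. */
    static void write(Iterable<? extends Serializable> items, File file) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))) {
            for (Serializable item : items) {
                oos.writeObject(item);
            }
        }
    }

    /** Reads objects back one at a time until the end of the file is reached. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> List<T> read(File file)
            throws IOException, ClassNotFoundException {
        List<T> result = new ArrayList<>();
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file))) {
            while (true) {
                try {
                    result.add((T) ois.readObject());
                } catch (EOFException endOfStream) {
                    break; // no more objects in the file
                }
            }
        }
        return result;
    }
}
```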
Serialized also has support for the very fast serialization library kryo. Unlike standard Java serialization, Kryo can also serialize/deserialize objects that don't implement Serializable.
Add this to your pom.xml:
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
For example, to read:
Observable<Item> items = Serialized.kryo().read(file);
To write:
Serialized.kryo().write(observable, file).subscribe();
You can also call Serialized.kryo(kryo) to use an instance of Kryo that you have configured specially.
You may want to group emissions from an Observable into lists of variable size. This can be achieved safely using toListWhile.
As an example, from a sequence of temperatures let's group the sub-zero and the zero-or-above temperatures into contiguous lists:
Observable.just(10, 5, 2, -1, -2, -5, -1, 2, 5, 6)
    .compose(Transformers.toListWhile(
        (list, t) -> list.isEmpty()
            || Math.signum(list.get(0)) < 0 && Math.signum(t) < 0
            || Math.signum(list.get(0)) >= 0 && Math.signum(t) >= 0))
    .forEach(System.out::println);
produces
[10, 5, 2]
[-1, -2, -5, -1]
[2, 5, 6]
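The grouping rule in the temperature example can be sketched over a plain Iterable, without RxJava, to show what the condition is doing. ToListWhileSketch and its methods are hypothetical names; the sketch assumes a list is closed as soon as the condition returns false for (current list, next item), and it never emits an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class ToListWhileSketch {

    /**
     * Groups items into contiguous lists. An item joins the current list while
     * the condition holds for (current list, item); otherwise it starts a new list.
     */
    static <T> List<List<T>> toListWhile(Iterable<T> source,
            BiPredicate<List<T>, T> condition) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(current, item)) {
                if (!current.isEmpty()) {
                    groups.add(current);
                }
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            groups.add(current); // flush the final group
        }
        return groups;
    }

    /** Same grouping as the temperature example: sub-zero versus zero or above. */
    static List<List<Integer>> groupBySign(List<Integer> temperatures) {
        return toListWhile(temperatures,
                (list, t) -> list.isEmpty() || (list.get(0) < 0) == (t < 0));
    }
}
```

Comparing the sign of the first element of the list with the sign of the new item is equivalent to the two Math.signum clauses in the example.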
Transformers.collectWhile behaves as per toListWhile but allows control over the data structure used.
A common use case for .retry() is a sequence of actions that is attempted and then, after a delay, retried. RxJava does not provide first-class support for this use case, but the building blocks are there with the .retryWhen() method. RetryWhen offers static methods that build a Func1 for use with Observable.retryWhen().
// retry after a constant delay of 10 seconds
observable.retryWhen(
    RetryWhen.delay(10, TimeUnit.SECONDS).build());
// as above, but with at most 10 retries
observable.retryWhen(
    RetryWhen.delay(10, TimeUnit.SECONDS)
        .maxRetries(10).build());
// the number of delays determines the number of retries
Observable<Long> delays = Observable.just(10L, 20L, 30L, 30L, 30L);
observable.retryWhen(
    RetryWhen.delays(delays, TimeUnit.SECONDS).build());
// retry only when the error is an IOException
observable.retryWhen(
    RetryWhen.retryWhenInstanceOf(IOException.class)
        .build());
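The retry policies above can be pictured without RxJava as a simple blocking loop. The sketch below is illustrative only: RetrySketch and callWithRetries are hypothetical names, and the blocking sleep stands in for the scheduler-based delay that an Observable would use. It combines two of the ideas shown above: a list of delays whose length bounds the number of retries, and retrying only for a given exception type.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class RetrySketch {

    /**
     * Calls the task and, on failure, waits for the next delay before retrying.
     * The number of delays determines the number of retries; once they are used
     * up the last failure is rethrown. Failures that are not instances of
     * retryOn are rethrown immediately.
     */
    static <T> T callWithRetries(Callable<T> task, List<Long> delays, TimeUnit unit,
            Class<? extends Exception> retryOn) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e) || retries >= delays.size()) {
                    throw e;
                }
                unit.sleep(delays.get(retries)); // blocking wait, fine for a sketch
                retries++;
            }
        }
    }
}
```

With delays of 10, 20, 30, 30 and 30 seconds, the task would be attempted at most six times: the initial call plus one retry per delay.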
Suppose you have a zip file file.zip and you want to stream the lines of the file doc.txt extracted from the archive:
Observable<String> lines =
    Bytes.unzip(new File("file.zip"))
        .filter(entry -> entry.getName().equals("doc.txt"))
        .concatMap(entry -> Strings.from(entry.getInputStream()))
        .compose(o -> Strings.split(o, "\n"));
Note that above you don't need to worry about closing entry.getInputStream() because it is handled in the unsubscribe of the Bytes.unzip source.
You must process the emissions of ZippedEntry synchronously (don't replace the concatMap() with a flatMap(... .subscribeOn(Schedulers.computation())), for instance). This is because the InputStream of each ZippedEntry must be processed fully (which could mean ignoring it, of course) before moving on to the next one.
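The reason for the synchronous-processing rule is easier to see with the JDK's own ZipInputStream, where all entries share one underlying stream. The sketch below is not how Bytes.unzip is implemented (that is an assumption left open here); UnzipSketch and linesOf are hypothetical names used to show the sequential constraint.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class UnzipSketch {

    /** Returns the lines of the named entry, or an empty list if it is not in the archive. */
    static List<String> linesOf(InputStream zipBytes, String entryName) throws IOException {
        List<String> lines = new ArrayList<>();
        try (ZipInputStream zis = new ZipInputStream(zipBytes)) {
            ZipEntry entry;
            // getNextEntry() skips whatever is left of the previous entry, so an
            // entry has to be consumed (or ignored) before the loop advances
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.getName().equals(entryName)) {
                    // the reader is deliberately not closed here: closing it would
                    // close the shared archive stream; try-with-resources does that
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(zis, StandardCharsets.UTF_8));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lines.add(line);
                    }
                    return lines;
                }
            }
        }
        return lines;
    }
}
```

Because every entry is read from the same stream position, handing an entry's InputStream to another thread while the loop moves on would read the wrong bytes or nothing at all.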
For a given named test the following variations are tested:
- without backpressure
- initial request maximum, no further request
- initial request maximum, keep requesting single
- backpressure, initial request 1, then by 0 and 1
- backpressure, initial request 1, then by 1
- backpressure, initial request 2, then by 2
- backpressure, initial request 5, then by 5
- backpressure, initial request 100, then by 100
- backpressure, initial request 1000, then by 1000
- backpressure, initial request 2, then Long.MAX_VALUE-1 (checks for request overflow)
Note that the above list no longer contains a check for negative request because that situation is covered by Subscriber.request throwing an IllegalArgumentException.
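The request overflow variation (initial request 2, then Long.MAX_VALUE-1) targets operators that keep a running total of requested items. A minimal sketch of the kind of capped addition such an operator needs is shown below; RequestSketch and addCap are hypothetical names, and the sketch assumes the current total is never negative.

```java
final class RequestSketch {

    /** Adds n to the outstanding request count, capping at Long.MAX_VALUE on overflow. */
    static long addCap(long current, long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0 but was " + n);
        }
        long sum = current + n;
        // with non-negative operands, a negative sum means the addition wrapped around
        return sum < 0 ? Long.MAX_VALUE : sum;
    }
}
```

An operator that adds the two requests naively would end up with a negative total (2 + (Long.MAX_VALUE - 1) wraps to Long.MIN_VALUE) and could stop emitting, which is the failure this variation is designed to expose.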
For each variation the following aspects are tested:
- expected onNext items received
- unsubscribe from source occurs (for completion, error or explicit downstream unsubscription (optional))
- unsubscribe from downstream subscriber occurs
- onCompleted called (if unsubscribe not requested before completion and no errors expected)
- if onCompleted expected, it is only called once
- onError not called unless error expected
- if error expected, onCompleted not called after onError
- should not deliver more than requested
An example that tests all of the above variations and aspects for the Observable.count() method:
import junit.framework.TestCase;
import junit.framework.TestSuite;
import rx.Observable;
import com.github.davidmoten.rx.testing.TestingHelper;

public class CountTest extends TestCase {

    public static TestSuite suite() {
        return TestingHelper
                .function(o -> o.count())
                // test empty
                .name("testCountOfEmptyReturnsZero")
                .fromEmpty()
                .expect(0)
                // test error
                .name("testCountErrorReturnsError")
                .fromError()
                .expectError()
                // test error after some emission
                .name("testCountErrorAfterTwoEmissionsReturnsError")
                .fromErrorAfter(5, 6)
                .expectError()
                // test non-empty count
                .name("testCountOfTwoReturnsTwo")
                .from(5, 6)
                .expect(2)
                // test single input
                .name("testCountOfOneReturnsOne")
                .from(5)
                .expect(1)
                // count many
                .name("testCountOfManyDoesNotGiveStackOverflow")
                .from(Observable.range(1, 1000000))
                .expect(1000000)
                // get test suites
                .testSuite(CountTest.class);
    }

    public void testDummy() {
        // just here to fool eclipse
    }
}
When you run CountTest as a JUnit test in Eclipse, each of the test variations described above is listed as a separate test.
An asynchronous example with OperatorMerge is below. Note the specific control of the wait times. For synchronous transformations the wait times can be left at their defaults:
import static java.util.Arrays.asList;

import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;
import junit.framework.TestSuite;
import rx.Observable;
import rx.schedulers.Schedulers;
import com.github.davidmoten.rx.testing.TestingHelper;

public class TestingHelperMergeTest extends TestCase {

    private static final Observable<Integer> MERGE_WITH =
            Observable.from(asList(7, 8, 9)).subscribeOn(Schedulers.computation());

    public static TestSuite suite() {
        return TestingHelper
                .function(o -> o.mergeWith(MERGE_WITH).subscribeOn(Schedulers.computation()))
                .waitForUnsubscribe(100, TimeUnit.MILLISECONDS)
                .waitForTerminalEvent(10, TimeUnit.SECONDS)
                .waitForMoreTerminalEvents(100, TimeUnit.MILLISECONDS)
                // test empty
                .name("testEmptyWithOtherReturnsOther")
                .fromEmpty()
                .expect(7, 8, 9)
                // test error
                .name("testMergeErrorReturnsError")
                .fromError()
                .expectError()
                // test error after items
                .name("testMergeErrorAfter2ReturnsError")
                .fromErrorAfter(1, 2)
                .expectError()
                // test non-empty count
                .name("testTwoWithOtherReturnsTwoAndOtherInAnyOrder")
                .from(1, 2)
                .expectAnyOrder(1, 7, 8, 9, 2)
                // test single input
                .name("testOneWithOtherReturnsOneAndOtherInAnyOrder")
                .from(1)
                .expectAnyOrder(7, 1, 8, 9)
                // unsub before completion
                .name("testTwoWithOtherUnsubscribedAfterOneReturnsOneItemOnly")
                .from(1, 2)
                .unsubscribeAfter(1)
                .expectSize(1)
                // get test suites
                .testSuite(TestingHelperMergeTest.class);
    }

    public void testDummy() {
        // just here to fool eclipse
    }
}
Observable<T> o;
Observable<T> stream =
o.lift(OperatorUnsubscribeEagerly.<T>instance());