From be80eba406a7c5ae2e0d79e2751a67e21e79def1 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 12 Sep 2013 20:30:39 -0700 Subject: [PATCH] UnitTests while working on EventStream use cases --- rxjava-core/src/test/java/rx/EventStream.java | 81 +++++++++++++++++++ .../src/test/java/rx/GroupByTests.java | 78 ++++++++++++++++++ rxjava-core/src/test/java/rx/ScanTests.java | 37 +++++++++ rxjava-core/src/test/java/rx/ZipTests.java | 45 +++++++++++ 4 files changed, 241 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/EventStream.java create mode 100644 rxjava-core/src/test/java/rx/GroupByTests.java create mode 100644 rxjava-core/src/test/java/rx/ScanTests.java diff --git a/rxjava-core/src/test/java/rx/EventStream.java b/rxjava-core/src/test/java/rx/EventStream.java new file mode 100644 index 0000000000..5fbffba528 --- /dev/null +++ b/rxjava-core/src/test/java/rx/EventStream.java @@ -0,0 +1,81 @@ +package rx; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import rx.Observable.OnSubscribeFunc; +import rx.concurrency.Schedulers; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Action0; + +/** + * Utility for retrieving a mock eventstream for testing. + */ +public class EventStream { + + public static Observable getEventStream(final String type, final int numInstances) { + return Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + final BooleanSubscription s = new BooleanSubscription(); + // run on a background thread inside the OnSubscribeFunc so unsubscribe works + Schedulers.newThread().schedule(new Action0() { + + @Override + public void call() { + while (!(s.isUnsubscribed() || Thread.currentThread().isInterrupted())) { + observer.onNext(randomEvent(type, numInstances)); + try { + // slow it down somewhat + Thread.sleep(50); + } catch (InterruptedException e) { + observer.onError(e); + } + } + observer.onCompleted(); + } + + }); + + return s; + } + }); + } + + public static Event randomEvent(String type, int numInstances) { + Map values = new LinkedHashMap(); + values.put("count200", randomIntFrom0to(4000)); + values.put("count4xx", randomIntFrom0to(300)); + values.put("count5xx", randomIntFrom0to(500)); + return new Event(type, "instance_" + randomIntFrom0to(numInstances), values); + } + + private static int randomIntFrom0to(int max) { + // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml + long x = System.nanoTime(); + x ^= (x << 21); + x ^= (x >>> 35); + x ^= (x << 4); + return Math.abs((int) x % max); + } + + public static class Event { + public final String type; + public final String instanceId; + public final Map values; + + /** + * @param type + * @param instanceId + * @param values + * This does NOT deep-copy, so do not mutate this Map after passing it in. + */ + public Event(String type, String instanceId, Map values) { + this.type = type; + this.instanceId = instanceId; + this.values = Collections.unmodifiableMap(values); + } + } +} diff --git a/rxjava-core/src/test/java/rx/GroupByTests.java b/rxjava-core/src/test/java/rx/GroupByTests.java new file mode 100644 index 0000000000..87448f8510 --- /dev/null +++ b/rxjava-core/src/test/java/rx/GroupByTests.java @@ -0,0 +1,78 @@ +package rx; + +import org.junit.Test; + +import rx.EventStream.Event; +import rx.observables.GroupedObservable; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class GroupByTests { + + @Test + public void testTakeUnsubscribesOnGroupBy() { + Observable.merge( + EventStream.getEventStream("HTTP-ClusterA", 50), + EventStream.getEventStream("HTTP-ClusterB", 20)) + // group by type (2 clusters) + .groupBy(new Func1() { + + @Override + public String call(Event event) { + return event.type; + } + + }).take(1) + .toBlockingObservable().forEach(new Action1>() { + + @Override + public void call(GroupedObservable g) { + System.out.println(g); + } + + }); + + System.out.println("**** finished"); + } + + @Test + public void testTakeUnsubscribesOnFlatMapOfGroupBy() { + Observable.merge( + EventStream.getEventStream("HTTP-ClusterA", 50), + EventStream.getEventStream("HTTP-ClusterB", 20)) + // group by type (2 clusters) + .groupBy(new Func1() { + + @Override + public String call(Event event) { + return event.type; + } + + }) + .flatMap(new Func1, Observable>() { + + @Override + public Observable call(GroupedObservable g) { + return g.map(new Func1() { + + @Override + public String call(Event event) { + return event.instanceId + " - " + event.values.get("count200"); + } + }); + } + + }) + .take(20) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String v) { + System.out.println(v); + } + + }); + + System.out.println("**** finished"); + } +} diff --git a/rxjava-core/src/test/java/rx/ScanTests.java b/rxjava-core/src/test/java/rx/ScanTests.java new file mode 100644 index 0000000000..bef93e471b --- /dev/null +++ b/rxjava-core/src/test/java/rx/ScanTests.java @@ -0,0 +1,37 @@ +package rx; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import rx.EventStream.Event; +import rx.util.functions.Action1; +import rx.util.functions.Func2; + +public class ScanTests { + + @Test + public void testUnsubscribeScan() { + + EventStream.getEventStream("HTTP-ClusterB", 20) + .scan(new HashMap(), new Func2, Event, Map>() { + + @Override + public Map call(Map accum, Event perInstanceEvent) { + accum.put("instance", perInstanceEvent.instanceId); + return accum; + } + + }) + .take(10) + .toBlockingObservable().forEach(new Action1>() { + + @Override + public void call(Map v) { + System.out.println(v); + } + + }); + } +} diff --git a/rxjava-core/src/test/java/rx/ZipTests.java b/rxjava-core/src/test/java/rx/ZipTests.java index ee25bb6ade..64d96c904a 100644 --- a/rxjava-core/src/test/java/rx/ZipTests.java +++ b/rxjava-core/src/test/java/rx/ZipTests.java @@ -1,5 +1,8 @@ package rx; +import java.util.HashMap; +import java.util.Map; + import org.junit.Test; import rx.CovarianceTest.CoolRating; @@ -9,11 +12,53 @@ import rx.CovarianceTest.Movie; import rx.CovarianceTest.Rating; import rx.CovarianceTest.Result; +import rx.EventStream.Event; +import rx.observables.GroupedObservable; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; public class ZipTests { + @Test + public void testZipObservableOfObservables() { + EventStream.getEventStream("HTTP-ClusterB", 20) + .groupBy(new Func1() { + + @Override + public String call(Event e) { + return e.instanceId; + } + + // now we have streams of cluster+instanceId + }).flatMap(new Func1, Observable>>() { + + @Override + public Observable> call(final GroupedObservable ge) { + return ge.scan(new HashMap(), new Func2, Event, Map>() { + + @Override + public Map call(Map accum, Event perInstanceEvent) { + accum.put("instance", ge.getKey()); + return accum; + } + + }); + } + }) + .take(10) + .toBlockingObservable().forEach(new Action1>() { + + @Override + public void call(Map v) { + System.out.println(v); + } + + }); + + System.out.println("**** finished"); + } + /** * This won't compile if super/extends isn't done correctly on generics */