Skip to content

Commit

Permalink
Merge pull request #378 from benjchristensen/tests
Browse files Browse the repository at this point in the history
UnitTests while working on EventStream use cases
  • Loading branch information
benjchristensen committed Sep 13, 2013
2 parents df358b1 + be80eba commit 4cc3739
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 0 deletions.
81 changes: 81 additions & 0 deletions rxjava-core/src/test/java/rx/EventStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package rx;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import rx.Observable.OnSubscribeFunc;
import rx.concurrency.Schedulers;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action0;

/**
* Utility for retrieving a mock eventstream for testing.
*/
public class EventStream {

public static Observable<Event> getEventStream(final String type, final int numInstances) {
return Observable.create(new OnSubscribeFunc<Event>() {

@Override
public Subscription onSubscribe(final Observer<? super Event> observer) {
final BooleanSubscription s = new BooleanSubscription();
// run on a background thread inside the OnSubscribeFunc so unsubscribe works
Schedulers.newThread().schedule(new Action0() {

@Override
public void call() {
while (!(s.isUnsubscribed() || Thread.currentThread().isInterrupted())) {
observer.onNext(randomEvent(type, numInstances));
try {
// slow it down somewhat
Thread.sleep(50);
} catch (InterruptedException e) {
observer.onError(e);
}
}
observer.onCompleted();
}

});

return s;
}
});
}

public static Event randomEvent(String type, int numInstances) {
Map<String, Object> values = new LinkedHashMap<String, Object>();
values.put("count200", randomIntFrom0to(4000));
values.put("count4xx", randomIntFrom0to(300));
values.put("count5xx", randomIntFrom0to(500));
return new Event(type, "instance_" + randomIntFrom0to(numInstances), values);
}

private static int randomIntFrom0to(int max) {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
x ^= (x << 21);
x ^= (x >>> 35);
x ^= (x << 4);
return Math.abs((int) x % max);
}

public static class Event {
public final String type;
public final String instanceId;
public final Map<String, Object> values;

/**
* @param type
* @param instanceId
* @param values
* This does NOT deep-copy, so do not mutate this Map after passing it in.
*/
public Event(String type, String instanceId, Map<String, Object> values) {
this.type = type;
this.instanceId = instanceId;
this.values = Collections.unmodifiableMap(values);
}
}
}
78 changes: 78 additions & 0 deletions rxjava-core/src/test/java/rx/GroupByTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rx;

import org.junit.Test;

import rx.EventStream.Event;
import rx.observables.GroupedObservable;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class GroupByTests {

@Test
public void testTakeUnsubscribesOnGroupBy() {
Observable.merge(
EventStream.getEventStream("HTTP-ClusterA", 50),
EventStream.getEventStream("HTTP-ClusterB", 20))
// group by type (2 clusters)
.groupBy(new Func1<Event, String>() {

@Override
public String call(Event event) {
return event.type;
}

}).take(1)
.toBlockingObservable().forEach(new Action1<GroupedObservable<String, Event>>() {

@Override
public void call(GroupedObservable<String, Event> g) {
System.out.println(g);
}

});

System.out.println("**** finished");
}

@Test
public void testTakeUnsubscribesOnFlatMapOfGroupBy() {
Observable.merge(
EventStream.getEventStream("HTTP-ClusterA", 50),
EventStream.getEventStream("HTTP-ClusterB", 20))
// group by type (2 clusters)
.groupBy(new Func1<Event, String>() {

@Override
public String call(Event event) {
return event.type;
}

})
.flatMap(new Func1<GroupedObservable<String, Event>, Observable<String>>() {

@Override
public Observable<String> call(GroupedObservable<String, Event> g) {
return g.map(new Func1<Event, String>() {

@Override
public String call(Event event) {
return event.instanceId + " - " + event.values.get("count200");
}
});
}

})
.take(20)
.toBlockingObservable().forEach(new Action1<String>() {

@Override
public void call(String v) {
System.out.println(v);
}

});

System.out.println("**** finished");
}
}
37 changes: 37 additions & 0 deletions rxjava-core/src/test/java/rx/ScanTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package rx;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;

import rx.EventStream.Event;
import rx.util.functions.Action1;
import rx.util.functions.Func2;

public class ScanTests {

@Test
public void testUnsubscribeScan() {

EventStream.getEventStream("HTTP-ClusterB", 20)
.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {

@Override
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
accum.put("instance", perInstanceEvent.instanceId);
return accum;
}

})
.take(10)
.toBlockingObservable().forEach(new Action1<Map<String, String>>() {

@Override
public void call(Map<String, String> v) {
System.out.println(v);
}

});
}
}
45 changes: 45 additions & 0 deletions rxjava-core/src/test/java/rx/ZipTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package rx;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;

import rx.CovarianceTest.CoolRating;
Expand All @@ -9,11 +12,53 @@
import rx.CovarianceTest.Movie;
import rx.CovarianceTest.Rating;
import rx.CovarianceTest.Result;
import rx.EventStream.Event;
import rx.observables.GroupedObservable;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

public class ZipTests {

@Test
public void testZipObservableOfObservables() {
EventStream.getEventStream("HTTP-ClusterB", 20)
.groupBy(new Func1<Event, String>() {

@Override
public String call(Event e) {
return e.instanceId;
}

// now we have streams of cluster+instanceId
}).flatMap(new Func1<GroupedObservable<String, Event>, Observable<Map<String, String>>>() {

@Override
public Observable<Map<String, String>> call(final GroupedObservable<String, Event> ge) {
return ge.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {

@Override
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
accum.put("instance", ge.getKey());
return accum;
}

});
}
})
.take(10)
.toBlockingObservable().forEach(new Action1<Map<String, String>>() {

@Override
public void call(Map<String, String> v) {
System.out.println(v);
}

});

System.out.println("**** finished");
}

/**
* This won't compile if super/extends isn't done correctly on generics
*/
Expand Down

0 comments on commit 4cc3739

Please sign in to comment.