Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve OperatorSerializeTest.testMultiThreadedWithNPEinMiddle #2985

Merged
merged 1 commit into from
May 27, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions src/test/java/rx/internal/operators/OperatorSerializeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,38 @@ public void testMultiThreadedWithNPE() {

@Test
public void testMultiThreadedWithNPEinMiddle() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyobserver = new BusyObserver();

w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();

System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
assertTrue(busyobserver.onNextCount.get() < 9);
assertTrue(busyobserver.onError);
// no onCompleted because onError was invoked
assertFalse(busyobserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyobserver.maxConcurrentThreads.get());
boolean lessThan9 = false;
for (int i = 0; i < 3; i++) {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyobserver = new BusyObserver();

w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();

System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not always be the full number of items since the error should (very often)
// stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
if (busyobserver.onNextCount.get() < 9) {
lessThan9 = true;
}
assertTrue(busyobserver.onError);
// no onCompleted because onError was invoked
assertFalse(busyobserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyobserver.maxConcurrentThreads.get());
}
assertTrue(lessThan9);
}

/**
* A thread that will pass data to onNext
*/
Expand Down Expand Up @@ -276,6 +283,7 @@ public TestMultiThreadedObservable(String... values) {
@Override
public void call(final Subscriber<? super String> observer) {
System.out.println("TestMultiThreadedObservable subscribed to ...");
final NullPointerException npe = new NullPointerException();
t = new Thread(new Runnable() {

@Override
Expand All @@ -290,11 +298,12 @@ public void run() {
threadsRunning.incrementAndGet();
try {
// perform onNext call
System.out.println("TestMultiThreadedObservable onNext: " + s);
if (s == null) {
System.out.println("TestMultiThreadedObservable onNext: null");
// force an error
throw new NullPointerException();
}
throw npe;
} else
System.out.println("TestMultiThreadedObservable onNext: " + s);
observer.onNext(s);
// capture 'maxThreads'
int concurrentThreads = threadsRunning.get();
Expand Down