From ed392d7a0d77a22d4ad3b50bcc8e7b8a462eced8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 27 Mar 2014 11:04:14 -0700 Subject: [PATCH 1/5] New Implementation of SerializedObserver #### JMH Benchmarks 0.17.3 Benchmark (size) Mode Samples Mean Mean error Units r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1024 avgt 5 45.504 1.710 ns/op r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1048576 avgt 5 58.600 5.647 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1024 avgt 5 68.610 4.596 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1048576 avgt 5 71.313 2.318 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1024 avgt 5 73.322 3.666 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1048576 avgt 5 76.518 1.355 ns/op 0.17.2 Benchmark (size) Mode Samples Mean Mean error Units r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1024 avgt 5 45.790 1.184 ns/op r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1048576 avgt 5 58.518 3.788 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1024 avgt 5 72.665 7.851 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1048576 avgt 5 74.788 2.946 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1024 avgt 5 73.661 3.499 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1048576 avgt 5 78.386 5.036 ns/op #### Manual Benchmarks /** * 0.17.3: * * Run: 10 - 9,746,505 ops/sec * Run: 11 - 9,956,019 ops/sec * Run: 12 - 10,053,770 ops/sec * Run: 13 - 10,076,958 ops/sec * Run: 14 - 9,983,319 ops/sec * * 0.17.2: * * Run: 10 - 9,851,999 ops/sec * Run: 11 - 9,726,975 ops/sec * Run: 12 - 9,719,762 ops/sec * Run: 13 - 9,668,141 ops/sec * Run: 14 - 9,799,700 ops/sec * * @param input */ public void serializedSingleStream(Input input) { for (int i = 0; i < reps; i++) { input.observable.serialize().subscribe(input.subscriber); } } --- .../java/rx/observers/SerializedObserver.java | 194 ++++++++++-------- .../OperatorSerializePerformance.java | 114 +++++++++- 2 files changed, 220 insertions(+), 88 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index 6be8d7a23e..0d397fe73f 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -1,9 +1,6 @@ package rx.observers; -import java.util.ArrayList; - import rx.Observer; -import rx.operators.NotificationLite; /** * Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError. @@ -22,8 +19,40 @@ public class SerializedObserver implements Observer { private boolean emitting = false; private boolean terminated = false; - private ArrayList queue = new ArrayList(); - private NotificationLite on = NotificationLite.instance(); + private FastList queue; + + private static final int MAX_DRAIN_ITERATION = 1; + private static final Object NULL_SENTINEL = new Object(); + private static final Object COMPLETE_SENTINEL = new Object(); + + static final class FastList { + Object[] array; + int size; + + public void add(Object o) { + int s = size; + Object[] a = array; + if (a == null) { + a = new Object[16]; + array = a; + } else if (s == a.length) { + Object[] array2 = new Object[s + (s >> 2)]; + System.arraycopy(a, 0, array2, 0, s); + a = array2; + array = a; + } + a[s] = o; + size = s + 1; + } + } + + private static final class ErrorSentinel { + final Throwable e; + + ErrorSentinel(Throwable e) { + this.e = e; + } + } public SerializedObserver(Observer s) { this.actual = s; @@ -31,128 +60,119 @@ public SerializedObserver(Observer s) { @Override public void onCompleted() { - boolean canEmit = false; - ArrayList list = null; + FastList list; synchronized (this) { if (terminated) { return; } terminated = true; - if (!emitting) { - // emit immediately - emitting = true; - canEmit = true; - if (queue.size() > 0) { - list = queue; // copy reference - queue = new ArrayList(); // new version; - } - } else { - // someone else is already emitting so just queue it - queue.add(on.completed()); - } - } - - if (canEmit) { - // we won the right to emit - try { - drainQueue(list); - actual.onCompleted(); - } finally { - synchronized (this) { - emitting = false; + if (emitting) { + if (queue == null) { + queue = new FastList(); } + queue.add(COMPLETE_SENTINEL); + return; } + emitting = true; + list = queue; + queue = null; } + drainQueue(list); + actual.onCompleted(); } @Override public void onError(final Throwable e) { - boolean canEmit = false; - ArrayList list = null; + FastList list; synchronized (this) { if (terminated) { return; } terminated = true; - if (!emitting) { - // emit immediately - emitting = true; - canEmit = true; - if (queue.size() > 0) { - list = queue; // copy reference - queue = new ArrayList(); // new version; - } - } else { - // someone else is already emitting so just queue it ... after eliminating the queue to shortcut - queue.clear(); - queue.add(on.error(e)); - } - } - if (canEmit) { - // we won the right to emit - try { - drainQueue(list); - actual.onError(e); - } finally { - synchronized (this) { - emitting = false; + if (emitting) { + if (queue == null) { + queue = new FastList(); } + queue.add(new ErrorSentinel(e)); + return; } + emitting = true; + list = queue; + queue = null; } + drainQueue(list); + actual.onError(e); } @Override public void onNext(T t) { - boolean canEmit = false; - ArrayList list = null; + FastList list; + synchronized (this) { if (terminated) { return; } - if (!emitting) { - // emit immediately - emitting = true; - canEmit = true; - if (queue.size() > 0) { - list = queue; // copy reference - queue = new ArrayList(); // new version; + if (emitting) { + if (queue == null) { + queue = new FastList(); } - } else { - // someone else is already emitting so just queue it - queue.add(on.next(t)); + queue.add(t != null ? t : NULL_SENTINEL); + return; } + emitting = true; + list = queue; + queue = null; } - if (canEmit) { - // we won the right to emit - try { + + try { + int iter = MAX_DRAIN_ITERATION; + do { drainQueue(list); - actual.onNext(t); - } finally { - synchronized (this) { - if (terminated) { - list = queue; // copy reference - queue = new ArrayList(); // new version; - } else { - // release this thread - emitting = false; - canEmit = false; + if (iter == MAX_DRAIN_ITERATION) { + actual.onNext(t); + } + --iter; + if (iter > 0) { + synchronized (this) { + list = queue; + queue = null; + } + if (list == null) { + break; } } + } while (iter > 0); + } finally { + synchronized (this) { + if (terminated) { + list = queue; + queue = null; + } else { + emitting = false; + list = null; + } } - } - - // if terminated this will still be true so let's drain the rest of the queue - if (canEmit) { drainQueue(list); } } - public void drainQueue(ArrayList list) { - if (list == null || list.size() == 0) { + void drainQueue(FastList list) { + if (list == null || list.size == 0) { return; } - for (Object v : list) { - on.accept(actual, v); + for (Object v : list.array) { + if (v == null) { + break; + } + if (v == NULL_SENTINEL) { + actual.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + actual.onCompleted(); + } else if (v.getClass() == ErrorSentinel.class) { + actual.onError(((ErrorSentinel) v).e); + } else { + actual.onNext((T) v); + } } } -} +} \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/archive/operators/OperatorSerializePerformance.java b/rxjava-core/src/perf/java/rx/archive/operators/OperatorSerializePerformance.java index 366d23d092..7eb94edd3b 100644 --- a/rxjava-core/src/perf/java/rx/archive/operators/OperatorSerializePerformance.java +++ b/rxjava-core/src/perf/java/rx/archive/operators/OperatorSerializePerformance.java @@ -3,14 +3,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.logic.BlackHole; + import rx.Observable; import rx.Observable.OnSubscribe; +import rx.Observer; import rx.Subscriber; import rx.archive.perf.AbstractPerformanceTester; import rx.archive.perf.IntegerSumObserver; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; public class OperatorSerializePerformance extends AbstractPerformanceTester { @@ -26,12 +30,17 @@ public class OperatorSerializePerformance extends AbstractPerformanceTester { public static void main(String args[]) { final OperatorSerializePerformance spt = new OperatorSerializePerformance(); + final Input input = new Input(); + input.setup(); try { spt.runTest(new Action0() { @Override public void call() { - spt.timeTwoStreams(); + // spt.noSerializationSingleThreaded(input); + spt.serializedSingleStream(input); + // spt.synchronizedSingleStream(input); + // spt.timeTwoStreams(); // spt.timeSingleStream(); // spt.timeTwoStreamsIntervals(); } @@ -42,6 +51,61 @@ public void call() { } + /** + * Run: 10 - 12,186,982 ops/sec + * Run: 11 - 10,236,722 ops/sec + * Run: 12 - 11,377,690 ops/sec + * Run: 13 - 10,876,358 ops/sec + * Run: 14 - 11,383,619 ops/sec + * + * @param input + */ + public void noSerializationSingleThreaded(Input input) { + for (int i = 0; i < reps; i++) { + input.observable.subscribe(input.subscriber); + } + } + + /** + * 0.17.3: + * + * Run: 10 - 9,746,505 ops/sec + * Run: 11 - 9,956,019 ops/sec + * Run: 12 - 10,053,770 ops/sec + * Run: 13 - 10,076,958 ops/sec + * Run: 14 - 9,983,319 ops/sec + * + * 0.17.2: + * + * Run: 10 - 9,851,999 ops/sec + * Run: 11 - 9,726,975 ops/sec + * Run: 12 - 9,719,762 ops/sec + * Run: 13 - 9,668,141 ops/sec + * Run: 14 - 9,799,700 ops/sec + * + * @param input + */ + public void serializedSingleStream(Input input) { + for (int i = 0; i < reps; i++) { + input.observable.serialize().subscribe(input.subscriber); + } + } + + /** + * Run: 10 - 9,475,925 ops/sec + * Run: 11 - 9,501,341 ops/sec + * Run: 12 - 9,550,495 ops/sec + * Run: 13 - 9,510,303 ops/sec + * Run: 14 - 9,690,300 ops/sec + * + * @param input + */ + public void synchronizedSingleStream(Input input) { + for (int i = 0; i < reps; i++) { + input.observable.synchronize().subscribe(input.subscriber); + } + } + /** * 1 streams emitting in a tight loop. Testing for single-threaded overhead. * @@ -321,4 +385,52 @@ public void call(Integer t1) { return o.sum; } + public static class Input { + + public int size = 1048576; + + public Observable observable; + public TestSubscriber subscriber; + + private CountDownLatch latch; + + public void setup() { + observable = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber o) { + for (int value = 0; value < size; value++) { + if (o.isUnsubscribed()) + return; + o.onNext(value); + } + o.onCompleted(); + } + }); + + final BlackHole bh = new BlackHole(); + latch = new CountDownLatch(1); + + subscriber = new TestSubscriber(new Observer() { + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Integer value) { + bh.consume(value); + } + }); + + } + + public void awaitCompletion() throws InterruptedException { + latch.await(); + } + } } From 5b317ad827f624bed6cbf5f2e04df050ebbe01d0 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 28 Mar 2014 14:17:29 -0700 Subject: [PATCH 2/5] Update SerializedObserver to Not Allow Notification Delay Unit test showing delays. Fails when MAX_DRAIN_ITERATION set to 1, passes as currently configured. Added a thread starvation unit test and marked as ignored for now. Doesn't pass even with MAX_DRAIN_ITERATION set to 1. Probably needs backpressure solution. --- .../java/rx/observers/SerializedObserver.java | 2 +- .../rx/observers/SerializedObserverTest.java | 149 +++++++++++++++++- 2 files changed, 143 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index 0d397fe73f..f4a21e06f9 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -21,7 +21,7 @@ public class SerializedObserver implements Observer { private boolean terminated = false; private FastList queue; - private static final int MAX_DRAIN_ITERATION = 1; + private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE; private static final Object NULL_SENTINEL = new Object(); private static final Object COMPLETE_SENTINEL = new Object(); diff --git a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java index e6043edd39..7a82e59c06 100644 --- a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java @@ -15,9 +15,15 @@ */ package rx.observers; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -28,14 +34,17 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.schedulers.Schedulers; public class SerializedObserverTest { @@ -265,6 +274,111 @@ public void runConcurrencyTest() { } } + @Test + public void testNotificationDelay() { + ExecutorService tp = Executors.newFixedThreadPool(2); + + TestSubscriber to = new TestSubscriber(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(String t) { + // force it to take time when delivering + // so the second thread will asynchronously enqueue + try { + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }); + Observer o = serializedObserver(to); + + Future f1 = tp.submit(new OnNextThread(o, 1)); + Future f2 = tp.submit(new OnNextThread(o, 1)); + + waitOnThreads(f1, f2); + // not completed yet + + assertEquals(2, to.getOnNextEvents().size()); + System.out.println(to.getOnNextEvents()); + o.onCompleted(); + System.out.println(to.getOnNextEvents()); + } + + /** + * Demonstrates thread starvation problem. + * + * No solution on this for now. Trade-off in this direction as per https://github.com/Netflix/RxJava/issues/998#issuecomment-38959474 + * Probably need backpressure for this to work + * + * When using SynchronizedObserver we get this output: + * + * p1: 18 p2: 68 => should be close to each other unless we have thread starvation + * + * When using SerializedObserver we get: + * + * p1: 1 p2: 2445261 => should be close to each other unless we have thread starvation + * + * This demonstrates how SynchronizedObserver balances back and forth better, and blocks emission. + * The real issue in this example is the async buffer-bloat, so we need backpressure. + * + * + * @throws InterruptedException + */ + @Ignore + @Test + public void testThreadStarvation() throws InterruptedException { + + TestSubscriber to = new TestSubscriber(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(String t) { + // force it to take time when delivering + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + + }); + Observer o = serializedObserver(to); + + AtomicInteger p1 = new AtomicInteger(); + AtomicInteger p2 = new AtomicInteger(); + + Subscription s1 = infinite(p1).subscribe(o); + Subscription s2 = infinite(p2).subscribe(o); + + Thread.sleep(100); + + System.out.println("p1: " + p1.get() + " p2: " + p2.get() + " => should be close to each other unless we have thread starvation"); + assertEquals(p1.get(), p2.get(), 10000); // fairly distributed within 10000 of each other + + s1.unsubscribe(); + s2.unsubscribe(); + } + private static void waitOnThreads(Future... futures) { for (Future f : futures) { try { @@ -276,23 +390,44 @@ private static void waitOnThreads(Future... futures) { } } + private static Observable infinite(final AtomicInteger produced) { + return Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber s) { + while (!s.isUnsubscribed()) { + s.onNext("onNext"); + produced.incrementAndGet(); + } + } + + }).subscribeOn(Schedulers.newThread()); + } + /** * A thread that will pass data to onNext */ public static class OnNextThread implements Runnable { - private final Observer Observer; + private final Observer observer; private final int numStringsToSend; + final AtomicInteger produced; - OnNextThread(Observer Observer, int numStringsToSend) { - this.Observer = Observer; + OnNextThread(Observer observer, int numStringsToSend, AtomicInteger produced) { + this.observer = observer; this.numStringsToSend = numStringsToSend; + this.produced = produced; + } + + OnNextThread(Observer observer, int numStringsToSend) { + this(observer, numStringsToSend, new AtomicInteger()); } @Override public void run() { for (int i = 0; i < numStringsToSend; i++) { - Observer.onNext(Thread.currentThread().getId() + "-" + i); + observer.onNext(Thread.currentThread().getId() + "-" + i); + produced.incrementAndGet(); } } } From 9791c2d1e3ad84808d470f584eb4a45abf4c5cb9 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 1 Apr 2014 13:18:25 -0700 Subject: [PATCH 3/5] Use latches instead of sleep for unit test As per suggestion at https://github.com/benjchristensen/RxJava/commit/5b317ad827f624bed6cbf5f2e04df050ebbe01d0#commitcomment-5839773 --- .../rx/observers/SerializedObserverTest.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java index 7a82e59c06..172c996ee0 100644 --- a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java @@ -17,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -274,10 +275,18 @@ public void runConcurrencyTest() { } } + /** + * Test that a notification does not get delayed in the queue waiting for the next event to push it through. + * + * @throws InterruptedException + */ @Test - public void testNotificationDelay() { + public void testNotificationDelay() throws InterruptedException { ExecutorService tp = Executors.newFixedThreadPool(2); + final CountDownLatch onNextCount = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); + TestSubscriber to = new TestSubscriber(new Observer() { @Override @@ -292,12 +301,12 @@ public void onError(Throwable e) { @Override public void onNext(String t) { - // force it to take time when delivering - // so the second thread will asynchronously enqueue + // know when the first thread gets in + onNextCount.countDown(); + // force it to take time when delivering so the second one is enqueued try { - Thread.sleep(50); + latch.await(); } catch (InterruptedException e) { - e.printStackTrace(); } } @@ -307,10 +316,23 @@ public void onNext(String t) { Future f1 = tp.submit(new OnNextThread(o, 1)); Future f2 = tp.submit(new OnNextThread(o, 1)); + onNextCount.await(); + + Thread t1 = to.getLastSeenThread(); + System.out.println("first onNext on thread: " + t1); + + latch.countDown(); + waitOnThreads(f1, f2); // not completed yet assertEquals(2, to.getOnNextEvents().size()); + + Thread t2 = to.getLastSeenThread(); + System.out.println("second onNext on thread: " + t2); + + assertSame(t1, t2); + System.out.println(to.getOnNextEvents()); o.onCompleted(); System.out.println(to.getOnNextEvents()); From abee009981a0610a6ee84c723bf96a59c247c27b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 1 Apr 2014 13:50:44 -0700 Subject: [PATCH 4/5] comments to walk through logic --- .../src/main/java/rx/observers/SerializedObserver.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index f4a21e06f9..94a7845bcb 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -117,18 +117,23 @@ public void onNext(T t) { queue = new FastList(); } queue.add(t != null ? t : NULL_SENTINEL); + // another thread is emitting so we add to the queue and return return; } + // we can emit emitting = true; + // reference to the list to drain before emitting our value list = queue; queue = null; } + // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above try { int iter = MAX_DRAIN_ITERATION; do { drainQueue(list); if (iter == MAX_DRAIN_ITERATION) { + // after the first draining we emit our own value actual.onNext(t); } --iter; @@ -152,6 +157,7 @@ public void onNext(T t) { list = null; } } + // this will only drain if terminated (done here outside of synchronized block) drainQueue(list); } } From 9f6e9a4845cbb079584a634d1856bd0b6e4303da Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 1 Apr 2014 13:59:52 -0700 Subject: [PATCH 5/5] Fix unit test determinism --- .../rx/observers/SerializedObserverTest.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java index 172c996ee0..32f9f359e7 100644 --- a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java @@ -284,7 +284,8 @@ public void runConcurrencyTest() { public void testNotificationDelay() throws InterruptedException { ExecutorService tp = Executors.newFixedThreadPool(2); - final CountDownLatch onNextCount = new CountDownLatch(1); + final CountDownLatch firstOnNext = new CountDownLatch(1); + final CountDownLatch onNextCount = new CountDownLatch(2); final CountDownLatch latch = new CountDownLatch(1); TestSubscriber to = new TestSubscriber(new Observer() { @@ -301,8 +302,7 @@ public void onError(Throwable e) { @Override public void onNext(String t) { - // know when the first thread gets in - onNextCount.countDown(); + firstOnNext.countDown(); // force it to take time when delivering so the second one is enqueued try { latch.await(); @@ -313,10 +313,10 @@ public void onNext(String t) { }); Observer o = serializedObserver(to); - Future f1 = tp.submit(new OnNextThread(o, 1)); - Future f2 = tp.submit(new OnNextThread(o, 1)); + Future f1 = tp.submit(new OnNextThread(o, 1, onNextCount)); + Future f2 = tp.submit(new OnNextThread(o, 1, onNextCount)); - onNextCount.await(); + firstOnNext.await(); Thread t1 = to.getLastSeenThread(); System.out.println("first onNext on thread: " + t1); @@ -431,14 +431,24 @@ public void call(Subscriber s) { */ public static class OnNextThread implements Runnable { + private final CountDownLatch latch; private final Observer observer; private final int numStringsToSend; final AtomicInteger produced; + OnNextThread(Observer observer, int numStringsToSend, CountDownLatch latch) { + this(observer, numStringsToSend, new AtomicInteger(), latch); + } + OnNextThread(Observer observer, int numStringsToSend, AtomicInteger produced) { + this(observer, numStringsToSend, produced, null); + } + + OnNextThread(Observer observer, int numStringsToSend, AtomicInteger produced, CountDownLatch latch) { this.observer = observer; this.numStringsToSend = numStringsToSend; this.produced = produced; + this.latch = latch; } OnNextThread(Observer observer, int numStringsToSend) { @@ -449,6 +459,9 @@ public static class OnNextThread implements Runnable { public void run() { for (int i = 0; i < numStringsToSend; i++) { observer.onNext(Thread.currentThread().getId() + "-" + i); + if (latch != null) { + latch.countDown(); + } produced.incrementAndGet(); } }