diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java index 969f0df8b9..f5a459ef5d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java @@ -88,14 +88,13 @@ public void onNext(final Observable o) { if (checkNeedBuffer(o)) { // use buffering (possibly blocking) for a possibly synchronous subscribe final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber); - o.subscribe(bus); subscriber.add(scheduler.schedule(new Action1() { @Override public void call(final Inner inner) { bus.enterPassthroughMode(); } })); - return; + o.subscribe(bus); } else { // no buffering (async subscribe) subscriber.add(scheduler.schedule(new Action1() { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java index 9b55256055..7a38bcd8da 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java @@ -399,5 +399,18 @@ public void call(Subscriber sub) { ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); assertEquals(10, count.get()); } - + @Test(timeout = 2000) + public void testNoDeadlock() { + List data = Arrays.asList(1, 2, 3, 4, 5); + Observable source = Observable.from(data); + + Observable result = source.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 1)); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(data); + } }