diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java index c4860532b1..e0dc7be452 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java @@ -409,7 +409,7 @@ public void onError(Throwable e) { boolean sendOnComplete = false; synchronized (this) { wip--; - if (wip == 0 && completed) { + if ((wip == 0 && completed) || (wip < 0)) { sendOnComplete = true; } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index 3063463cd5..f5163bdb8e 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -46,36 +46,6 @@ public void before() { MockitoAnnotations.initMocks(this); } - @Test(timeout=1000L) - public void testSynchronousError() { - final Observable> o1 = Observable.error(new RuntimeException("unit test")); - - final CountDownLatch latch = new CountDownLatch(1); - Observable.mergeDelayError(o1).subscribe(new Subscriber() { - @Override - public void onCompleted() { - fail("Expected onError path"); - } - - @Override - public void onError(Throwable e) { - latch.countDown(); - } - - @Override - public void onNext(String s) { - fail("Expected onError path"); - } - }); - - try { - latch.await(); - } catch (InterruptedException ex) { - fail("interrupted"); - } - } - - @Test public void testErrorDelayed1() { final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called @@ -313,6 +283,35 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test(timeout=1000L) + public void testSynchronousError() { + final Observable> o1 = Observable.error(new RuntimeException("unit test")); + + final CountDownLatch latch = new CountDownLatch(1); + Observable.mergeDelayError(o1).subscribe(new Subscriber() { + @Override + public void onCompleted() { + fail("Expected onError path"); + } + + @Override + public void onError(Throwable e) { + latch.countDown(); + } + + @Override + public void onNext(String s) { + fail("Expected onError path"); + } + }); + + try { + latch.await(); + } catch (InterruptedException ex) { + fail("interrupted"); + } + } + private static class TestSynchronousObservable implements Observable.OnSubscribe { @Override