diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index f270b08eac..9f3f36390b 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -218,15 +218,17 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable t) { T value = t.get(); if (getEmitLock()) { + boolean moreToDrain; try { actual.onNext(value); - return; } finally { - if (releaseEmitLock()) { - drainQueuesIfNeeded(); - } - request(1); + moreToDrain = releaseEmitLock(); } + if (moreToDrain) { + drainQueuesIfNeeded(); + } + request(1); + return; } else { initScalarValueQueueIfNeeded(); try { @@ -241,6 +243,8 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchro private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable t) { if (getEmitLock()) { boolean emitted = false; + boolean moreToDrain; + boolean isReturn = false; try { long r = mergeProducer.requested; if (r > 0) { @@ -248,15 +252,19 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou actual.onNext(t.get()); MergeProducer.REQUESTED.decrementAndGet(mergeProducer); // we handle this Observable without ever incrementing the wip or touching other machinery so just return here - return; + isReturn = true; } } finally { - if (releaseEmitLock()) { - drainQueuesIfNeeded(); - } - if (emitted) { - request(1); - } + moreToDrain = releaseEmitLock(); + } + if (moreToDrain) { + drainQueuesIfNeeded(); + } + if (emitted) { + request(1); + } + if (isReturn) { + return; } } @@ -301,20 +309,21 @@ private boolean drainQueuesIfNeeded() { while (true) { if (getEmitLock()) { int emitted = 0; + boolean moreToDrain; try { emitted = drainScalarValueQueue(); drainChildrenQueues(); } finally { - boolean moreToDrain = releaseEmitLock(); - // request outside of lock - if (emitted > 0) { - request(emitted); - } - if (!moreToDrain) { - return true; - } - // otherwise we'll loop and get whatever was added + moreToDrain = releaseEmitLock(); + } + // request outside of lock + if (emitted > 0) { + request(emitted); + } + if (!moreToDrain) { + return true; } + // otherwise we'll loop and get whatever was added } else { return false; }