diff --git a/src/main/java/rx/internal/operators/OperatorPublish.java b/src/main/java/rx/internal/operators/OperatorPublish.java index 0fca3d6c64..662d093ebf 100644 --- a/src/main/java/rx/internal/operators/OperatorPublish.java +++ b/src/main/java/rx/internal/operators/OperatorPublish.java @@ -458,6 +458,15 @@ void dispatch() { boolean skipFinal = false; try { for (;;) { + /* + * We need to read terminalEvent before checking the queue for emptyness because + * all enqueue happens before setting the terminal event. + * If it were the other way around, when the emission is paused between + * checking isEmpty and checking terminalEvent, some other thread might + * have produced elements and set the terminalEvent and we'd quit emitting + * prematurely. + */ + Object term = terminalEvent; /* * See if the queue is empty; since we need this information multiple * times later on, we read it one. @@ -468,7 +477,7 @@ void dispatch() { // if the queue is empty and the terminal event was received, quit // and don't bother restoring emitting to false: no further activity is // possible at this point - if (checkTerminated(terminalEvent, empty)) { + if (checkTerminated(term, empty)) { skipFinal = true; return; } @@ -508,10 +517,11 @@ void dispatch() { // it may happen everyone has unsubscribed between here and producers.get() // or we have no subscribers at all to begin with if (len == unsubscribed) { + term = terminalEvent; // so let's consume a value from the queue Object v = queue.poll(); // or terminate if there was a terminal event and the queue is empty - if (checkTerminated(terminalEvent, v == null)) { + if (checkTerminated(term, v == null)) { skipFinal = true; return; } @@ -748,4 +758,4 @@ public void unsubscribe() { } } } -} +} \ No newline at end of file