Skip to content

Commit

Permalink
Merge pull request #2934 from akarnokd/OperatorPublishRaceFix
Browse files Browse the repository at this point in the history
Fix termination race condition in OperatorPublish.dispatch
  • Loading branch information
akarnokd committed May 6, 2015
2 parents 2532484 + b28007e commit fe66cc9
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/main/java/rx/internal/operators/OperatorPublish.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ void dispatch() {
boolean skipFinal = false;
try {
for (;;) {
/*
* We need to read terminalEvent before checking the queue for emptyness because
* all enqueue happens before setting the terminal event.
* If it were the other way around, when the emission is paused between
* checking isEmpty and checking terminalEvent, some other thread might
* have produced elements and set the terminalEvent and we'd quit emitting
* prematurely.
*/
Object term = terminalEvent;
/*
* See if the queue is empty; since we need this information multiple
* times later on, we read it one.
Expand All @@ -468,7 +477,7 @@ void dispatch() {
// if the queue is empty and the terminal event was received, quit
// and don't bother restoring emitting to false: no further activity is
// possible at this point
if (checkTerminated(terminalEvent, empty)) {
if (checkTerminated(term, empty)) {
skipFinal = true;
return;
}
Expand Down Expand Up @@ -508,10 +517,11 @@ void dispatch() {
// it may happen everyone has unsubscribed between here and producers.get()
// or we have no subscribers at all to begin with
if (len == unsubscribed) {
term = terminalEvent;
// so let's consume a value from the queue
Object v = queue.poll();
// or terminate if there was a terminal event and the queue is empty
if (checkTerminated(terminalEvent, v == null)) {
if (checkTerminated(term, v == null)) {
skipFinal = true;
return;
}
Expand Down Expand Up @@ -748,4 +758,4 @@ public void unsubscribe() {
}
}
}
}
}

0 comments on commit fe66cc9

Please sign in to comment.