Skip to content

Commit

Permalink
Fix a race condition that may make OperatorMaterialize emit too many …
Browse files Browse the repository at this point in the history
…terminal notifications (#5850)

* Fix a race condition that may make OperatorMaterialize emit the
terminal notification more than once

The guards in `OperatorMaterialize.ParentSubscriber#drain` were never
working, because `busy` was actually never set to true.
Therefore it was possible that the `drain` loop was executed by
more than one thread concurrently, which could led to undefined
behavior.

This fix sets `busy` to true at the entry of `drain`.

* Add unit test for race in OperatorMaterialize

* Set sudo required in travis config
  • Loading branch information
pkolaczk authored and akarnokd committed Feb 15, 2018
1 parent 2ba8bb2 commit c40a06f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: java
jdk:
- oraclejdk8
sudo: false
sudo: required
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/

git:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private void drain() {
missed = true;
return;
}
busy = true;
}
// drain loop
final AtomicLong localRequested = this.requested;
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/rx/internal/operators/OperatorMaterializeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.TestUtil;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class OperatorMaterializeTest {

Expand Down Expand Up @@ -201,6 +204,33 @@ public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNoti
ts.assertUnsubscribed();
}

@Test
public void testConcurrency() {
for (int i = 0; i < 1000; i++) {
final TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
final PublishSubject<Integer> ps = PublishSubject.create();
Action0 publishAction = new Action0() {
@Override
public void call() {
ps.onCompleted();
}
};

Action0 requestAction = new Action0() {
@Override
public void call() {
ts.requestMore(1);
}
};

ps.materialize().subscribe(ts);
TestUtil.race(publishAction, requestAction);
ts.assertValueCount(1);
ts.assertTerminalEvent();
ts.assertNoErrors();
}
}

private static class TestObserver extends Subscriber<Notification<String>> {

boolean onCompleted;
Expand Down

0 comments on commit c40a06f

Please sign in to comment.