From c40a06f331cfeea8ccca987edaff54a294abc07a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 15 Feb 2018 19:49:26 +0100 Subject: [PATCH] Fix a race condition that may make OperatorMaterialize emit too many terminal notifications (#5850) * Fix a race condition that may make OperatorMaterialize emit the terminal notification more than once The guards in `OperatorMaterialize.ParentSubscriber#drain` were never working, because `busy` was actually never set to true. Therefore it was possible that the `drain` loop was executed by more than one thread concurrently, which could led to undefined behavior. This fix sets `busy` to true at the entry of `drain`. * Add unit test for race in OperatorMaterialize * Set sudo required in travis config --- .travis.yml | 2 +- .../operators/OperatorMaterialize.java | 1 + .../operators/OperatorMaterializeTest.java | 30 +++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 751937fa52..e9ae55c471 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: java jdk: - oraclejdk8 -sudo: false +sudo: required # as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/ git: diff --git a/src/main/java/rx/internal/operators/OperatorMaterialize.java b/src/main/java/rx/internal/operators/OperatorMaterialize.java index c1d4a3b65e..ce9e1be604 100644 --- a/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -134,6 +134,7 @@ private void drain() { missed = true; return; } + busy = true; } // drain loop final AtomicLong localRequested = this.requested; diff --git a/src/test/java/rx/internal/operators/OperatorMaterializeTest.java b/src/test/java/rx/internal/operators/OperatorMaterializeTest.java index 437593d313..a55758d25c 100644 --- a/src/test/java/rx/internal/operators/OperatorMaterializeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMaterializeTest.java @@ -29,9 +29,12 @@ import rx.Notification; import rx.Observable; import rx.Subscriber; +import rx.TestUtil; +import rx.functions.Action0; import rx.functions.Action1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorMaterializeTest { @@ -201,6 +204,33 @@ public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNoti ts.assertUnsubscribed(); } + @Test + public void testConcurrency() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber> ts = TestSubscriber.create(0); + final PublishSubject ps = PublishSubject.create(); + Action0 publishAction = new Action0() { + @Override + public void call() { + ps.onCompleted(); + } + }; + + Action0 requestAction = new Action0() { + @Override + public void call() { + ts.requestMore(1); + } + }; + + ps.materialize().subscribe(ts); + TestUtil.race(publishAction, requestAction); + ts.assertValueCount(1); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + } + } + private static class TestObserver extends Subscriber> { boolean onCompleted;