diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java index 8ed41afff9..75ef64ce2f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java @@ -69,7 +69,10 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - value.add(t); + U v = value; + if (v != null) { + v.add(t); + } } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java index b8557827e4..8c97ae5475 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -23,7 +24,6 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.Flowable; import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; import io.reactivex.processors.PublishProcessor; @@ -389,4 +389,83 @@ public Collection call() throws Exception { .assertFailure(NullPointerException.class) .assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources."); } + + @Test + public void onNextCancelRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestObserver> ts = pp.toList().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + + } + + @Test + public void onNextCancelRaceFlowable() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestSubscriber> ts = pp.toList().toFlowable().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + + } + + @Test + public void onCompleteCancelRaceFlowable() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestSubscriber> ts = pp.toList().toFlowable().test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() != 0) { + ts.assertValue(Arrays.asList(1)) + .assertNoErrors(); + } + } + + } }