Skip to content

Commit

Permalink
2.x: fix Flowable.toList() onNext/cancel race (#5247)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 31, 2017
1 parent 7c95808 commit 6c8b0ef
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
value.add(t);
U v = value;
if (v != null) {
v.add(t);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.util.*;
Expand All @@ -23,7 +24,6 @@
import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.Flowable;
import io.reactivex.exceptions.TestException;
import io.reactivex.observers.TestObserver;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -389,4 +389,83 @@ public Collection<Integer> call() throws Exception {
.assertFailure(NullPointerException.class)
.assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
}

@Test
public void onNextCancelRace() {
for (int i = 0; i < 1000; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestObserver<List<Integer>> ts = pp.toList().test();

Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onNext(1);
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};

TestHelper.race(r1, r2);
}

}

@Test
public void onNextCancelRaceFlowable() {
for (int i = 0; i < 1000; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<List<Integer>> ts = pp.toList().toFlowable().test();

Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onNext(1);
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};

TestHelper.race(r1, r2);
}

}

@Test
public void onCompleteCancelRaceFlowable() {
for (int i = 0; i < 1000; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<List<Integer>> ts = pp.toList().toFlowable().test();

pp.onNext(1);

Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onComplete();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};

TestHelper.race(r1, r2);

if (ts.valueCount() != 0) {
ts.assertValue(Arrays.asList(1))
.assertNoErrors();
}
}

}
}

0 comments on commit 6c8b0ef

Please sign in to comment.