Skip to content

Commit

Permalink
2.x: fix replay() cancel/dispose NPE (#5064)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Feb 3, 2017
1 parent a636b87 commit 3852b1b
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ public void connect(Consumer<? super Disposable> connection) {
}

@SuppressWarnings("rawtypes")
static final class ReplaySubscriber<T> implements Subscriber<T>, Disposable {
static final class ReplaySubscriber<T>
extends AtomicReference<Subscription>
implements Subscriber<T>, Disposable {
private static final long serialVersionUID = 7224554242710036740L;
/** Holds notifications from upstream. */
final ReplayBuffer<T> buffer;
/** Indicates this Subscriber received a terminal event. */
Expand All @@ -361,8 +364,6 @@ static final class ReplaySubscriber<T> implements Subscriber<T>, Disposable {
long maxChildRequested;
/** Counts the outstanding upstream requests until the producer arrives. */
long maxUpstreamRequested;
/** The upstream producer. */
volatile Subscription subscription;

@SuppressWarnings("unchecked")
ReplaySubscriber(ReplayBuffer<T> buffer) {
Expand All @@ -386,7 +387,7 @@ public void dispose() {
// current.compareAndSet(ReplaySubscriber.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
subscription.cancel();
SubscriptionHelper.cancel(this);
}

/**
Expand Down Expand Up @@ -476,8 +477,7 @@ void remove(InnerSubscription<T> p) {

@Override
public void onSubscribe(Subscription p) {
if (SubscriptionHelper.validate(subscription, p)) {
subscription = p;
if (SubscriptionHelper.setOnce(this, p)) {
manageRequests();
for (InnerSubscription<T> rp : subscribers.get()) {
buffer.replay(rp);
Expand Down Expand Up @@ -548,7 +548,7 @@ void manageRequests() {
}

long ur = maxUpstreamRequested;
Subscription p = subscription;
Subscription p = get();

long diff = maxTotalRequests - ri;
if (diff != 0L) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ public void connect(Consumer<? super Disposable> connection) {
}

@SuppressWarnings("rawtypes")
static final class ReplayObserver<T> implements Observer<T>, Disposable {
static final class ReplayObserver<T>
extends AtomicReference<Disposable>
implements Observer<T>, Disposable {
private static final long serialVersionUID = -533785617179540163L;
/** Holds notifications from upstream. */
final ReplayBuffer<T> buffer;
/** Indicates this Observer received a terminal event. */
Expand All @@ -335,9 +338,6 @@ static final class ReplayObserver<T> implements Observer<T>, Disposable {
*/
final AtomicBoolean shouldConnect;

/** The upstream producer. */
volatile Disposable subscription;

ReplayObserver(ReplayBuffer<T> buffer) {
this.buffer = buffer;

Expand All @@ -358,7 +358,7 @@ public void dispose() {
// current.compareAndSet(ReplayObserver.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
subscription.dispose();
DisposableHelper.dispose(this);
}

/**
Expand Down Expand Up @@ -444,8 +444,7 @@ void remove(InnerDisposable<T> producer) {

@Override
public void onSubscribe(Disposable p) {
if (DisposableHelper.validate(this.subscription, p)) {
subscription = p;
if (DisposableHelper.setOnce(this, p)) {
replay();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,4 +873,25 @@ public void subscribe(FlowableEmitter<Object> s) throws Exception {
.test(0L)
.assertFailure(MissingBackpressureException.class);
}

@Test
public void delayedUpstreamOnSubscribe() {
final Subscriber<?>[] sub = { null };

new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
sub[0] = s;
}
}
.publish()
.connect()
.dispose();

BooleanSubscription bs = new BooleanSubscription();

sub[0].onSubscribe(bs);

assertTrue(bs.isCancelled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1711,4 +1711,24 @@ public void testSizedTruncation() {
Assert.assertFalse(buf.hasError());
}

@Test
public void delayedUpstreamOnSubscribe() {
final Subscriber<?>[] sub = { null };

new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
sub[0] = s;
}
}
.replay()
.connect()
.dispose();

BooleanSubscription bs = new BooleanSubscription();

sub[0].onSubscribe(bs);

assertTrue(bs.isCancelled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,4 +699,25 @@ public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {

assertFalse(ps.hasObservers());
}

@Test
public void delayedUpstreamOnSubscribe() {
final Observer<?>[] sub = { null };

new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> s) {
sub[0] = s;
}
}
.publish()
.connect()
.dispose();

Disposable bs = Disposables.empty();

sub[0].onSubscribe(bs);

assertTrue(bs.isDisposed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.annotations.NonNull;
import org.junit.*;
import org.mockito.InOrder;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler.Worker;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
Expand Down Expand Up @@ -1490,4 +1490,25 @@ public void onNext(Integer t) {

to.assertValues(1);
}

@Test
public void delayedUpstreamOnSubscribe() {
final Observer<?>[] sub = { null };

new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> s) {
sub[0] = s;
}
}
.replay()
.connect()
.dispose();

Disposable bs = Disposables.empty();

sub[0].onSubscribe(bs);

assertTrue(bs.isDisposed());
}
}

0 comments on commit 3852b1b

Please sign in to comment.