Skip to content

Commit

Permalink
2.x: Fix refCount() not resetting when cross-canceled (#6629)
Browse files Browse the repository at this point in the history
* 2.x: Fix refCount() not resetting when cross-canceled

* Undo test timeout comment
  • Loading branch information
akarnokd authored Aug 22, 2019
1 parent a9df239 commit fa406d1
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,42 @@ void cancel(RefConnection rc) {

void terminated(RefConnection rc) {
synchronized (this) {
if (connection != null && connection == rc) {
connection = null;
if (rc.timer != null) {
rc.timer.dispose();
if (source instanceof FlowablePublishClassic) {
if (connection != null && connection == rc) {
connection = null;
clearTimer(rc);
}
}
if (--rc.subscriberCount == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());

if (--rc.subscriberCount == 0) {
reset(rc);
}
} else {
if (connection != null && connection == rc) {
clearTimer(rc);
if (--rc.subscriberCount == 0) {
connection = null;
reset(rc);
}
}
}
}
}

void clearTimer(RefConnection rc) {
if (rc.timer != null) {
rc.timer.dispose();
rc.timer = null;
}
}

void reset(RefConnection rc) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());
}
}

void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,42 @@ void cancel(RefConnection rc) {

void terminated(RefConnection rc) {
synchronized (this) {
if (connection != null && connection == rc) {
connection = null;
if (rc.timer != null) {
rc.timer.dispose();
if (source instanceof ObservablePublishClassic) {
if (connection != null && connection == rc) {
connection = null;
clearTimer(rc);
}
}
if (--rc.subscriberCount == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());

if (--rc.subscriberCount == 0) {
reset(rc);
}
} else {
if (connection != null && connection == rc) {
clearTimer(rc);
if (--rc.subscriberCount == 0) {
connection = null;
reset(rc);
}
}
}
}
}

void clearTimer(RefConnection rc) {
if (rc.timer != null) {
rc.timer.dispose();
rc.timer = null;
}
}

void reset(RefConnection rc) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());
}
}

void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1443,4 +1443,23 @@ public void publishRefCountShallBeThreadSafe() {
.assertComplete();
}
}

@Test
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
ReplayProcessor<Integer> rp = ReplayProcessor.create();
rp.onNext(1);
rp.onComplete();

Flowable<Integer> shared = rp.share();

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.io.IOException;
Expand Down Expand Up @@ -1436,4 +1437,23 @@ public void disconnectBeforeConnect() {

flowable.take(1).test().assertResult(2);
}

@Test
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
ReplayProcessor<Integer> rp = ReplayProcessor.create();
rp.onNext(1);
rp.onComplete();

Flowable<Integer> shared = rp.share();

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1399,4 +1399,23 @@ public void publishRefCountShallBeThreadSafe() {
.assertComplete();
}
}

@Test
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
ReplaySubject<Integer> rs = ReplaySubject.create();
rs.onNext(1);
rs.onComplete();

Observable<Integer> shared = rs.share();

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1380,4 +1380,23 @@ public void disconnectBeforeConnect() {

observable.take(1).test().assertResult(2);
}

@Test
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
ReplaySubject<Integer> rs = ReplaySubject.create();
rs.onNext(1);
rs.onComplete();

Observable<Integer> shared = rs.share();

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);

shared
.buffer(shared.debounce(5, TimeUnit.SECONDS))
.test()
.assertValueCount(2);
}
}

0 comments on commit fa406d1

Please sign in to comment.