diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 02ed97b462..2da1306632 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -115,22 +115,42 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null && connection == rc) { - connection = null; - if (rc.timer != null) { - rc.timer.dispose(); + if (source instanceof FlowablePublishClassic) { + if (connection != null && connection == rc) { + connection = null; + clearTimer(rc); } - } - if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); + + if (--rc.subscriberCount == 0) { + reset(rc); + } + } else { + if (connection != null && connection == rc) { + clearTimer(rc); + if (--rc.subscriberCount == 0) { + connection = null; + reset(rc); + } } } } } + void clearTimer(RefConnection rc) { + if (rc.timer != null) { + rc.timer.dispose(); + rc.timer = null; + } + } + + void reset(RefConnection rc) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); + } + } + void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 5306f4481d..27e633c664 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -112,22 +112,42 @@ void cancel(RefConnection rc) { void terminated(RefConnection rc) { synchronized (this) { - if (connection != null && connection == rc) { - connection = null; - if (rc.timer != null) { - rc.timer.dispose(); + if (source instanceof ObservablePublishClassic) { + if (connection != null && connection == rc) { + connection = null; + clearTimer(rc); } - } - if (--rc.subscriberCount == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(rc.get()); + + if (--rc.subscriberCount == 0) { + reset(rc); + } + } else { + if (connection != null && connection == rc) { + clearTimer(rc); + if (--rc.subscriberCount == 0) { + connection = null; + reset(rc); + } } } } } + void clearTimer(RefConnection rc) { + if (rc.timer != null) { + rc.timer.dispose(); + rc.timer = null; + } + } + + void reset(RefConnection rc) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } else if (source instanceof ResettableConnectable) { + ((ResettableConnectable)source).resetIf(rc.get()); + } + } + void timeout(RefConnection rc) { synchronized (this) { if (rc.subscriberCount == 0 && rc == connection) { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java index 2ee23b2dc1..c8eca05dca 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java @@ -1443,4 +1443,23 @@ public void publishRefCountShallBeThreadSafe() { .assertComplete(); } } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplayProcessor rp = ReplayProcessor.create(); + rp.onNext(1); + rp.onComplete(); + + Flowable shared = rp.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 3cb5f57fb0..c032e61da5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.io.IOException; @@ -1436,4 +1437,23 @@ public void disconnectBeforeConnect() { flowable.take(1).test().assertResult(2); } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplayProcessor rp = ReplayProcessor.create(); + rp.onNext(1); + rp.onComplete(); + + Flowable shared = rp.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java index 05aada6b84..0f675bd15d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java @@ -1399,4 +1399,23 @@ public void publishRefCountShallBeThreadSafe() { .assertComplete(); } } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplaySubject rs = ReplaySubject.create(); + rs.onNext(1); + rs.onComplete(); + + Observable shared = rs.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 99a2a79f79..485afc54e1 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -1380,4 +1380,23 @@ public void disconnectBeforeConnect() { observable.take(1).test().assertResult(2); } + + @Test + public void upstreamTerminationTriggersAnotherCancel() throws Exception { + ReplaySubject rs = ReplaySubject.create(); + rs.onNext(1); + rs.onComplete(); + + Observable shared = rs.share(); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + + shared + .buffer(shared.debounce(5, TimeUnit.SECONDS)) + .test() + .assertValueCount(2); + } }