From 1ebd97900fe79130a3066a2cab1040dddf7229bd Mon Sep 17 00:00:00 2001 From: Vladimir Mironov Date: Mon, 12 Sep 2016 21:21:50 +0400 Subject: [PATCH] Observable/Flowable should unsubscribe from underlying subscription on dispose (#4536) --- .../flowable/FlowableTimeoutTimed.java | 2 +- .../observable/ObservableTimeoutTimed.java | 2 +- .../completable/CompletableTimeoutTest.java | 21 +++++++++ .../flowable/FlowableTimeoutTests.java | 17 +++++++ .../observable/ObservableTimeoutTests.java | 18 ++++++++ .../operators/single/SingleTimeoutTests.java | 44 +++++++++++++++++++ 6 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index e00b203a7f..4883730152 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -252,7 +252,6 @@ void scheduleTimeout(final long idx) { public void run() { if (idx == index) { done = true; - s.cancel(); dispose(); actual.onError(new TimeoutException()); @@ -293,6 +292,7 @@ public void onComplete() { public void dispose() { worker.dispose(); DisposableHelper.dispose(timer); + s.cancel(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index bc9afd696d..6a121f95e8 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -251,7 +251,6 @@ void scheduleTimeout(final long idx) { public void run() { if (idx == index) { done = true; - s.dispose(); dispose(); actual.onError(new TimeoutException()); @@ -292,6 +291,7 @@ public void onComplete() { public void dispose() { worker.dispose(); DisposableHelper.dispose(timer); + s.dispose(); } @Override diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java index 84a127cd1a..7fa77366a2 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java @@ -14,9 +14,14 @@ package io.reactivex.internal.operators.completable; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.concurrent.*; +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subscribers.TestSubscriber; import org.junit.Test; import io.reactivex.Completable; @@ -56,4 +61,20 @@ public void run() throws Exception { assertEquals(1, call[0]); } + @Test + public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { + final PublishSubject subject = PublishSubject.create(); + final TestScheduler scheduler = new TestScheduler(); + + final TestSubscriber observer = subject.toCompletable() + .timeout(100, TimeUnit.MILLISECONDS, scheduler) + .test(); + + assertTrue(subject.hasObservers()); + + observer.dispose(); + + assertFalse(subject.hasObservers()); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java index cdc16e894b..20f9d427da 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.flowable; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; @@ -358,6 +360,21 @@ public void subscribe(Subscriber subscriber) { verify(s, times(1)).cancel(); } + @Test + public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { + final PublishProcessor subject = PublishProcessor.create(); + final TestScheduler scheduler = new TestScheduler(); + + final TestSubscriber observer = subject + .timeout(100, TimeUnit.MILLISECONDS, scheduler) + .test(); + + assertTrue(subject.hasSubscribers()); + + observer.dispose(); + + assertFalse(subject.hasSubscribers()); + } @Test public void timedAndOther() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java index ca0acd4730..29a69f3f96 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; @@ -357,6 +359,22 @@ public void subscribe(Observer NbpSubscriber) { verify(s, times(1)).dispose(); } + @Test + public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { + final PublishSubject subject = PublishSubject.create(); + final TestScheduler scheduler = new TestScheduler(); + + final TestObserver observer = subject + .timeout(100, TimeUnit.MILLISECONDS, scheduler) + .test(); + + assertTrue(subject.hasObservers()); + + observer.dispose(); + + assertFalse(subject.hasObservers()); + } + @Test public void timedAndOther() { Observable.never().timeout(100, TimeUnit.MILLISECONDS, Observable.just(1)) diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java new file mode 100644 index 0000000000..a6b95ef78a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java @@ -0,0 +1,44 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SingleTimeoutTests { + + @Test + public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { + final PublishSubject subject = PublishSubject.create(); + final TestScheduler scheduler = new TestScheduler(); + + final TestSubscriber observer = subject.toSingle() + .timeout(100, TimeUnit.MILLISECONDS, scheduler) + .test(); + + assertTrue(subject.hasObservers()); + + observer.dispose(); + + assertFalse(subject.hasObservers()); + } + +}