From 7071fc0a84ca840a11ebc6c4be0bba4c8a877e7e Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sat, 16 May 2015 20:28:08 +1000 Subject: [PATCH] OperatorObserveOn should not request more after child is unsubscribed --- .../internal/operators/OperatorObserveOn.java | 4 +- .../operators/OperatorObserveOnTest.java | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index e15c2f93cf..1f1f380ff0 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -185,7 +185,9 @@ void pollQueue() { counter = 1; long produced = 0; long r = requested; - while (!child.isUnsubscribed()) { + for (;;) { + if (child.isUnsubscribed()) + return; Throwable error; if (finished) { if ((error = this.error) != null) { diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index b0c8a5bcfd..1f0cc0a892 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -26,7 +26,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -765,4 +767,42 @@ public void onNext(Integer t) { } + @Test + public void testNoMoreRequestsAfterUnsubscribe() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final List requests = Collections.synchronizedList(new ArrayList()); + Observable.range(1, 1000000) + .doOnRequest(new Action1() { + + @Override + public void call(Long n) { + requests.add(n); + } + }) + .observeOn(Schedulers.io()) + .subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer t) { + unsubscribe(); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(1, requests.size()); + } + }