From 29c1b6e8a2645d1444f050b92e679e974498e3a9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 12 Dec 2014 10:34:39 +0800 Subject: [PATCH] Fix 'request(0)' issue in Scan --- .../rx/internal/operators/OperatorScan.java | 16 +++--- .../internal/operators/OperatorScanTest.java | 56 +++++++++++++++++-- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorScan.java b/src/main/java/rx/internal/operators/OperatorScan.java index 2621ec452d..6809af0db3 100644 --- a/src/main/java/rx/internal/operators/OperatorScan.java +++ b/src/main/java/rx/internal/operators/OperatorScan.java @@ -147,19 +147,17 @@ public void request(long n) { if (once.compareAndSet(false, true)) { if (initialValue == NO_INITIAL_VALUE || n == Long.MAX_VALUE) { producer.request(n); + } else if (n == 1) { + excessive.set(true); + producer.request(1); // request at least 1 } else { - if (n == Long.MAX_VALUE) { - producer.request(Long.MAX_VALUE); - } else if (n == 1) { - excessive.set(true); - producer.request(1); // request at least 1 - } else { - producer.request(n - 1); - } + // n != Long.MAX_VALUE && n != 1 + producer.request(n - 1); } } else { // pass-thru after first time - if (excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) { + if (n > 1 // avoid to request 0 + && excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) { producer.request(n - 1); } else { producer.request(n); diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index e3ed546347..5d2aa0bf1e 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -20,15 +20,14 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -36,6 +35,7 @@ import rx.Observable; import rx.Observer; +import rx.Producer; import rx.Subscriber; import rx.functions.Action2; import rx.functions.Func0; @@ -312,4 +312,52 @@ public Integer call(Integer t1, Integer t2) { subscriber.assertTerminalEvent(); subscriber.assertNoErrors(); } + + @Test + public void testScanShouldNotRequestZero() { + final AtomicReference producer = new AtomicReference(); + Observable o = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + Producer p = spy(new Producer() { + + private AtomicBoolean requested = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (requested.compareAndSet(false, true)) { + subscriber.onNext(1); + } else { + subscriber.onCompleted(); + } + } + }); + producer.set(p); + subscriber.setProducer(p); + } + }).scan(100, new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }); + + o.subscribe(new TestSubscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + + verify(producer.get(), never()).request(0); + verify(producer.get(), times(2)).request(1); + } }