diff --git a/src/main/java/rx/internal/operators/OnSubscribeRange.java b/src/main/java/rx/internal/operators/OnSubscribeRange.java index f7a95bd216..f648e6f414 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -73,7 +73,7 @@ public void request(long n) { } } else if (n > 0) { // backpressure is requested - long _c = REQUESTED_UPDATER.getAndAdd(this, n); + long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER,this, n); if (_c == 0) { while (true) { /* diff --git a/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java b/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java index 21dc5599e2..9b06cdb4d0 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java @@ -32,6 +32,7 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.functions.Action1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; @@ -190,4 +191,33 @@ public void testWithBackpressureRequestWayMore() { ts.assertReceivedOnNext(list); ts.assertTerminalEvent(); } + + @Test + public void testRequestOverflow() { + final AtomicInteger count = new AtomicInteger(); + int n = 10; + Observable.range(1, n).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(2); + } + + @Override + public void onCompleted() { + //do nothing + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Integer t) { + count.incrementAndGet(); + request(Long.MAX_VALUE - 1); + }}); + assertEquals(n, count.get()); + } }