Skip to content

Commit

Permalink
Merge pull request #2771 from davidmoten/range-request-overflow
Browse files Browse the repository at this point in the history
OnSubscribeRange request overflow check
  • Loading branch information
akarnokd committed Feb 24, 2015
2 parents 2605ede + 608dd0f commit 4778d00
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void request(long n) {
}
} else if (n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER,this, n);
if (_c == 0) {
while (true) {
/*
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeRangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -190,4 +191,33 @@ public void testWithBackpressureRequestWayMore() {
ts.assertReceivedOnNext(list);
ts.assertTerminalEvent();
}

@Test
public void testRequestOverflow() {
final AtomicInteger count = new AtomicInteger();
int n = 10;
Observable.range(1, n).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(2);
}

@Override
public void onCompleted() {
//do nothing
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Integer t) {
count.incrementAndGet();
request(Long.MAX_VALUE - 1);
}});
assertEquals(n, count.get());
}
}

0 comments on commit 4778d00

Please sign in to comment.