From 5348ebf333b21007186b307174c517e5af879b09 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 28 Jan 2015 11:19:33 +1100 Subject: [PATCH] handle request overflow for OperatorMerge --- .../rx/internal/operators/OperatorMerge.java | 11 ++++++- .../internal/operators/OperatorMergeTest.java | 32 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 9b0eb074aa..b86db26c47 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -545,7 +545,16 @@ public void request(long n) { if (n == Long.MAX_VALUE) { requested = Long.MAX_VALUE; } else { - REQUESTED.getAndAdd(this, n); + // add n to requested but check for overflow + while (true) { + long current = REQUESTED.get(this); + long next = current + n; + //check for overflow + if (next < 0) + next = Long.MAX_VALUE; + if (REQUESTED.compareAndSet(this, current, next)) + break; + } if (ms.drainQueuesIfNeeded()) { boolean sendComplete = false; synchronized (ms) { diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index fa861f68ea..7d785b4088 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -1183,6 +1183,38 @@ public void call() { assertTrue(a); //} } + + @Test + public void testMergeRequestOverflow() throws InterruptedException { + //do a non-trivial merge so that future optimisations with EMPTY don't invalidate this test + Observable o = Observable.from(Arrays.asList(1,2)).mergeWith(Observable.from(Arrays.asList(3,4))); + final int expectedCount = 4; + final CountDownLatch latch = new CountDownLatch(expectedCount); + o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + //ignore + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Integer t) { + latch.countDown(); + request(2); + request(Long.MAX_VALUE-1); + }}); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } private static Action1 printCount() { return new Action1() {