From 3fbbfa9bb218cf2d834fd9f87c0ab90f78b48fa0 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 6 May 2015 09:25:34 +1000 Subject: [PATCH] don't decrement if consumerCapacity is 0 --- .../rx/internal/operators/OnSubscribeRedo.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index f0a761c948..2f74635580 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -44,6 +44,7 @@ import rx.Producer; import rx.Scheduler; import rx.Subscriber; +import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; @@ -238,11 +239,16 @@ public void onNext(T v) { child.onNext(v); while (true) { long cc = consumerCapacity.get(); - if (cc != Long.MAX_VALUE) { - if (consumerCapacity.compareAndSet(cc, cc -1)) { + if (cc == 0) { + child.onError(new MissingBackpressureException( + "an item has arrived to this operator that was not requested, " + + "please use backpressure aware operators upstream (or append .onBackpressureBuffer() or similar)")); + break; + } else if (cc != Long.MAX_VALUE) { + if (consumerCapacity.compareAndSet(cc, cc - 1)) { break; } - } else { + } else { break; } } @@ -320,10 +326,14 @@ public void onError(Throwable e) { @Override public void onNext(Object t) { + // a restart instruction has arrived if (!isLocked.get() && !child.isUnsubscribed()) { + // if there are outstanding requests if (consumerCapacity.get() > 0) { + // schedule resubscription worker.schedule(subscribeToSource); } else { + // otherwise we indicate that on the next request we should resubscribe resumeBoundary.compareAndSet(false, true); } }