From e7538584a4a066241bc64480a96ff9066b0a1b17 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 29 Jan 2020 11:57:11 +0100 Subject: [PATCH] 2.x: Fig groupBy not requesting more if a group is cancelled with buffered items (#6894) --- .../operators/flowable/FlowableGroupBy.java | 23 +++++++--- .../flowable/FlowableGroupByTest.java | 46 ++++++++++++++++--- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index 8fd358c26e..99a63c7b91 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -518,6 +518,7 @@ public void request(long n) { public void cancel() { if (cancelled.compareAndSet(false, true)) { parent.cancel(key); + drain(); } } @@ -568,7 +569,6 @@ void drainFused() { for (;;) { if (a != null) { if (cancelled.get()) { - q.clear(); return; } @@ -623,7 +623,7 @@ void drainNormal() { T v = q.poll(); boolean empty = v == null; - if (checkTerminated(d, empty, a, delayError)) { + if (checkTerminated(d, empty, a, delayError, e)) { return; } @@ -636,7 +636,7 @@ void drainNormal() { e++; } - if (e == r && checkTerminated(done, q.isEmpty(), a, delayError)) { + if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e)) { return; } @@ -658,9 +658,15 @@ void drainNormal() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { + boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError, long emitted) { if (cancelled.get()) { - queue.clear(); + // make sure buffered items can get replenished + while (queue.poll() != null) { + emitted++; + } + if (emitted != 0) { + parent.upstream.request(emitted); + } return true; } @@ -732,7 +738,12 @@ void tryReplenish() { @Override public void clear() { - queue.clear(); + // make sure buffered items can get replenished + SpscLinkedArrayQueue q = queue; + while (q.poll() != null) { + produced++; + } + tryReplenish(); } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java index 0db7abc639..cb52621bac 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java @@ -2215,20 +2215,20 @@ public void fusedNoConcurrentCleanDueToCancel() { try { final PublishProcessor pp = PublishProcessor.create(); - final AtomicReference>> qs = new AtomicReference>>(); + final AtomicReference>> qs = new AtomicReference>>(); final TestSubscriber ts2 = new TestSubscriber(); - pp.groupBy(Functions.identity(), Functions.identity(), false, 4) - .subscribe(new FlowableSubscriber>() { + pp.groupBy(Functions.identity(), Functions.identity(), false, 4) + .subscribe(new FlowableSubscriber>() { boolean once; @Override - public void onNext(GroupedFlowable g) { + public void onNext(GroupedFlowable g) { if (!once) { try { - GroupedFlowable t = qs.get().poll(); + GroupedFlowable t = qs.get().poll(); if (t != null) { once = true; t.subscribe(ts2); @@ -2250,7 +2250,7 @@ public void onComplete() { @Override public void onSubscribe(Subscription s) { @SuppressWarnings("unchecked") - QueueSubscription> q = (QueueSubscription>)s; + QueueSubscription> q = (QueueSubscription>)s; qs.set(q); q.requestFusion(QueueFuseable.ANY); q.request(1); @@ -2316,4 +2316,38 @@ public Publisher apply(GroupedFlowable g) { .assertComplete() .assertNoErrors(); } + + @Test + public void cancelledGroupResumesRequesting() { + final List> tss = new ArrayList>(); + final AtomicInteger counter = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + Flowable.range(1, 1000) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + counter.getAndIncrement(); + } + }) + .groupBy(Functions.justFunction(1)) + .subscribe(new Consumer>() { + @Override + public void accept(GroupedFlowable v) throws Exception { + TestSubscriber ts = TestSubscriber.create(0L); + tss.add(ts); + v.subscribe(ts); + } + }, Functions.emptyConsumer(), new Action() { + @Override + public void run() throws Exception { + done.set(true); + } + }); + + while (!done.get()) { + tss.remove(0).cancel(); + } + + assertEquals(1000, counter.get()); + } }