Skip to content

Commit

Permalink
2.x: Fig groupBy not requesting more if a group is cancelled with buf…
Browse files Browse the repository at this point in the history
…fered items (#6894)
  • Loading branch information
akarnokd authored Jan 29, 2020
1 parent 030528b commit e753858
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ public void request(long n) {
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
parent.cancel(key);
drain();
}
}

Expand Down Expand Up @@ -568,7 +569,6 @@ void drainFused() {
for (;;) {
if (a != null) {
if (cancelled.get()) {
q.clear();
return;
}

Expand Down Expand Up @@ -623,7 +623,7 @@ void drainNormal() {
T v = q.poll();
boolean empty = v == null;

if (checkTerminated(d, empty, a, delayError)) {
if (checkTerminated(d, empty, a, delayError, e)) {
return;
}

Expand All @@ -636,7 +636,7 @@ void drainNormal() {
e++;
}

if (e == r && checkTerminated(done, q.isEmpty(), a, delayError)) {
if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e)) {
return;
}

Expand All @@ -658,9 +658,15 @@ void drainNormal() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError) {
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError, long emitted) {
if (cancelled.get()) {
queue.clear();
// make sure buffered items can get replenished
while (queue.poll() != null) {
emitted++;
}
if (emitted != 0) {
parent.upstream.request(emitted);
}
return true;
}

Expand Down Expand Up @@ -732,7 +738,12 @@ void tryReplenish() {

@Override
public void clear() {
queue.clear();
// make sure buffered items can get replenished
SpscLinkedArrayQueue<T> q = queue;
while (q.poll() != null) {
produced++;
}
tryReplenish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2215,20 +2215,20 @@ public void fusedNoConcurrentCleanDueToCancel() {
try {
final PublishProcessor<Integer> pp = PublishProcessor.create();

final AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>>();
final AtomicReference<QueueSubscription<GroupedFlowable<Integer, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Integer, Integer>>>();

final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();

pp.groupBy(Functions.identity(), Functions.<Integer>identity(), false, 4)
.subscribe(new FlowableSubscriber<GroupedFlowable<Object, Integer>>() {
pp.groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), false, 4)
.subscribe(new FlowableSubscriber<GroupedFlowable<Integer, Integer>>() {

boolean once;

@Override
public void onNext(GroupedFlowable<Object, Integer> g) {
public void onNext(GroupedFlowable<Integer, Integer> g) {
if (!once) {
try {
GroupedFlowable<Object, Integer> t = qs.get().poll();
GroupedFlowable<Integer, Integer> t = qs.get().poll();
if (t != null) {
once = true;
t.subscribe(ts2);
Expand All @@ -2250,7 +2250,7 @@ public void onComplete() {
@Override
public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
QueueSubscription<GroupedFlowable<Object, Integer>> q = (QueueSubscription<GroupedFlowable<Object, Integer>>)s;
QueueSubscription<GroupedFlowable<Integer, Integer>> q = (QueueSubscription<GroupedFlowable<Integer, Integer>>)s;
qs.set(q);
q.requestFusion(QueueFuseable.ANY);
q.request(1);
Expand Down Expand Up @@ -2316,4 +2316,38 @@ public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
.assertComplete()
.assertNoErrors();
}

@Test
public void cancelledGroupResumesRequesting() {
final List<TestSubscriber<Integer>> tss = new ArrayList<TestSubscriber<Integer>>();
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
Flowable.range(1, 1000)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
counter.getAndIncrement();
}
})
.groupBy(Functions.justFunction(1))
.subscribe(new Consumer<GroupedFlowable<Integer, Integer>>() {
@Override
public void accept(GroupedFlowable<Integer, Integer> v) throws Exception {
TestSubscriber<Integer> ts = TestSubscriber.create(0L);
tss.add(ts);
v.subscribe(ts);
}
}, Functions.emptyConsumer(), new Action() {
@Override
public void run() throws Exception {
done.set(true);
}
});

while (!done.get()) {
tss.remove(0).cancel();
}

assertEquals(1000, counter.get());
}
}

0 comments on commit e753858

Please sign in to comment.