Skip to content

Commit

Permalink
2.x: Fix parallel() on grouped flowable not replenishing properly (#6720
Browse files Browse the repository at this point in the history
)
  • Loading branch information
akarnokd authored Nov 21, 2019
1 parent aeb5f2c commit 170f952
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,17 +709,25 @@ public T poll() {
produced++;
return v;
}
int p = produced;
if (p != 0) {
produced = 0;
parent.upstream.request(p);
}
tryReplenish();
return null;
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
if (queue.isEmpty()) {
tryReplenish();
return true;
}
return false;
}

void tryReplenish() {
int p = produced;
if (p != 0) {
produced = 0;
parent.upstream.request(p);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2286,4 +2286,34 @@ public void run() {
}
}
}

@Test
public void fusedParallelGroupProcessing() {
Flowable.range(0, 500000)
.subscribeOn(Schedulers.single())
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer i) {
return i % 2;
}
})
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
return g.getKey() == 0
? g
.parallel()
.runOn(Schedulers.computation())
.map(Functions.<Integer>identity())
.sequential()
: g.map(Functions.<Integer>identity()) // no need to use hide
;
}
})
.test()
.awaitDone(20, TimeUnit.SECONDS)
.assertValueCount(500000)
.assertComplete()
.assertNoErrors();
}
}

0 comments on commit 170f952

Please sign in to comment.