Skip to content

Commit

Permalink
fixes GroupBy overflow issue
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Oct 20, 2020
1 parent 868dca9 commit c40e359
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ public K key() {
volatile boolean outputFused;

int produced;
boolean isFirstRequest = true;

UnicastGroupedFlux(K key,
Queue<V> queue,
Expand Down Expand Up @@ -572,7 +573,16 @@ void drainRegular(Subscriber<? super V> a) {
if (e != 0) {
GroupByMain<?, K, V> main = parent;
if (main != null) {
main.s.request(e);
if (this.isFirstRequest) {
this.isFirstRequest = false;
long toRequest = e - 1;

if (toRequest > 0) {
main.s.request(toRequest);
}
} else {
main.s.request(e);
}
}
if (r != Long.MAX_VALUE) {
REQUESTED.addAndGet(this, -e);
Expand Down Expand Up @@ -751,7 +761,16 @@ void tryReplenish() {
produced = 0;
GroupByMain<?, K, V> main = parent;
if (main != null) {
main.s.request(p);
if (this.isFirstRequest) {
this.isFirstRequest = false;
p--;

if (p > 0) {
main.s.request(p);
}
} else {
main.s.request(p);
}
}
}
}
Expand Down

0 comments on commit c40e359

Please sign in to comment.