
groupBy followed by flatMap with "terminal operation" never finishes #596

Closed
osi opened this issue May 9, 2017 · 4 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter

Comments

@osi
Contributor

osi commented May 9, 2017

The following snippet never finishes:

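```java
import java.util.function.BinaryOperator;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

Flux<Integer> source = Flux.push(sink -> {
    for (int i = 0; i < 6000; i++) {
        sink.next(i);
    }
    sink.complete();
});

// 6000 distinct keys, so 6000 groups, each reduced to a single maximum
StepVerifier.create(source.groupBy(Function.identity())
                          .flatMap(gf -> gf.reduce(BinaryOperator.maxBy(Integer::compareTo))))
            .expectNextCount(6000)
            .verifyComplete();
```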

It works if groupBy is given a prefetch of Integer.MAX_VALUE.

While debugging, it seems that the outstanding request to the upstream eventually falls to 0. Since the push source ultimately completes, and each group inside the flatMap ends in a "terminal operation" (the reduce), my initial expectation was that this would work as written.
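A minimal sketch of the prefetch workaround, assuming reactor-core 3.x is on the classpath. It swaps StepVerifier for `collectList().block()` so it runs as a plain `main`; the class and method names are hypothetical. The only functional change from the report is the second argument to `groupBy`, which is its prefetch.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core 3.x is on the classpath.
public class GroupByPrefetchWorkaround {

    // Same source as in the report: emits 0..count-1 and then completes.
    static Flux<Integer> source(int count) {
        return Flux.push(sink -> {
            for (int i = 0; i < count; i++) {
                sink.next(i);
            }
            sink.complete();
        });
    }

    // Every value is its own key, so there are `count` groups, each reduced
    // to its maximum. A groupBy prefetch of Integer.MAX_VALUE makes groupBy
    // request without bound from upstream, so it keeps receiving elements
    // (and eventually onComplete) even while flatMap has not yet subscribed
    // to every group.
    static List<Integer> maxPerGroup(int count) {
        return source(count)
                .groupBy(Function.identity(), Integer.MAX_VALUE)
                .flatMap(gf -> gf.reduce(BinaryOperator.maxBy(Integer::compareTo)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // One maximum per group, so this prints the number of groups.
        System.out.println(maxPerGroup(6000).size());
    }
}
```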

@akarnokd
Contributor

akarnokd commented May 9, 2017

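By default, flatMap merges only a limited number of inner sources at once, and groupBy may hang if not all of its groups are drained. Here each group only completes once the whole source completes, so groups beyond that limit are never subscribed, their elements are never consumed, and groupBy stops requesting from upstream. Increasing flatMap's maxConcurrency should help.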
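The maxConcurrency route can be sketched as below, assuming reactor-core 3.x; the class and method names are hypothetical. In this pipeline the number of distinct keys is known (it equals `count`), so flatMap's second argument is simply set to that value while groupBy keeps its default prefetch.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core 3.x is on the classpath.
public class FlatMapConcurrencyWorkaround {

    // Same source as in the report: emits 0..count-1 and then completes.
    static Flux<Integer> source(int count) {
        return Flux.push(sink -> {
            for (int i = 0; i < count; i++) {
                sink.next(i);
            }
            sink.complete();
        });
    }

    // flatMap's second argument is its maxConcurrency: how many inner
    // publishers (here, groups) it subscribes to at once. Sizing it to at
    // least the number of distinct keys means every group is subscribed as
    // soon as it is emitted, so its element is consumed and groupBy keeps
    // requesting from upstream.
    static List<Integer> maxPerGroup(int count) {
        return source(count)
                .groupBy(Function.identity())
                .flatMap(gf -> gf.reduce(BinaryOperator.maxBy(Integer::compareTo)), count)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(maxPerGroup(6000).size());
    }
}
```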

@osi
Contributor Author

osi commented May 10, 2017

Thanks. I fixed it by giving groupBy a larger prefetch, but I see how raising flatMap's concurrency would do the same.

I raised the issue because this seems like an API "gotcha": something might work fine in a small test, but once the scale is ramped up the stream may hang.
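A small sketch of that scale dependence, assuming reactor-core 3.x; the names are hypothetical. In the Reactor 3.x versions I'm aware of, both groupBy's default prefetch and flatMap's default concurrency come from `Queues.SMALL_BUFFER_SIZE` (256 unless overridden via the `reactor.bufferSize.small` system property), so a run with 100 distinct keys should complete, while the 6000-key run from the report does not. `block(Duration)` is used so that a hang surfaces as an `IllegalStateException` instead of blocking forever.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.util.concurrent.Queues;

// Hypothetical demo class; assumes reactor-core 3.x is on the classpath.
public class ScaleDependentHang {

    // Same source as in the report: emits 0..count-1 and then completes.
    static Flux<Integer> source(int count) {
        return Flux.push(sink -> {
            for (int i = 0; i < count; i++) {
                sink.next(i);
            }
            sink.complete();
        });
    }

    // The report's pipeline with default groupBy prefetch and flatMap
    // concurrency. block(Duration) throws IllegalStateException if the
    // result does not arrive within the timeout.
    static List<Integer> maxPerGroupDefaults(int count, Duration timeout) {
        return source(count)
                .groupBy(Function.identity())
                .flatMap(gf -> gf.reduce(BinaryOperator.maxBy(Integer::compareTo)))
                .collectList()
                .block(timeout);
    }

    public static void main(String[] args) {
        System.out.println("default flatMap concurrency: " + Queues.SMALL_BUFFER_SIZE);
        // Fewer distinct keys than the default concurrency: completes.
        System.out.println(maxPerGroupDefaults(100, Duration.ofSeconds(5)).size());
        // Far more keys than the default concurrency: per the report, this
        // is expected to time out rather than complete.
        try {
            maxPerGroupDefaults(6000, Duration.ofSeconds(2));
            System.out.println("completed");
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```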

@IceMan81

IceMan81 commented May 3, 2018

@osi @simonbasle Is the best recommendation here to increase the prefetch on the groupBy or to increase the concurrency on the flatMap? I have a scenario where I do not know upfront how many groups there may be in the groupBy; it feels like the best solution for that is to set the groupBy prefetch to Integer.MAX_VALUE.
Thoughts?
Sorry, I know this is a closed ticket, but I got bitten by the exact same issue and it took me a while to figure out the fix.

@osi
Contributor Author

osi commented May 3, 2018

The general approach I started following was to be conscious of how many items I was flatMap'ing and to set the concurrency accordingly. If I didn't know the count upfront, I used Integer.MAX_VALUE.
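For the unknown-cardinality case, the unbounded-concurrency approach can be sketched like this (hypothetical names; assumes reactor-core 3.x). Passing `Integer.MAX_VALUE` as flatMap's concurrency makes it subscribe to every group as soon as it appears. The trade-off is that every group and its inner subscriber stays open until the upstream completes, so this is only reasonable when the total number of keys is bounded in practice.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core 3.x is on the classpath.
public class UnknownCardinality {

    // Reduces each group to its maximum without knowing how many keys the
    // keyMapper will produce. A flatMap concurrency of Integer.MAX_VALUE
    // means it never stops subscribing to newly emitted groups.
    static List<Integer> maxPerGroup(Flux<Integer> values, Function<Integer, Integer> keyMapper) {
        return values
                .groupBy(keyMapper)
                .flatMap(gf -> gf.reduce(BinaryOperator.maxBy(Integer::compareTo)), Integer.MAX_VALUE)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // 6000 values spread over 1000 keys (i % 1000); the pipeline itself
        // does not need to know that the cardinality is 1000.
        List<Integer> maxima = maxPerGroup(Flux.range(0, 6000), i -> i % 1000);
        System.out.println(maxima.size()); // 1000 groups
    }
}
```

With this key function, key `k` collects `k, k + 1000, ..., k + 5000`, so the maxima are exactly the values 5000 to 5999.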
