
Document when groupBy can deadlock because of backpressure #3443

Closed
OlegDokuka opened this issue Apr 20, 2023 · 11 comments · Fixed by #3872
Assignees
Labels
good first issue (Ideal for a new contributor, we'll help) · type/documentation (A documentation update)
Milestone

Comments

@OlegDokuka
Contributor

Recent activity related to groupBy deadlocks (#3442, #3427) adds extra motivation to clarify how groupBy has to be used:

Documentation Issue

  • Document that the inner group MUST be subscribed (see the sketch after this list)
  • Document that operators such as flatMap/concatMap may cause a deadlock because of backpressure
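A minimal sketch of the first point (my illustration, not from the issue; the modulo grouping and the prefetch of 4 are arbitrary choices): dropping a group without ever subscribing to it leaves its elements parked in groupBy's internal buffer, which can stall the whole pipeline once the buffer fills:

import reactor.core.publisher.Flux;

// Sketch: the group with key 1 passes through filter() unsubscribed, so its
// elements are never drained. With a small prefetch, groupBy soon stops
// requesting from upstream and the pipeline hangs; blockLast() never returns.
Flux.range(0, 1_000)
        .groupBy(i -> i % 2, 4)        // two groups, prefetch of only 4
        .filter(g -> g.key() == 0)     // group with key 1 is never subscribed
        .flatMap(g -> g)
        .blockLast();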
@OlegDokuka OlegDokuka added the type/documentation A documentation update label Apr 20, 2023
@He-Pin
Contributor

He-Pin commented Apr 20, 2023

That's true. In Akka Streams there are some notes about this.

  /**
   * Flatten the sub-flows back into the super-flow by concatenating them.
   * This is usually a bad idea when combined with `groupBy` since it can
   * easily lead to deadlock—the concatenation does not consume from the second
   * substream until the first has finished and the `groupBy` operator will get
   * back-pressure from the second stream.
   *
   * This is identical in effect to `mergeSubstreamsWithParallelism(1)`.
   */
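For comparison, a rough Reactor analogue of that Akka note (my sketch, not from the comment): concatMap(g -> g) plays the role of mergeSubstreamsWithParallelism(1), while a flatMap whose concurrency covers the group count drains all groups in parallel:

import reactor.core.publisher.Flux;

// Sketch: concatenating groups consumes them one at a time, so with
// interleaved keys and a small enough prefetch this can deadlock.
Flux<Integer> risky = Flux.range(0, 1_000)
        .groupBy(i -> i % 2, 4)
        .concatMap(g -> g);            // ~ mergeSubstreamsWithParallelism(1)

// Merging with concurrency >= the number of groups avoids the stall.
Flux<Integer> safer = Flux.range(0, 1_000)
        .groupBy(i -> i % 2, 4)
        .flatMap(g -> g, 2);           // one subscriber per group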

@OlegDokuka OlegDokuka added the good first issue Ideal for a new contributor, we'll help label May 30, 2023
@MikkelHJuul

So as not to flood the issues section: I found that the groupBy operator blocks/deadlocks in a case where the flatMap retries. Using backoff on the retry fixes this situation.

e.g.

Flux.just(
    Map.entry(1,1),
    Map.entry(2,1), //error from this blocks everything
    Map.entry(3,3),
    Map.entry(2,2)
).groupBy(Map.Entry::getKey)
.flatMap(grp -> 
    grp.switchMap(e -> Mono.just(e).flatMapMany(this::factory).retry())
)

<T> Flux<T> factory(Map.Entry<Integer, Integer> entry) {
    if (!Objects.equals(entry.getKey(), entry.getValue()))
        return Flux.error(...);
    return Flux.just(...);
}

@NamrataGuptaRoy
Contributor

@OlegDokuka I want to get started with contributing to this. Could you point out where the documentation needs to be added?
I see good documentation here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#groupBy-java.util.function.Function-

@chemicL
Member

chemicL commented Jul 29, 2024

Hey, @NamrataGuptaRoy. There seem to be 4 groupBy overloads. We usually have the relevant content repeated for each overload. I think a dedicated note in each will be useful. Also, as @He-Pin noted, concatMap sounds like a good operator to take note of for this combination. The suggestion from @MikkelHJuul is also worth exploring, considering how exactly that circumstance can be triggered (what kind of backoff? how does backoff help where a regular retry runs into problems?) and making a relevant note in the appropriate operator.

Also, we have a mention of the problems in the reference documentation. Perhaps adding a bit more examples to the warning section would be helpful.

Perhaps best would be to start with the list of considered additions here in the issue and once we consider the list is cohesive you could open a PR? Thanks, I look forward to your ideas!

@NamrataGuptaRoy
Contributor

NamrataGuptaRoy commented Aug 3, 2024

Going through a few issues and threads related to groupBy, I feel it is known that groupBy works well with LOW cardinality, but what quantity counts as LOW (it's a relative term) is unclear to users, and this results in deadlock situations.

I have the below proposal for the documentation:
To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the cardinality of the groups created. In that case every group has its own subscriber and there are no deadlocks, even when the data publish pattern is random.

In the other scenario, where cardinality > concurrency (number of groups > number of subscribers),
the subscribers should be designed with caution, because if the rate of consumption is lower than what the producer buffer can accommodate, the process will enter a deadlock due to backpressure.

Example using concatMap:

public static Flux<String> createGroupedFlux() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
            .groupBy(d -> d.charAt(0), 5)
            .concatMap(g -> g.map(String::valueOf)
                    .startWith(String.valueOf(g.key()))
                    .map(o -> {
                        System.out.println(o);
                        return o;
                    })
            );
}

@Test
public void testGroupBy() {
    StepVerifier.create(createGroupedFlux())
            .expectNext("a", "alpha", "air", "aim", "apple", "ace")
            .expectNext("b", "beta", "ball", "bat")
            .expectNext("c", "cat", "d", "dog")
            .verifyComplete();
}

In the above example:
cardinality of groups is 4 (a, b, c, d being the group keys)
concurrency of concatMap is 1
bufferSize of groupBy is 5 (since we defined prefetch as 5; by default it's 256)

a alpha air aim apple

The test gets stuck after printing these elements.

Explanation:

Initially, groupBy requests 5 elements.
groupBy buffer:

"alpha", "air", "aim", "beta", "cat"

concatMap has concurrency 1, therefore the group with key 'a' is the only group subscribed.
Of these, "alpha", "air", "aim" are consumed by concatMap and the rest remain in the buffer.

Next, groupBy requests 3 additional items (2 spaces in the buffer are already occupied)
groupBy buffer:

"beta", "cat", "ball", "apple", "bat"

Out of these "apple" is consumed rest remain in the buffer

Next, groupBy requests 1 additional item (4 spaces in the buffer are already occupied)

"beta", "cat", "ball", "bat","dog"

Now nothing in the buffer belongs to group 'a', so concatMap consumes nothing further and the group remains open.
groupBy is unable to request more data from the publisher since its buffer is full. The publisher is backpressured and cannot publish the remaining items. This results in the deadlock.

In the same example, if the data arrived in a slightly different order, for example:

"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"

The same test would PASS (the same concatMap would receive a complete signal, complete one group, subscribe to the next group, and so on).
Hence, when the pattern of published data is random, groupBy is likely to deadlock when the consumption rate is slower than what the groupBy buffer can accommodate.
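To make the concurrency >= cardinality rule concrete, here is a variant of the example above (my sketch, not part of the original comment; the method name is made up): swapping concatMap for a flatMap whose concurrency covers all 4 groups subscribes every group up front, so the original ordering no longer deadlocks:

// Sketch: same data and prefetch, but flatMap subscribes to up to 4
// groups at once, so no group's elements pile up unconsumed in the
// groupBy buffer and the sequence completes.
public static Flux<String> createGroupedFluxNoDeadlock() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
            .groupBy(d -> d.charAt(0), 5)
            .flatMap(g -> g.startWith(String.valueOf(g.key())), 4); // concurrency >= group cardinality
}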

@NamrataGuptaRoy
Contributor

NamrataGuptaRoy commented Aug 3, 2024

Pardon the huge comment!
I believe this note and example could cover the following points raised in this issue thread:

  • Document that the inner group MUST be subscribed
  • Document that operators such as flatMap/concatMap may cause deadlock because of backpressure
    pointed out by @OlegDokuka
  • concatMap sounds like a good operator to take note of for this combination, pointed out by @He-Pin
  • There seem to be 4 groupBy overloads; we usually repeat the relevant content for each overload, so a dedicated note in each will be useful, mentioned by @chemicL


That said, I would not put the entire example and note in all the places, only the relevant parts in the respective operator documentation, and probably the example in the reference documentation.
For PRs, I am thinking of having PRs addressing the above points and a separate PR for the retry and backoff case pointed out by @MikkelHJuul.
I will work on creating an example for retry and backoff.


Let me know your inputs and suggestions, and whether this looks good :)

@chemicL
Member

chemicL commented Aug 9, 2024

Thanks @NamrataGuptaRoy, the example is really good and easy to follow! Please go ahead and submit a PR for the reference documentation update with the example. You can also combine this with the Javadoc updates you speak of. The PR should target the main branch.

I am only unclear about the retry + backoff case, so that can be another PR/issue. Do we know why retry deadlocks, but a retry with backoff does not?

@NamrataGuptaRoy
Contributor

NamrataGuptaRoy commented Aug 11, 2024

Thanks @chemicL ! I will work on creating the PRs.
For the retry + backoff case, I was looking into the example provided by @MikkelHJuul
I believe retry without backoff will always create a deadlock if the retry never succeeds: it runs indefinitely without a terminal signal, since it keeps resubscribing and erroring out, irrespective of whether it's combined with groupBy or not.
Example where retry never succeeds:

public Flux<Object> factoryWithException(Map.Entry<Integer, Integer> entry) {
    if (entry.getKey() > entry.getValue()) {
        return Flux.error(new RuntimeException());
    }
    return Flux.just(entry.getKey() + " " + entry.getValue());
}

Example of deadlock with above method:

public Flux<Object> createFluxWithRetryWithoutBackoff() {
    return Flux.just(
            Map.entry(1, 1),
            Map.entry(2, 1), // error from this entry keeps retrying forever
            Map.entry(3, 3),
            Map.entry(4, 2)
    ).flatMap(map -> Mono.just(map).flatMapMany(this::factoryWithException).retry());
}

@Test
public void testGroupByWithRetryWithoutBackoff() {
    StepVerifier.create(createFluxWithRetryWithoutBackoff())
            .expectNext("1 1")
            .expectError(RuntimeException.class) // never fires: retry() resubscribes indefinitely, so the error never propagates
            .verify();
}

If used with a bounded retry (referred to as "backoff" here), it eventually propagates the error, terminating the sequence.
Example with backoff:

public Flux<Object> createFluxWithRetryAndBackoff() {
    return Flux.just(
            Map.entry(1, 1),
            Map.entry(2, 1), // error from this entry exhausts its retries
            Map.entry(3, 3),
            Map.entry(4, 2)
    ).flatMap(map -> Mono.just(map).flatMapMany(this::factoryWithException).retry(2));
}

@Test
public void testGroupByWithRetryAndBackoff() {
    StepVerifier.create(createFluxWithRetryAndBackoff())
            .expectNext("1 1")
            .expectError(RuntimeException.class) // retries 2 times, never succeeds, then propagates the error as a terminal signal
            .verify();
}

Also, in the scenario where the retry succeeds after a couple of attempts, the process eventually completes even without a backoff specified (even when used along with groupBy).
Example where the retry succeeds after a couple of attempts:

Set<Integer> visited = new HashSet<>();

// After the 1st retry it succeeds.
public Flux<Object> factory(Map.Entry<Integer, Integer> entry) {
    if (entry.getKey() > entry.getValue() && !visited.contains(entry.getKey())) {
        visited.add(entry.getKey());
        return Flux.error(new RuntimeException());
    }
    return Flux.just(entry.getKey() + " " + entry.getValue());
}

Example of NO deadlock with groupBy + retry without backoff:

public Flux<Object> createFluxWithRetry() {
    return Flux.just(
                    Map.entry(1, 1),
                    Map.entry(2, 1), // errors on the first attempt, then the retry succeeds
                    Map.entry(3, 3),
                    Map.entry(4, 2)
            )
            .groupBy(Map.Entry::getKey)
            .flatMap(grp ->
                    grp.switchMap(e -> Mono.just(e).flatMapMany(this::factory).retry())
            );
}

@Test
public void testGroupByWithRetry() {
    StepVerifier.create(createFluxWithRetry())
            .expectNext("1 1")
            .expectNext("2 1")
            .expectNext("3 3")
            .expectNext("4 2")
            .verifyComplete();
}
// completes successfully

Let me know your thoughts!

@chemicL
Member

chemicL commented Aug 12, 2024

Oh, that's what you mean. I was confused by the term "backoff", which implied some notion of delay in my head. Right, so the problems can happen when you retry indefinitely. That absolutely makes sense. Yet, I think this is a topic for another discussion and is not directly connected to groupBy. I'm not sure it makes sense to document this beyond what we already have for retry() with no argument:

Re-subscribes to this {@link Flux} sequence if it signals any error, indefinitely.

Perhaps something regarding retry() use could be further clarified in the reference documentation, but I'm not sure it's worth adding too many examples beyond what you already provided with the (indeed confusing) combination of groupBy and concatMap. Let's go step by step and get something merged before we consider more additions :) Thanks!
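To spell out the delay sense of "backoff" that caused the confusion (my sketch, not from the thread; someFailingFlux() is a hypothetical source): reactor-core's Retry.backoff bounds the number of attempts and adds a growing delay between them, so unlike bare retry() the sequence eventually terminates with the error:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

// Sketch: at most 3 retries with exponential delay starting at 100ms;
// if all attempts fail, the error is propagated downstream instead of
// resubscribing forever as retry() with no argument would.
Flux<String> withDelayedBackoff = someFailingFlux()    // hypothetical source
        .retryWhen(Retry.backoff(3, Duration.ofMillis(100)));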

@NamrataGuptaRoy
Contributor

NamrataGuptaRoy commented Aug 13, 2024

Yes @chemicL, I agree; this is not directly connected to groupBy (assuming this interpretation of @MikkelHJuul's comment is correct).

NamrataGuptaRoy added a commit to NamrataGuptaRoy/reactor-core that referenced this issue Aug 15, 2024
…eactor#3443)

This documents the scenarios where backpressure can lead to deadlock in groupBy and how it could be avoided.
NamrataGuptaRoy added a commit to NamrataGuptaRoy/reactor-core that referenced this issue Aug 15, 2024
@chemicL
Member

chemicL commented Aug 20, 2024

Thanks, @NamrataGuptaRoy! With #3872 now merged I think this case is resolved. Congrats on your contribution! 🎉

@chemicL chemicL closed this as completed Aug 20, 2024
@chemicL chemicL added this to the 3.7.0-M6 milestone Aug 20, 2024