
[doc] Document when groupBy can deadlock because of backpressure #3872

Merged 2 commits on Aug 20, 2024
@@ -50,6 +50,73 @@ continues fetching data from upstream and feeding more groups. Sometimes, these
constraints multiply and lead to hangs, such as when you have a high cardinality and the
concurrency of the `flatMap` consuming the groups is too low.

The following example groups values by their first character:

[source,java]
[%unbreakable]
----
public static Flux<String> createGroupedFlux() {
List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
return Flux.fromIterable(data)
.groupBy(d -> d.charAt(0), 5)
.concatMap(g -> g.map(String::valueOf)
.startWith(String.valueOf(g.key()))
.map(o -> {
System.out.println(o);
return o;
})
);
}

@Test
public void testGroupBy() {
StepVerifier.create(createGroupedFlux())
.expectNext("a", "alpha", "air", "aim", "apple", "ace")
.expectNext("b", "beta", "ball", "bat")
.expectNext("c", "cat", "d", "dog")
.verifyComplete();
}
----
In the above example:

* the cardinality of the groups is 4 (`a`, `b`, `c`, `d` being the group keys)
* the concurrency of `concatMap` is 1
* the buffer size of `groupBy` is 5 (we set the prefetch to 5; by default it is 256)

`a alpha air aim apple`

The test gets stuck after printing these elements.

Explanation:

Initially, `groupBy` requests 5 elements. The `groupBy` buffer contains:

`"alpha", "air", "aim", "beta", "cat"`

`concatMap` has a concurrency of 1, so the group with key `a` is the only group subscribed to.
Out of these, `"alpha"`, `"air"`, and `"aim"` are consumed by `concatMap`, and the rest (`"beta"`, `"cat"`) remain in the buffer.

Next, `groupBy` requests 3 additional items (2 buffer slots are already occupied). The buffer now contains:

`"beta", "cat", "ball", "apple", "bat"`

Out of these, only `"apple"` is consumed, and the rest remain in the buffer.

Next, `groupBy` requests 1 additional item (4 buffer slots are already occupied). The buffer now contains:

`"beta", "cat", "ball", "bat", "dog"`

Now nothing in the buffer belongs to group `a`, so `concatMap` consumes nothing more, and the group remains open.
`groupBy` cannot request more data from the publisher because its buffer is full. The publisher is backpressured and cannot emit the remaining items. The result is a deadlock.
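The buffer evolution above can be modeled with a small plain-Java simulation. This is a simplified sketch of the mechanism, not Reactor's actual implementation; the class and method names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of groupBy's bounded buffer with a single active
// subscriber (concatMap with concurrency 1). Illustrative only.
public class GroupByDeadlockSim {

    public final List<String> consumed = new ArrayList<>();
    public boolean deadlocked;

    public static GroupByDeadlockSim run(List<String> data, int bufferSize, char activeKey) {
        GroupByDeadlockSim sim = new GroupByDeadlockSim();
        ArrayDeque<String> upstream = new ArrayDeque<>(data);
        ArrayDeque<String> buffer = new ArrayDeque<>();
        while (true) {
            // groupBy refills its buffer up to bufferSize from upstream
            while (buffer.size() < bufferSize && !upstream.isEmpty()) {
                buffer.add(upstream.poll());
            }
            // the single subscriber drains only the active group's items
            boolean drained = buffer.removeIf(s -> {
                if (s.charAt(0) == activeKey) {
                    sim.consumed.add(s);
                    return true;
                }
                return false;
            });
            if (upstream.isEmpty()) {
                // the source completed, so every group completes and the
                // remaining groups can be subscribed to and drained normally
                sim.deadlocked = false;
                return sim;
            }
            if (!drained) {
                // the buffer is full of other groups' items while the source
                // still has data: groupBy cannot request more and the active
                // subscriber cannot progress -- a deadlock
                sim.deadlocked = true;
                return sim;
            }
        }
    }
}
```

Running it with the sample data reports a deadlock after the active group has consumed `alpha`, `air`, `aim`, and `apple`, matching the trace above.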

With the same example, if the data arrived in a slightly different order, for example:

`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`

the same test would pass: `concatMap` would receive a complete signal, finish one group, subscribe to the next, and so on.
Hence, when the pattern of the published data is random, `groupBy` is likely to deadlock whenever the consumption rate is slower than what the `groupBy` buffer can accommodate.
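One way to avoid the hang in this example, sketched below under the assumption that the group cardinality is known in advance, is to consume the groups with a `flatMap` whose concurrency is at least the number of groups, so every group has a live subscriber (`createGroupedFluxFixed` is a hypothetical name):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupByFixExample {

    // With 4 distinct keys (a, b, c, d), a flatMap concurrency of 4 keeps
    // every group subscribed, so the groupBy buffer is always drained.
    public static Flux<String> createGroupedFluxFixed() {
        List<String> data = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "bat", "dog", "ace");
        return Flux.fromIterable(data)
                .groupBy(d -> d.charAt(0), 5)
                .flatMap(g -> g.startWith(String.valueOf(g.key())), 4);
    }
}
```

Alternatively, keeping `concatMap` but using a prefetch larger than the whole data set (e.g. the default of 256 for this small example) lets the source complete before the buffer fills, so each group receives its complete signal.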

[[windowing-with-flux-flux]]
== Windowing with `Flux<Flux<T>>`

32 changes: 32 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -5924,6 +5924,14 @@ public int getPrefetch() {
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
 * <p>
 * To avoid a deadlock, the concurrency of the operator consuming the groups should be
 * greater than or equal to the cardinality of the groups created. Every group then has
 * its own subscriber, and no deadlock occurs even when the data publish pattern is random.
 * In the other scenario, where cardinality > concurrency (more groups than subscribers),
 * the subscribers must be designed with caution: if their rate of consumption
 * is lower than what the producer buffer can accommodate, the process will deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
@@ -5954,6 +5962,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
 * <p>
 * To avoid a deadlock, the concurrency of the operator consuming the groups should be
 * greater than or equal to the cardinality of the groups created. Every group then has
 * its own subscriber, and no deadlock occurs even when the data publish pattern is random.
 * In the other scenario, where cardinality > concurrency (more groups than subscribers),
 * the subscribers must be designed with caution: if their rate of consumption
 * is lower than what the producer buffer can accommodate, the process will deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -5987,6 +6003,14 @@ public final <K> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
 * <p>
 * To avoid a deadlock, the concurrency of the operator consuming the groups should be
 * greater than or equal to the cardinality of the groups created. Every group then has
 * its own subscriber, and no deadlock occurs even when the data publish pattern is random.
 * In the other scenario, where cardinality > concurrency (more groups than subscribers),
 * the subscribers must be designed with caution: if their rate of consumption
 * is lower than what the producer buffer can accommodate, the process will deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -6024,6 +6048,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extend
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
 * <p>
 * To avoid a deadlock, the concurrency of the operator consuming the groups should be
 * greater than or equal to the cardinality of the groups created. Every group then has
 * its own subscriber, and no deadlock occurs even when the data publish pattern is random.
 * In the other scenario, where cardinality > concurrency (more groups than subscribers),
 * the subscribers must be designed with caution: if their rate of consumption
 * is lower than what the producer buffer can accommodate, the process will deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible