[doc] Document when groupBy can deadlock because of backpressure (#3872)
Documenting the scenarios where backpressure can lead to deadlock in groupBy and how it could be avoided.
NamrataGuptaRoy authored Aug 20, 2024
1 parent 35651b0 commit bd04464
Showing 2 changed files with 99 additions and 0 deletions.
@@ -50,6 +50,73 @@ continues fetching data from upstream and feeding more groups. Sometimes, these
constraints multiply and lead to hangs, such as when you have a high cardinality and the
concurrency of the `flatMap` consuming the groups is too low.

The following example groups values by their first character:

[source,java]
[%unbreakable]
----
public static Flux<String> createGroupedFlux() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
            // group by first character, with a prefetch (buffer size) of 5
            .groupBy(d -> d.charAt(0), 5)
            // concatMap subscribes to one group at a time (concurrency 1)
            .concatMap(g -> g.map(String::valueOf)
                    .startWith(String.valueOf(g.key()))
                    .map(o -> {
                        System.out.println(o);
                        return o;
                    }));
}

@Test
public void testGroupBy() {
    StepVerifier.create(createGroupedFlux())
            .expectNext("a", "alpha", "air", "aim", "apple", "ace")
            .expectNext("b", "beta", "ball", "bat")
            .expectNext("c", "cat", "d", "dog")
            .verifyComplete();
}
----
In the above example:

* the cardinality of the groups is 4 (`a`, `b`, `c`, and `d` being the group keys),
* the concurrency of `concatMap` is 1,
* the buffer size of `groupBy` is 5 (we defined the prefetch as 5; by default it is 256).

Running the test prints the following elements:

`a alpha air aim apple`

The test gets stuck after printing these elements.

Explanation:

Initially, `groupBy` requests 5 elements. The `groupBy` buffer then contains:

`"alpha", "air", "aim", "beta", "cat"`

`concatMap` has a concurrency of 1, so the group with key `a` is the only group subscribed to.
Of these elements, `"alpha"`, `"air"`, and `"aim"` are consumed by `concatMap`, while `"beta"` and `"cat"` remain in the buffer.

Next, `groupBy` requests 3 additional items (2 slots in the buffer are already occupied). The buffer then contains:

`"beta", "cat", "ball", "apple", "bat"`

Out of these "apple" is consumed rest remain in the buffer

Next, `groupBy` requests 1 additional item (4 slots in the buffer are already occupied). The buffer then contains:

`"beta", "cat", "ball", "bat", "dog"`

Now nothing in the buffer belongs to group `a`, so `concatMap` consumes nothing further and the group stays open.
`groupBy` cannot request more data from the publisher because its buffer is full. The publisher faces backpressure and cannot publish the remaining items. This results in the deadlock.
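
One way to avoid this deadlock is to give every group its own subscriber, by consuming the groups with a concurrency at least equal to the group cardinality. The following is a minimal sketch (the method name is hypothetical) that replaces the `concatMap` with a `flatMap` of concurrency 4; note that elements of different groups may then interleave in the output:

[source,java]
[%unbreakable]
----
public static Flux<String> createGroupedFluxWithEnoughConcurrency() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
            .groupBy(d -> d.charAt(0), 5)
            // concurrency 4 >= cardinality 4: every group gets its own subscriber
            .flatMap(g -> g.startWith(String.valueOf(g.key())), 4);
}
----

For this small source, simply raising the `groupBy` prefetch (for instance, leaving it at the default of 256) would also avoid the deadlock, since the whole source then fits in the buffer and group `a` can complete.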

Returning to the original example: if the data arrived in a slightly different order, for example:

`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`

the same test would pass (the same `concatMap` would receive a complete signal, complete one group, subscribe to the next group, and so on).
Hence, when the pattern of the published data is random, `groupBy` is likely to face a deadlock whenever the consumption rate is slower than what the `groupBy` buffer can accommodate.
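
For reference, a sketch of that reordered variant (the test name is hypothetical; the expectations follow the group order described above):

[source,java]
[%unbreakable]
----
@Test
public void testGroupByReordered() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat");
    StepVerifier.create(Flux.fromIterable(data)
                    .groupBy(d -> d.charAt(0), 5)
                    // same concurrency-1 consumption as before, but "bat" now comes last
                    .concatMap(g -> g.startWith(String.valueOf(g.key()))))
            .expectNext("a", "alpha", "air", "aim", "apple", "ace")
            .expectNext("b", "beta", "ball", "bat")
            .expectNext("c", "cat", "d", "dog")
            .verifyComplete();
}
----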

[[windowing-with-flux-flux]]
== Windowing with `Flux<Flux<T>>`

32 changes: 32 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -5924,6 +5924,14 @@ public int getPrefetch() {
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlocks, the concurrency of the subscriber to {@code groupBy} should be
* greater than or equal to the cardinality of the groups created. In that case every group has
* its own subscriber and no deadlock occurs, even when the pattern of the published data is random.
* In the other scenario, where cardinality > concurrency (i.e. more groups than subscribers),
* the subscribers should be designed with caution, because if their rate of consumption
* falls below what the producer buffer can accommodate, the processing deadlocks due to backpressure.
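*
* <p>
* For instance, a minimal sketch with a cardinality of 4 ({@code source} and {@code keyExtractor}
* are illustrative):
* <pre>{@code
* source.groupBy(keyExtractor)
*       .flatMap(group -> group, 4); // concurrency >= number of groups
* }</pre>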
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
@@ -5954,6 +5962,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlocks, the concurrency of the subscriber to {@code groupBy} should be
* greater than or equal to the cardinality of the groups created. In that case every group has
* its own subscriber and no deadlock occurs, even when the pattern of the published data is random.
* In the other scenario, where cardinality > concurrency (i.e. more groups than subscribers),
* the subscribers should be designed with caution, because if their rate of consumption
* falls below what the producer buffer can accommodate, the processing deadlocks due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -5987,6 +6003,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlocks, the concurrency of the subscriber to {@code groupBy} should be
* greater than or equal to the cardinality of the groups created. In that case every group has
* its own subscriber and no deadlock occurs, even when the pattern of the published data is random.
* In the other scenario, where cardinality > concurrency (i.e. more groups than subscribers),
* the subscribers should be designed with caution, because if their rate of consumption
* falls below what the producer buffer can accommodate, the processing deadlocks due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -6024,6 +6048,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extend
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlocks, the concurrency of the subscriber to {@code groupBy} should be
* greater than or equal to the cardinality of the groups created. In that case every group has
* its own subscriber and no deadlock occurs, even when the pattern of the published data is random.
* In the other scenario, where cardinality > concurrency (i.e. more groups than subscribers),
* the subscribers should be designed with caution, because if their rate of consumption
* falls below what the producer buffer can accommodate, the processing deadlocks due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
