From bd0446412d62bb7a1df7517c6a4bc486ba629a29 Mon Sep 17 00:00:00 2001
From: Namrata Gupta Roy
Date: Tue, 20 Aug 2024 16:31:54 +0530
Subject: [PATCH] [doc] Document when groupBy can deadlock because of
 backpressure (#3872)

Documenting the scenarios where backpressure can lead to deadlock in groupBy
and how it can be avoided.
---
 .../advanced-three-sorts-batching.adoc    | 67 +++++++++++++++++++
 .../java/reactor/core/publisher/Flux.java | 32 +++++++++
 2 files changed, 99 insertions(+)

diff --git a/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc b/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
index 82b4d43aa4..d07c057060 100644
--- a/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
+++ b/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
@@ -50,6 +50,73 @@ continues fetching data from upstream and feeding more groups.
 Sometimes, these constraints multiply and lead to hangs, such as when you have a high
 cardinality and the concurrency of the `flatMap` consuming the groups is too low.
 
+The following example groups values by their first character:
+
+[source,java]
+[%unbreakable]
+----
+public static Flux<String> createGroupedFlux() {
+	List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
+	return Flux.fromIterable(data)
+			.groupBy(d -> d.charAt(0), 5)
+			.concatMap(g -> g.map(String::valueOf)
+					.startWith(String.valueOf(g.key()))
+					.map(o -> {
+						System.out.println(o);
+						return o;
+					})
+			);
+}
+
+@Test
+public void testGroupBy() {
+	StepVerifier.create(createGroupedFlux())
+			.expectNext("a", "alpha", "air", "aim", "apple", "ace")
+			.expectNext("b", "beta", "ball", "bat")
+			.expectNext("c", "cat", "d", "dog")
+			.verifyComplete();
+}
+----
+
+In the above example:
+
+* the cardinality of the groups is 4 (`a`, `b`, `c` and `d` being the group keys)
+* the concurrency of `concatMap` is 1
+* the buffer size of `groupBy` is 5 (we defined the prefetch as 5; by default it is 256)
+
+The test gets stuck after printing the following elements:
+
+`a alpha air aim apple`
+
+Explanation:
+
+Initially, `groupBy` requests 5 elements, so its buffer contains:
+
+`"alpha", "air", "aim", "beta", "cat"`
+
+`concatMap` has a concurrency of 1, so the group with key `a` is the only group subscribed to.
+Out of the buffered elements, `"alpha", "air", "aim"` are consumed by `concatMap` while `"beta", "cat"` remain in the buffer.
+
+Next, `groupBy` requests 3 additional items (2 slots of the buffer are already occupied), so the buffer contains:
+
+`"beta", "cat", "ball", "apple", "bat"`
+
+Out of these, only `"apple"` is consumed; the rest remain in the buffer.
+
+Next, `groupBy` requests 1 additional item (4 slots of the buffer are already occupied), so the buffer contains:
+
+`"beta", "cat", "ball", "bat", "dog"`
+
+Now nothing in the buffer belongs to group `a`, so `concatMap` consumes nothing more and the group stays open.
+`groupBy` cannot request more data from the publisher because its buffer is full. The publisher faces backpressure and cannot emit the remaining items, which results in a deadlock.
+
+In the same example, if the data arrived in a slightly different order, for example:
+
+`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`
+
+the same test would pass: `concatMap` would receive a complete signal for the current group, complete it, subscribe to the next group, and so on.
+
+Hence, when the order of the published data is unpredictable, `groupBy` is likely to deadlock whenever the rate at which the groups are consumed is slower than what the `groupBy` buffer can accommodate.
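+One way to avoid the deadlock in this example is to give the consumer of the groups a concurrency
+at least equal to the number of groups. The following variant (a sketch for illustration only; the
+method name is arbitrary and it is not part of the test above) consumes the same groups with
+`flatMap` and a concurrency of 4, so every group gets its own subscriber and the `groupBy` buffer
+keeps draining. Note that, unlike `concatMap`, `flatMap` may interleave elements of different groups:
+
+[source,java]
+[%unbreakable]
+----
+// hypothetical companion to createGroupedFlux() above
+public static Flux<String> createGroupedFluxWithEnoughConcurrency() {
+	List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
+	return Flux.fromIterable(data)
+			.groupBy(d -> d.charAt(0), 5)
+			// concurrency 4 >= cardinality 4 (keys a, b, c, d): every group is subscribed to
+			// as soon as it is created, so no group can pin elements in the groupBy buffer
+			.flatMap(g -> g.startWith(String.valueOf(g.key())), 4);
+}
+----
+
+Alternatively, keeping the `concatMap` but making the `groupBy` prefetch large enough to hold the
+whole dataset (here, 10 elements or more) should also let the example complete, since the buffer
+can then never fill up with elements that belong to groups that are not yet consumed.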
+
 
 [[windowing-with-flux-flux]]
 == Windowing with `Flux<Flux<T>>`
diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java
index 23e7d5c068..c3e57dd8ff 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -5924,6 +5924,14 @@ public int getPrefetch() {
 	 * if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
 	 * with a {@code maxConcurrency} parameter that is set too low).
 	 *
+	 * <p>
+	 * To avoid deadlock, the concurrency of the subscriber consuming the groups (e.g. the {@code concurrency} of a downstream {@code flatMap})
+	 * should be greater than or equal to the cardinality of the groups created. In this case every group has
+	 * its own subscriber and there is no deadlock, even when the order of the published data is unpredictable.
+	 * In the other scenario, where the cardinality is greater than the concurrency (more groups than subscribers),
+	 * the subscribers should be designed with caution, because if their rate of consumption
+	 * falls below what the {@code groupBy} buffer can accommodate, backpressure builds up and the sequence deadlocks.
+	 *
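+	 * <p>
+	 * For example (an illustrative sketch rather than a prescription: the source {@code words}, the key
+	 * extractor and the concurrency value are arbitrary), if a {@code Flux<String> words} is grouped by its
+	 * first character and at most four distinct keys are expected, giving the downstream {@code flatMap} a
+	 * concurrency that covers all the groups keeps every group subscribed and drained:
+	 * <pre>{@code
+	 * words.groupBy(w -> w.charAt(0))
+	 *      // at most 4 distinct keys are expected, so a concurrency of 4 subscribes to every group
+	 *      .flatMap(group -> group.count().map(c -> group.key() + ": " + c), 4)
+	 *      .subscribe(System.out::println);
+	 * }</pre>
+	 *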
 	 * <p>
 	 * Note that groups are a live view of part of the underlying source publisher,
 	 * and as such their lifecycle is tied to that source. As a result, it is not possible
@@ -5954,6 +5962,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function
+	 * <p>
+	 * To avoid deadlock, the concurrency of the subscriber consuming the groups (e.g. the {@code concurrency} of a downstream {@code flatMap})
+	 * should be greater than or equal to the cardinality of the groups created. In this case every group has
+	 * its own subscriber and there is no deadlock, even when the order of the published data is unpredictable.
+	 * In the other scenario, where the cardinality is greater than the concurrency (more groups than subscribers),
+	 * the subscribers should be designed with caution, because if their rate of consumption
+	 * falls below what the {@code groupBy} buffer can accommodate, backpressure builds up and the sequence deadlocks.
+	 *
 	 * <p>
 	 * Note that groups are a live view of part of the underlying source publisher,
 	 * and as such their lifecycle is tied to that source. As a result, it is not possible
@@ -5987,6 +6003,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function
+	 * <p>
+	 * To avoid deadlock, the concurrency of the subscriber consuming the groups (e.g. the {@code concurrency} of a downstream {@code flatMap})
+	 * should be greater than or equal to the cardinality of the groups created. In this case every group has
+	 * its own subscriber and there is no deadlock, even when the order of the published data is unpredictable.
+	 * In the other scenario, where the cardinality is greater than the concurrency (more groups than subscribers),
+	 * the subscribers should be designed with caution, because if their rate of consumption
+	 * falls below what the {@code groupBy} buffer can accommodate, backpressure builds up and the sequence deadlocks.
+	 *
 	 * <p>
 	 * Note that groups are a live view of part of the underlying source publisher,
 	 * and as such their lifecycle is tied to that source. As a result, it is not possible
@@ -6024,6 +6048,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function
+	 * <p>
+	 * To avoid deadlock, the concurrency of the subscriber consuming the groups (e.g. the {@code concurrency} of a downstream {@code flatMap})
+	 * should be greater than or equal to the cardinality of the groups created. In this case every group has
+	 * its own subscriber and there is no deadlock, even when the order of the published data is unpredictable.
+	 * In the other scenario, where the cardinality is greater than the concurrency (more groups than subscribers),
+	 * the subscribers should be designed with caution, because if their rate of consumption
+	 * falls below what the {@code groupBy} buffer can accommodate, backpressure builds up and the sequence deadlocks.
+	 *
 	 * <p>
 	 * Note that groups are a live view of part of the underlying source publisher,
 	 * and as such their lifecycle is tied to that source. As a result, it is not possible