diff --git a/src/docs/asciidoc/operatorChoice.adoc b/src/docs/asciidoc/operatorChoice.adoc index 039ceff4ca..967d2689c7 100644 --- a/src/docs/asciidoc/operatorChoice.adoc +++ b/src/docs/asciidoc/operatorChoice.adoc @@ -202,8 +202,10 @@ I want to deal with: <>, <>, <>, * I want to get ticks from a clock, regular time intervals: `Flux#interval` +* I want to emit a single `0` after an initial delay: static `Mono#delay`. + * I want to introduce a delay... -** between each onNext signal: `delay` +** between each onNext signal: `Mono#delayElement`, `Flux#delayElements` ** before the subscription happens: `delaySubscription` [[which.window]] diff --git a/src/main/java/reactor/core/publisher/Flux.java b/src/main/java/reactor/core/publisher/Flux.java index b46cb62f37..0f7dcaf081 100644 --- a/src/main/java/reactor/core/publisher/Flux.java +++ b/src/main/java/reactor/core/publisher/Flux.java @@ -1023,6 +1023,11 @@ public static Flux just(T data) { *
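For illustration, a minimal sketch of the operators referenced in the appendix entries above; the durations, values and class name are assumptions made for the example, not part of this change:

[source,java]
----
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DelayOperatorsSketch {

	public static void main(String[] args) {
		// regular ticks from a clock: 0, 1, 2, ... once per second
		Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));

		// a single 0 emitted after an initial delay
		Mono<Long> delayedZero = Mono.delay(Duration.ofMillis(500));

		// a delay between each onNext signal
		Flux<Integer> spaced = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));

		// a delay before the subscription itself happens
		Flux<Integer> lateSubscription = Flux.just(1, 2, 3).delaySubscription(Duration.ofSeconds(2));
	}
}
----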

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param source a {@link Publisher} of {@link Publisher} sources to merge * @param the merged type * @@ -1042,6 +1047,11 @@ public static Flux merge(Publisher> sour *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param source a {@link Publisher} of {@link Publisher} sources to merge * @param concurrency the request produced to the main source thus limiting concurrent merge backlog * @param the merged type @@ -1060,6 +1070,11 @@ public static Flux merge(Publisher> sour *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param source a {@link Publisher} of {@link Publisher} sources to merge * @param concurrency the request produced to the main source thus limiting concurrent merge backlog * @param prefetch the inner source request size @@ -1086,6 +1101,11 @@ public static Flux merge(Publisher> sour *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param sources the {@link Iterable} of sources to merge (will be lazily iterated on subscribe) * @param The source type of the data sequence * @@ -1102,6 +1122,11 @@ public static Flux merge(Iterable> sourc *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param sources the array of {@link Publisher} sources to merge * @param The source type of the data sequence * @@ -1119,6 +1144,11 @@ public static Flux merge(Publisher... sources) { *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param sources the array of {@link Publisher} sources to merge * @param prefetch the inner source request size * @param The source type of the data sequence @@ -1138,6 +1168,11 @@ public static Flux merge(int prefetch, Publisher... sources) *

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param sources the array of {@link Publisher} sources to merge * @param prefetch the inner source request size * @param The source type of the data sequence @@ -4023,6 +4058,12 @@ public int getPrefetch() { *

* * + *

+ * The groups need to be drained and consumed downstream for groupBy to work correctly. + * Notably when the grouping criterion produces a large number of groups, it can lead to hanging + * if the groups are not suitably consumed downstream (e.g. due to a {@code flatMap} + * with a {@code maxConcurrency} parameter that is set too low). + * * @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key. * @param the key type extracted from each value of this sequence * @@ -4039,6 +4080,12 @@ public final Flux> groupBy(Function * * + *

+ * The groups need to be drained and consumed downstream for groupBy to work correctly. + * Notably when the grouping criterion produces a large number of groups, it can lead to hanging + * if the groups are not suitably consumed downstream (e.g. due to a {@code flatMap} + * with a {@code maxConcurrency} parameter that is set too low). + * * @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key. * @param prefetch the number of values to prefetch from the source * @param the key type extracted from each value of this sequence * @@ -4057,6 +4104,12 @@ public final Flux> groupBy(Function * * + *

+ * The groups need to be drained and consumed downstream for groupBy to work correctly. + * Notably when the grouping criterion produces a large number of groups, it can lead to hanging + * if the groups are not suitably consumed downstream (e.g. due to a {@code flatMap} + * with a {@code maxConcurrency} parameter that is set too low). + * * @param keyMapper the key mapping function that evaluates an incoming data and returns a key. * @param valueMapper the value mapping function that evaluates which data to extract for re-routing. * @param the key type extracted from each value of this sequence * @@ -4078,6 +4131,12 @@ public final Flux> groupBy(Function * * + *

+ * The groups need to be drained and consumed downstream for groupBy to work correctly. + * Notably when the grouping criterion produces a large number of groups, it can lead to hanging + * if the groups are not suitably consumed downstream (e.g. due to a {@code flatMap} + * with a {@code maxConcurrency} parameter that is set too low). + * * @param keyMapper the key mapping function that evaluates an incoming data and returns a key. * @param valueMapper the value mapping function that evaluates which data to extract for re-routing. * @param prefetch the number of values to prefetch from the source * @@ -4456,6 +4515,11 @@ public final Flux> materialize() { *
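For illustration, a minimal sketch of the hazard described in the groupBy notes above; the modulo-64 key, the matching `flatMap` concurrency of 64 and the class name are assumptions made for the example, not part of this change:

[source,java]
----
import reactor.core.publisher.Flux;

public class GroupByDrainSketch {

	public static void main(String[] args) {
		// 64 distinct groups: the flatMap concurrency must cover the number of groups
		// that can be open at the same time, otherwise undrained groups can make
		// groupBy hang as described above.
		Flux.range(1, 1_000_000)
		    .groupBy(i -> i % 64)
		    .flatMap(group -> group.count()
		                           .map(count -> "group " + group.key() + ": " + count),
		             64) // concurrency >= number of groups
		    .subscribe(System.out::println);
	}
}
----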

* *

+ * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with + * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source + * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to + * another source. + * * @param other the {@link Publisher} to merge with * * @return a new {@link Flux}
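For illustration, a minimal sketch of the isolation advice repeated in the merge notes above; the infinite `generate` source, the choice of `Schedulers.parallel()` and the class name are assumptions made for the example, not part of this change:

[source,java]
----
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class MergeIsolationSketch {

	public static void main(String[] args) throws InterruptedException {
		// an infinite, purely synchronous source: without isolation, merge would try to
		// drain it on the subscribing thread and never subscribe to the second source
		Flux<Integer> infinite = Flux.<Integer, Integer>generate(
				() -> 0,
				(state, sink) -> {
					sink.next(state);
					return state + 1;
				});

		Flux<Integer> finite = Flux.just(-1, -2, -3);

		// isolating the infinite source on its own Scheduler lets both sources interleave
		Flux.merge(infinite.subscribeOn(Schedulers.parallel()), finite)
		    .take(10)
		    .subscribe(System.out::println);

		Thread.sleep(100); // give the parallel Scheduler a moment to emit (demo only)
	}
}
----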