Skip to content

Commit

Permalink
fix #596 Better hint at merge/groupBy restrictions + delayElement(s)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed May 18, 2017
1 parent 49a7dc2 commit a9df23a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/docs/asciidoc/operatorChoice.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ I want to deal with: <<which.create>>, <<which.values>>, <<which.peeking>>,

* I want to get ticks from a clock, regular time intervals: `Flux#interval`

* I want to emit a single `0` after an initial delay: static `Mono.delay`.

* I want to introduce a delay...
** between each onNext signal: `delay`
** between each onNext signal: `Mono#delayElement`, `Flux#delayElements`
** before the subscription happens: `delaySubscription`

[[which.window]]
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,11 @@ public static <T> Flux<T> just(T data) {
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/mergeinner.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param source a {@link Publisher} of {@link Publisher} sources to merge
* @param <T> the merged type
*
Expand All @@ -1042,6 +1047,11 @@ public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> sour
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/mergeinner.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param source a {@link Publisher} of {@link Publisher} sources to merge
* @param concurrency the request produced to the main source thus limiting concurrent merge backlog
* @param <T> the merged type
Expand All @@ -1060,6 +1070,11 @@ public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> sour
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/mergeinner.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param source a {@link Publisher} of {@link Publisher} sources to merge
* @param concurrency the request produced to the main source thus limiting concurrent merge backlog
* @param prefetch the inner source request size
Expand All @@ -1086,6 +1101,11 @@ public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> sour
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/merge.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param sources the {@link Iterable} of sources to merge (will be lazily iterated on subscribe)
* @param <I> The source type of the data sequence
*
Expand All @@ -1102,6 +1122,11 @@ public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sourc
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/merge.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param sources the array of {@link Publisher} sources to merge
* @param <I> The source type of the data sequence
*
Expand All @@ -1119,6 +1144,11 @@ public static <I> Flux<I> merge(Publisher<? extends I>... sources) {
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/merge.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param sources the array of {@link Publisher} sources to merge
* @param prefetch the inner source request size
* @param <I> The source type of the data sequence
Expand All @@ -1138,6 +1168,11 @@ public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/merge.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param sources the array of {@link Publisher} sources to merge
* @param prefetch the inner source request size
* @param <I> The source type of the data sequence
Expand Down Expand Up @@ -4023,6 +4058,12 @@ public int getPrefetch() {
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/groupby.png" alt="">
*
* <p>
* The groups need to be drained and consumed downstream for groupBy to work correctly.
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low.
*
* @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
* @param <K> the key type extracted from each value of this sequence
*
Expand All @@ -4039,6 +4080,12 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/groupby.png" alt="">
*
* <p>
* The groups need to be drained and consumed downstream for groupBy to work correctly.
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low.
*
* @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
* @param prefetch the number of values to prefetch from the source
* @param <K> the key type extracted from each value of this sequence
Expand All @@ -4057,6 +4104,12 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/groupby.png" alt="">
*
* <p>
* The groups need to be drained and consumed downstream for groupBy to work correctly.
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low.
*
* @param keyMapper the key mapping function that evaluates an incoming data and returns a key.
* @param valueMapper the value mapping function that evaluates which data to extract for re-routing.
* @param <K> the key type extracted from each value of this sequence
Expand All @@ -4078,6 +4131,12 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extend
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/groupby.png" alt="">
*
* <p>
* The groups need to be drained and consumed downstream for groupBy to work correctly.
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low.
*
* @param keyMapper the key mapping function that evaluates an incoming data and returns a key.
* @param valueMapper the value mapping function that evaluates which data to extract for re-routing.
* @param prefetch the number of values to prefetch from the source
Expand Down Expand Up @@ -4456,6 +4515,11 @@ public final Flux<Signal<T>> materialize() {
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M1/src/docs/marble/merge.png" alt="">
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param other the {@link Publisher} to merge with
*
* @return a new {@link Flux}
Expand Down

0 comments on commit a9df23a

Please sign in to comment.