Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Change Flowable.groupBy to signal MBE instead of possibly hanging #6740

Merged
merged 5 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 61 additions & 37 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
Expand Down Expand Up @@ -10421,13 +10421,16 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
*
* @param keySelector
Expand All @@ -10438,9 +10441,11 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @see #groupBy(Function, boolean)
* @see #groupBy(Function, Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
return groupBy(keySelector, Functions.<T>identity(), false, bufferSize());
Expand Down Expand Up @@ -10474,13 +10479,16 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
*
* @param keySelector
Expand All @@ -10496,7 +10504,7 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector, boolean delayError) {
return groupBy(keySelector, Functions.<T>identity(), delayError, bufferSize());
Expand Down Expand Up @@ -10530,13 +10538,16 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
*
* @param keySelector
Expand All @@ -10551,9 +10562,12 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @see #groupBy(Function, Function, boolean)
* @see #groupBy(Function, Function, boolean, int)
* @see #groupBy(Function, Function, boolean, int, Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector) {
Expand Down Expand Up @@ -10588,13 +10602,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
*
* @param keySelector
Expand All @@ -10612,9 +10629,10 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @see #groupBy(Function, Function, boolean, int)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector, boolean delayError) {
Expand Down Expand Up @@ -10649,13 +10667,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
*
* @param keySelector
Expand All @@ -10678,7 +10699,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector,
Expand Down Expand Up @@ -10759,13 +10780,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
* </dl>
* <p>History: 2.1.10 - beta
* @param keySelector
Expand Down Expand Up @@ -10796,7 +10820,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector,
Expand Down
Loading