diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index da7160f81e..423bcef036 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -26,6 +26,7 @@ import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.internal.observers.*; import io.reactivex.internal.operators.flowable.*; +import io.reactivex.internal.operators.mixed.*; import io.reactivex.internal.operators.observable.*; import io.reactivex.internal.util.*; import io.reactivex.observables.*; @@ -6498,7 +6499,97 @@ public final Completable concatMapCompletable(Function mapper, int capacityHint) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); - return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable(this, mapper, capacityHint)); + return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable(this, mapper, ErrorMode.IMMEDIATE, capacityHint)); + } + + /** + * Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the + * other terminates, delaying all errors till both this {@code Observable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper) { + return concatMapCompletableDelayError(mapper, true, 2); + } + + /** + * Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the + * other terminates, optionally delaying all errors till both this {@code Observable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code CompletableSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code CompletableSource} terminates and only then is + * it emitted to the downstream. + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper, boolean tillTheEnd) { + return concatMapCompletableDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the + * other terminates, optionally delaying all errors till both this {@code Observable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code CompletableSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code CompletableSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code CompletableSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code CompletableSource}s. + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); } /** @@ -6558,6 +6649,312 @@ public final Observable concatMapIterable(final Function + * + *
+ *
Scheduler:
+ *
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybeDelayError(Function) + * @see #concatMapMaybe(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapMaybe(Function> mapper) { + return concatMapMaybe(mapper, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other succeeds or completes, emits their success value if available or terminates immediately if + * either this {@code Observable} or the current inner {@code MaybeSource} fail. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code MaybeSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code MaybeSource}s. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function) + * @see #concatMapMaybeDelayError(Function, boolean, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapMaybe(Function> mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapMaybe(this, mapper, ErrorMode.IMMEDIATE, prefetch)); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and delaying all errors + * till both this {@code Observable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function) + * @see #concatMapMaybeDelayError(Function, boolean) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapMaybeDelayError(Function> mapper) { + return concatMapMaybeDelayError(mapper, true, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and optionally delaying all errors + * till both this {@code Observable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code MaybeSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code MaybeSource} terminates and only then is + * it emitted to the downstream. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function, int) + * @see #concatMapMaybeDelayError(Function, boolean, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapMaybeDelayError(Function> mapper, boolean tillTheEnd) { + return concatMapMaybeDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and optionally delaying all errors + * till both this {@code Observable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code MaybeSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code MaybeSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code MaybeSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code MaybeSource}s. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapMaybeDelayError(Function> mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapMaybe(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds, emits their success values or terminates immediately if + * either this {@code Observable} or the current inner {@code SingleSource} fail. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapSingleDelayError(Function) + * @see #concatMapSingle(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapSingle(Function> mapper) { + return concatMapSingle(mapper, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds, emits their success values or terminates immediately if + * either this {@code Observable} or the current inner {@code SingleSource} fail. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code SingleSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code SingleSource}s. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function) + * @see #concatMapSingleDelayError(Function, boolean, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapSingle(Function> mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapSingle(this, mapper, ErrorMode.IMMEDIATE, prefetch)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and delays all errors + * till both this {@code Observable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function) + * @see #concatMapSingleDelayError(Function, boolean) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapSingleDelayError(Function> mapper) { + return concatMapSingleDelayError(mapper, true, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and optionally delays all errors + * till both this {@code Observable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code SingleSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code SingleSource} terminates and only then is + * it emitted to the downstream. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function, int) + * @see #concatMapSingleDelayError(Function, boolean, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapSingleDelayError(Function> mapper, boolean tillTheEnd) { + return concatMapSingleDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and optionally delays errors + * till both this {@code Observable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Observable} or any of the + * inner {@code SingleSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Observable} is delayed until the current inner + * {@code SingleSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code SingleSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code SingleSource}s. + * @return a new Observable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable concatMapSingleDelayError(Function> mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ObservableConcatMapSingle(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); + } + /** * Returns an Observable that emits the items emitted from the current ObservableSource, then the next, one after * the other, without interleaving them. @@ -11723,6 +12120,151 @@ public final Observable switchMap(Function(this, mapper, bufferSize, false)); } + /** + * Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while + * disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one + * active {@code CompletableSource} running. + *

+ * + *

+ * Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of + * this operator is a {@link Completable} that can only indicate successful completion or + * a failure in any of the inner {@code CompletableSource}s or the failure of the current + * {@link Observable}. + *

+ *
Scheduler:
+ *
{@code switchMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either this {@code Observable} or the active {@code CompletableSource} signals an {@code onError}, + * the resulting {@code Completable} is terminated immediately with that {@code Throwable}. + * Use the {@link #switchMapCompletableDelayError(Function)} to delay such inner failures until + * every inner {@code CompletableSource}s and the main {@code Observable} terminates in some fashion. + * If they fail concurrently, the operator may combine the {@code Throwable}s into a + * {@link io.reactivex.exceptions.CompositeException CompositeException} + * and signal it to the downstream instead. If any inactivated (switched out) {@code CompletableSource} + * signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. + *
+ *
+ * @param mapper the function called with each upstream item and should return a + * {@link CompletableSource} to be subscribed to and awaited for + * (non blockingly) for its terminal event + * @return the new Completable instance + * @since 2.1.11 - experimental + * @see #switchMapCompletableDelayError(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable switchMapCompletable(@NonNull Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapCompletable(this, mapper, false)); + } + + /** + * Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while + * disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one + * active {@code CompletableSource} running and delaying any main or inner errors until all + * of them terminate. + *

+ * + *

+ * Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of + * this operator is a {@link Completable} that can only indicate successful completion or + * a failure in any of the inner {@code CompletableSource}s or the failure of the current + * {@link Observable}. + *

+ *
Scheduler:
+ *
{@code switchMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
Errors of this {@code Observable} and all the {@code CompletableSource}s, who had the chance + * to run to their completion, are delayed until + * all of them terminate in some fashion. At this point, if there was only one failure, the respective + * {@code Throwable} is emitted to the dowstream. It there were more than one failures, the + * operator combines all {@code Throwable}s into a {@link io.reactivex.exceptions.CompositeException CompositeException} + * and signals that to the downstream. + * If any inactivated (switched out) {@code CompletableSource} + * signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. + *
+ *
+ * @param mapper the function called with each upstream item and should return a + * {@link CompletableSource} to be subscribed to and awaited for + * (non blockingly) for its terminal event + * @return the new Completable instance + * @since 2.1.11 - experimental + * @see #switchMapCompletableDelayError(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable switchMapCompletableDelayError(@NonNull Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapCompletable(this, mapper, true)); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if + * available while failing immediately if this {@code Observable} or any of the + * active inner {@code MaybeSource}s fail. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
This operator terminates with an {@code onError} if this {@code Observable} or any of + * the inner {@code MaybeSource}s fail while they are active. When this happens concurrently, their + * individual {@code Throwable} errors may get combined and emitted as a single + * {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late + * (i.e., inactive or switched out) {@code onError} from this {@code Observable} or from any of + * the inner {@code MaybeSource}s will be forwarded to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as + * {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code MaybeSource} to replace the current active inner source + * and get subscribed to. + * @return the new Observable instance + * @since 2.1.11 - experimental + * @see #switchMapMaybe(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable switchMapMaybe(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapMaybe(this, mapper, false)); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if + * available, delaying errors from this {@code Observable} or the inner {@code MaybeSource}s until all terminate. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the output value type + * @param mapper the function called with the current upstream event and should + * return a {@code MaybeSource} to replace the current active inner source + * and get subscribed to. + * @return the new Observable instance + * @since 2.1.11 - experimental + * @see #switchMapMaybe(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable switchMapMaybeDelayError(@NonNull Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapMaybe(this, mapper, true)); + } + /** * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted @@ -11750,7 +12292,8 @@ public final Observable switchMap(Function Observable switchMapSingle(@NonNull Function> mapper) { - return ObservableInternalHelper.switchMapSingle(this, mapper); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapSingle(this, mapper, false)); } /** @@ -11781,7 +12324,8 @@ public final Observable switchMapSingle(@NonNull Function Observable switchMapSingleDelayError(@NonNull Function> mapper) { - return ObservableInternalHelper.switchMapSingleDelayError(this, mapper); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableSwitchMapSingle(this, mapper, true)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java index 43223ec721..6d0548a733 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java @@ -255,10 +255,10 @@ void drain() { consumed = c; } - SingleSource ms; + SingleSource ss; try { - ms = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource"); + ss = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.cancel(); @@ -270,7 +270,7 @@ void drain() { } state = STATE_ACTIVE; - ms.subscribe(inner); + ss.subscribe(inner); break; } else if (s == STATE_RESULT_VALUE) { long e = emitted; diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java index 813f779902..3e15dfc7ed 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java @@ -115,10 +115,10 @@ public void onNext(T t) { current.dispose(); } - SingleSource ms; + SingleSource ss; try { - ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); + ss = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.cancel(); @@ -135,7 +135,7 @@ public void onNext(T t) { break; } if (inner.compareAndSet(current, observer)) { - ms.subscribe(observer); + ss.subscribe(observer); break; } } diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java new file mode 100644 index 0000000000..bc765d48b5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java @@ -0,0 +1,277 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other completes or terminates (in error-delaying mode). + * @param the upstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableConcatMapCompletable extends Completable { + + final Observable source; + + final Function mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public ObservableConcatMapCompletable(Observable source, + Function mapper, + ErrorMode errorMode, + int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + source.subscribe(new ConcatMapCompletableObserver(s, mapper, errorMode, prefetch)); + } + + static final class ConcatMapCompletableObserver + extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = 3610901111000061034L; + + final CompletableObserver downstream; + + final Function mapper; + + final ErrorMode errorMode; + + final AtomicThrowable errors; + + final ConcatMapInnerObserver inner; + + final int prefetch; + + final SimplePlainQueue queue; + + Disposable upstream; + + volatile boolean active; + + volatile boolean done; + + volatile boolean disposed; + + ConcatMapCompletableObserver(CompletableObserver downstream, + Function mapper, + ErrorMode errorMode, int prefetch) { + this.downstream = downstream; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapInnerObserver(this); + this.queue = new SpscLinkedArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + this.upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + queue.offer(t); + drain(); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + disposed = true; + inner.dispose(); + t = errors.terminate(); + if (t != ExceptionHelper.TERMINATED) { + downstream.onError(t); + } + if (getAndIncrement() == 0) { + queue.clear(); + } + } else { + done = true; + drain(); + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void dispose() { + disposed = true; + upstream.dispose(); + inner.dispose(); + if (getAndIncrement() == 0) { + queue.clear(); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode == ErrorMode.IMMEDIATE) { + disposed = true; + upstream.dispose(); + ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + if (getAndIncrement() == 0) { + queue.clear(); + } + } else { + active = false; + drain(); + } + } else { + RxJavaPlugins.onError(ex); + } + } + + void innerComplete() { + active = false; + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + do { + if (disposed) { + queue.clear(); + return; + } + + if (!active) { + + if (errorMode == ErrorMode.BOUNDARY) { + if (errors.get() != null) { + disposed = true; + queue.clear(); + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + disposed = true; + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (!empty) { + + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + disposed = true; + queue.clear(); + upstream.dispose(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + active = true; + cs.subscribe(inner); + } + } + } while (decrementAndGet() != 0); + } + + static final class ConcatMapInnerObserver extends AtomicReference + implements CompletableObserver { + + private static final long serialVersionUID = 5638352172918776687L; + + final ConcatMapCompletableObserver parent; + + ConcatMapInnerObserver(ConcatMapCompletableObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java new file mode 100644 index 0000000000..9d83c77e59 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java @@ -0,0 +1,306 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each upstream item into a {@link MaybeSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + * + * @param the upstream element type + * @param the output element type + * + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableConcatMapMaybe extends Observable { + + final Observable source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public ObservableConcatMapMaybe(Observable source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new ConcatMapMaybeMainObserver(s, mapper, prefetch, errorMode)); + } + + static final class ConcatMapMaybeMainObserver + extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = -9140123220065488293L; + + final Observer downstream; + + final Function> mapper; + + final AtomicThrowable errors; + + final ConcatMapMaybeObserver inner; + + final SimplePlainQueue queue; + + final ErrorMode errorMode; + + Disposable upstream; + + volatile boolean done; + + volatile boolean cancelled; + + R item; + + volatile int state; + + /** No inner MaybeSource is running. */ + static final int STATE_INACTIVE = 0; + /** An inner MaybeSource is running but there are no results yet. */ + static final int STATE_ACTIVE = 1; + /** The inner MaybeSource succeeded with a value in {@link #item}. */ + static final int STATE_RESULT_VALUE = 2; + + ConcatMapMaybeMainObserver(Observer downstream, + Function> mapper, + int prefetch, ErrorMode errorMode) { + this.downstream = downstream; + this.mapper = mapper; + this.errorMode = errorMode; + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapMaybeObserver(this); + this.queue = new SpscLinkedArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + queue.offer(t); + drain(); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + inner.dispose(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void dispose() { + cancelled = true; + upstream.dispose(); + inner.dispose(); + if (getAndIncrement() != 0) { + queue.clear(); + item = null; + } + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerSuccess(R item) { + this.item = item; + this.state = STATE_RESULT_VALUE; + drain(); + } + + void innerComplete() { + this.state = STATE_INACTIVE; + drain(); + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode != ErrorMode.END) { + upstream.dispose(); + } + this.state = STATE_INACTIVE; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Observer downstream = this.downstream; + ErrorMode errorMode = this.errorMode; + SimplePlainQueue queue = this.queue; + AtomicThrowable errors = this.errors; + + for (;;) { + + for (;;) { + if (cancelled) { + queue.clear(); + item = null; + } + + int s = state; + + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) { + queue.clear(); + item = null; + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + if (s == STATE_INACTIVE) { + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + return; + } + + if (empty) { + break; + } + + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + queue.clear(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + + state = STATE_ACTIVE; + ms.subscribe(inner); + break; + } else if (s == STATE_RESULT_VALUE) { + R w = item; + item = null; + downstream.onNext(w); + + state = STATE_INACTIVE; + } else { + break; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class ConcatMapMaybeObserver + extends AtomicReference + implements MaybeObserver { + + private static final long serialVersionUID = -3051469169682093892L; + + final ConcatMapMaybeMainObserver parent; + + ConcatMapMaybeObserver(ConcatMapMaybeMainObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onSuccess(R t) { + parent.innerSuccess(t); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java new file mode 100644 index 0000000000..d29204ccc7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java @@ -0,0 +1,296 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each upstream item into a {@link SingleSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + * + * @param the upstream element type + * @param the output element type + * + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableConcatMapSingle extends Observable { + + final Observable source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public ObservableConcatMapSingle(Observable source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new ConcatMapSingleMainObserver(s, mapper, prefetch, errorMode)); + } + + static final class ConcatMapSingleMainObserver + extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = -9140123220065488293L; + + final Observer downstream; + + final Function> mapper; + + final AtomicThrowable errors; + + final ConcatMapSingleObserver inner; + + final SimplePlainQueue queue; + + final ErrorMode errorMode; + + Disposable upstream; + + volatile boolean done; + + volatile boolean cancelled; + + R item; + + volatile int state; + + /** No inner SingleSource is running. */ + static final int STATE_INACTIVE = 0; + /** An inner SingleSource is running but there are no results yet. */ + static final int STATE_ACTIVE = 1; + /** The inner SingleSource succeeded with a value in {@link #item}. */ + static final int STATE_RESULT_VALUE = 2; + + ConcatMapSingleMainObserver(Observer downstream, + Function> mapper, + int prefetch, ErrorMode errorMode) { + this.downstream = downstream; + this.mapper = mapper; + this.errorMode = errorMode; + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapSingleObserver(this); + this.queue = new SpscArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + queue.offer(t); + drain(); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + inner.dispose(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void dispose() { + cancelled = true; + upstream.dispose(); + inner.dispose(); + if (getAndIncrement() != 0) { + queue.clear(); + item = null; + } + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerSuccess(R item) { + this.item = item; + this.state = STATE_RESULT_VALUE; + drain(); + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode != ErrorMode.END) { + upstream.dispose(); + } + this.state = STATE_INACTIVE; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Observer downstream = this.downstream; + ErrorMode errorMode = this.errorMode; + SimplePlainQueue queue = this.queue; + AtomicThrowable errors = this.errors; + + for (;;) { + + for (;;) { + if (cancelled) { + queue.clear(); + item = null; + } + + int s = state; + + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) { + queue.clear(); + item = null; + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + if (s == STATE_INACTIVE) { + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + return; + } + + if (empty) { + break; + } + + SingleSource ss; + + try { + ss = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + queue.clear(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + + state = STATE_ACTIVE; + ss.subscribe(inner); + break; + } else if (s == STATE_RESULT_VALUE) { + R w = item; + item = null; + downstream.onNext(w); + + state = STATE_INACTIVE; + } else { + break; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class ConcatMapSingleObserver + extends AtomicReference + implements SingleObserver { + + private static final long serialVersionUID = -3051469169682093892L; + + final ConcatMapSingleMainObserver parent; + + ConcatMapSingleObserver(ConcatMapSingleMainObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onSuccess(R t) { + parent.innerSuccess(t); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java new file mode 100644 index 0000000000..650c9dfbd8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java @@ -0,0 +1,235 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while + * disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one + * active {@code CompletableSource} running. + * + * @param the upstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableSwitchMapCompletable extends Completable { + + final Observable source; + + final Function mapper; + + final boolean delayErrors; + + public ObservableSwitchMapCompletable(Observable source, + Function mapper, boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + source.subscribe(new SwitchMapCompletableObserver(s, mapper, delayErrors)); + } + + static final class SwitchMapCompletableObserver implements Observer, Disposable { + + final CompletableObserver downstream; + + final Function mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicReference inner; + + static final SwitchMapInnerObserver INNER_DISPOSED = new SwitchMapInnerObserver(null); + + volatile boolean done; + + Disposable upstream; + + SwitchMapCompletableObserver(CompletableObserver downstream, + Function mapper, boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.inner = new AtomicReference(); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + this.upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + CompletableSource c; + + try { + c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + onError(ex); + return; + } + + SwitchMapInnerObserver o = new SwitchMapInnerObserver(this); + + for (;;) { + SwitchMapInnerObserver current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, o)) { + if (current != null) { + current.dispose(); + } + c.subscribe(o); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (delayErrors) { + onComplete(); + } else { + disposeInner(); + Throwable ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + if (inner.get() == null) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + } + } + + void disposeInner() { + SwitchMapInnerObserver o = inner.getAndSet(INNER_DISPOSED); + if (o != null && o != INNER_DISPOSED) { + o.dispose(); + } + } + + @Override + public void dispose() { + upstream.dispose(); + disposeInner(); + } + + @Override + public boolean isDisposed() { + return inner.get() == INNER_DISPOSED; + } + + void innerError(SwitchMapInnerObserver sender, Throwable error) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(error)) { + if (delayErrors) { + if (done) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + } + } else { + dispose(); + Throwable ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + } + return; + } + } + RxJavaPlugins.onError(error); + } + + void innerComplete(SwitchMapInnerObserver sender) { + if (inner.compareAndSet(sender, null)) { + if (done) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + } + } + } + + static final class SwitchMapInnerObserver extends AtomicReference + implements CompletableObserver { + + private static final long serialVersionUID = -8003404460084760287L; + + final SwitchMapCompletableObserver parent; + + SwitchMapInnerObserver(SwitchMapCompletableObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + @Override + public void onComplete() { + parent.innerComplete(this); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java new file mode 100644 index 0000000000..ef7f863603 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java @@ -0,0 +1,288 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones + * while disposing the older ones and emits the latest success value if available, optionally delaying + * errors from the main source or the inner sources. + * + * @param the upstream value type + * @param the downstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableSwitchMapMaybe extends Observable { + + final Observable source; + + final Function> mapper; + + final boolean delayErrors; + + public ObservableSwitchMapMaybe(Observable source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new SwitchMapMaybeMainObserver(s, mapper, delayErrors)); + } + + static final class SwitchMapMaybeMainObserver extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = -5402190102429853762L; + + final Observer downstream; + + final Function> mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicReference> inner; + + static final SwitchMapMaybeObserver INNER_DISPOSED = + new SwitchMapMaybeObserver(null); + + Disposable upstream; + + volatile boolean done; + + volatile boolean cancelled; + + SwitchMapMaybeMainObserver(Observer downstream, + Function> mapper, + boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.inner = new AtomicReference>(); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void onNext(T t) { + SwitchMapMaybeObserver current = inner.get(); + if (current != null) { + current.dispose(); + } + + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + inner.getAndSet((SwitchMapMaybeObserver)INNER_DISPOSED); + onError(ex); + return; + } + + SwitchMapMaybeObserver observer = new SwitchMapMaybeObserver(this); + + for (;;) { + current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, observer)) { + ms.subscribe(observer); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (!delayErrors) { + disposeInner(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + void disposeInner() { + SwitchMapMaybeObserver current = inner.getAndSet((SwitchMapMaybeObserver)INNER_DISPOSED); + if (current != null && current != INNER_DISPOSED) { + current.dispose(); + } + } + + @Override + public void dispose() { + cancelled = true; + upstream.dispose(); + disposeInner(); + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerError(SwitchMapMaybeObserver sender, Throwable ex) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(ex)) { + if (!delayErrors) { + upstream.dispose(); + disposeInner(); + } + drain(); + return; + } + } + RxJavaPlugins.onError(ex); + } + + void innerComplete(SwitchMapMaybeObserver sender) { + if (inner.compareAndSet(sender, null)) { + drain(); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Observer downstream = this.downstream; + AtomicThrowable errors = this.errors; + AtomicReference> inner = this.inner; + + for (;;) { + + for (;;) { + if (cancelled) { + return; + } + + if (errors.get() != null) { + if (!delayErrors) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + SwitchMapMaybeObserver current = inner.get(); + boolean empty = current == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (empty || current.item == null) { + break; + } + + inner.compareAndSet(current, null); + + downstream.onNext(current.item); + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class SwitchMapMaybeObserver + extends AtomicReference implements MaybeObserver { + + private static final long serialVersionUID = 8042919737683345351L; + + final SwitchMapMaybeMainObserver parent; + + volatile R item; + + SwitchMapMaybeObserver(SwitchMapMaybeMainObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R t) { + item = t; + parent.drain(); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + @Override + public void onComplete() { + parent.innerComplete(this); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java new file mode 100644 index 0000000000..c70bad0bcc --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java @@ -0,0 +1,277 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones + * while disposing the older ones and emits the latest success value if available, optionally delaying + * errors from the main source or the inner sources. + * + * @param the upstream value type + * @param the downstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class ObservableSwitchMapSingle extends Observable { + + final Observable source; + + final Function> mapper; + + final boolean delayErrors; + + public ObservableSwitchMapSingle(Observable source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new SwitchMapSingleMainObserver(s, mapper, delayErrors)); + } + + static final class SwitchMapSingleMainObserver extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = -5402190102429853762L; + + final Observer downstream; + + final Function> mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicReference> inner; + + static final SwitchMapSingleObserver INNER_DISPOSED = + new SwitchMapSingleObserver(null); + + Disposable upstream; + + volatile boolean done; + + volatile boolean cancelled; + + SwitchMapSingleMainObserver(Observer downstream, + Function> mapper, + boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.inner = new AtomicReference>(); + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + } + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void onNext(T t) { + SwitchMapSingleObserver current = inner.get(); + if (current != null) { + current.dispose(); + } + + SingleSource ss; + + try { + ss = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + inner.getAndSet((SwitchMapSingleObserver)INNER_DISPOSED); + onError(ex); + return; + } + + SwitchMapSingleObserver observer = new SwitchMapSingleObserver(this); + + for (;;) { + current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, observer)) { + ss.subscribe(observer); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (!delayErrors) { + disposeInner(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + void disposeInner() { + SwitchMapSingleObserver current = inner.getAndSet((SwitchMapSingleObserver)INNER_DISPOSED); + if (current != null && current != INNER_DISPOSED) { + current.dispose(); + } + } + + @Override + public void dispose() { + cancelled = true; + upstream.dispose(); + disposeInner(); + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerError(SwitchMapSingleObserver sender, Throwable ex) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(ex)) { + if (!delayErrors) { + upstream.dispose(); + disposeInner(); + } + drain(); + return; + } + } + RxJavaPlugins.onError(ex); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Observer downstream = this.downstream; + AtomicThrowable errors = this.errors; + AtomicReference> inner = this.inner; + + for (;;) { + + for (;;) { + if (cancelled) { + return; + } + + if (errors.get() != null) { + if (!delayErrors) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + SwitchMapSingleObserver current = inner.get(); + boolean empty = current == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (empty || current.item == null) { + break; + } + + inner.compareAndSet(current, null); + + downstream.onNext(current.item); + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class SwitchMapSingleObserver + extends AtomicReference implements SingleObserver { + + private static final long serialVersionUID = 8042919737683345351L; + + final SwitchMapSingleMainObserver parent; + + volatile R item; + + SwitchMapSingleObserver(SwitchMapSingleMainObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R t) { + item = t; + parent.drain(); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java deleted file mode 100644 index b53de7cde7..0000000000 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ -package io.reactivex.internal.operators.observable; - -import java.util.concurrent.atomic.*; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.*; -import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.plugins.RxJavaPlugins; - -public final class ObservableConcatMapCompletable extends Completable { - - final ObservableSource source; - final Function mapper; - final int bufferSize; - - public ObservableConcatMapCompletable(ObservableSource source, - Function mapper, int bufferSize) { - this.source = source; - this.mapper = mapper; - this.bufferSize = Math.max(8, bufferSize); - } - @Override - public void subscribeActual(CompletableObserver observer) { - source.subscribe(new SourceObserver(observer, mapper, bufferSize)); - } - - static final class SourceObserver extends AtomicInteger implements Observer, Disposable { - - private static final long serialVersionUID = 6893587405571511048L; - final CompletableObserver actual; - final Function mapper; - final InnerObserver inner; - final int bufferSize; - - SimpleQueue queue; - - Disposable s; - - volatile boolean active; - - volatile boolean disposed; - - volatile boolean done; - - int sourceMode; - - SourceObserver(CompletableObserver actual, - Function mapper, int bufferSize) { - this.actual = actual; - this.mapper = mapper; - this.bufferSize = bufferSize; - this.inner = new InnerObserver(actual, this); - } - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - if (s instanceof QueueDisposable) { - @SuppressWarnings("unchecked") - QueueDisposable qd = (QueueDisposable) s; - - int m = qd.requestFusion(QueueDisposable.ANY); - if (m == QueueDisposable.SYNC) { - sourceMode = m; - queue = qd; - done = true; - - actual.onSubscribe(this); - - drain(); - return; - } - - if (m == QueueDisposable.ASYNC) { - sourceMode = m; - queue = qd; - - actual.onSubscribe(this); - - return; - } - } - - queue = new SpscLinkedArrayQueue(bufferSize); - - actual.onSubscribe(this); - } - } - @Override - public void onNext(T t) { - if (done) { - return; - } - if (sourceMode == QueueDisposable.NONE) { - queue.offer(t); - } - drain(); - } - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; - dispose(); - actual.onError(t); - } - @Override - public void onComplete() { - if (done) { - return; - } - done = true; - drain(); - } - - void innerComplete() { - active = false; - drain(); - } - - @Override - public boolean isDisposed() { - return disposed; - } - - @Override - public void dispose() { - disposed = true; - inner.dispose(); - s.dispose(); - - if (getAndIncrement() == 0) { - queue.clear(); - } - } - - void drain() { - if (getAndIncrement() != 0) { - return; - } - - for (;;) { - if (disposed) { - queue.clear(); - return; - } - if (!active) { - - boolean d = done; - - T t; - - try { - t = queue.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - dispose(); - queue.clear(); - actual.onError(ex); - return; - } - - boolean empty = t == null; - - if (d && empty) { - disposed = true; - actual.onComplete(); - return; - } - - if (!empty) { - CompletableSource c; - - try { - c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - dispose(); - queue.clear(); - actual.onError(ex); - return; - } - - active = true; - c.subscribe(inner); - } - } - - if (decrementAndGet() == 0) { - break; - } - } - } - - static final class InnerObserver extends AtomicReference implements CompletableObserver { - private static final long serialVersionUID = -5987419458390772447L; - final CompletableObserver actual; - final SourceObserver parent; - - InnerObserver(CompletableObserver actual, SourceObserver parent) { - this.actual = actual; - this.parent = parent; - } - - @Override - public void onSubscribe(Disposable s) { - DisposableHelper.set(this, s); - } - - @Override - public void onError(Throwable t) { - parent.dispose(); - actual.onError(t); - } - @Override - public void onComplete() { - parent.innerComplete(); - } - - void dispose() { - DisposableHelper.dispose(this); - } - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index b5fe5f48b5..7bc6305703 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -17,11 +17,8 @@ import io.reactivex.*; import io.reactivex.functions.*; -import io.reactivex.internal.functions.Functions; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.operators.single.SingleToObservable; +import io.reactivex.internal.functions.*; import io.reactivex.observables.ConnectableObservable; -import io.reactivex.plugins.RxJavaPlugins; /** * Helper utility class to support Observable with inner classes. @@ -294,37 +291,6 @@ public static Function>, ObservableSou return new ZipIterableFunction(zipper); } - public static Observable switchMapSingle(Observable source, final Function> mapper) { - return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1); - } - - public static Observable switchMapSingleDelayError(Observable source, - Function> mapper) { - return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1); - } - - private static Function> convertSingleMapperToObservableMapper( - final Function> mapper) { - ObjectHelper.requireNonNull(mapper, "mapper is null"); - return new ObservableMapper(mapper); - } - - static final class ObservableMapper implements Function> { - - final Function> mapper; - - ObservableMapper(Function> mapper) { - this.mapper = mapper; - } - - @Override - public Observable apply(T t) throws Exception { - return RxJavaPlugins.onAssembly(new SingleToObservable( - ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"))); - } - - } - static final class ReplayCallable implements Callable> { private final Observable parent; diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java new file mode 100644 index 0000000000..50c79bb01f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java @@ -0,0 +1,362 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.*; + +public class ObservableConcatMapCompletableTest { + + @Test + public void simple() { + Observable.range(1, 5) + .concatMapCompletable(Functions.justFunction(Completable.complete())) + .test() + .assertResult(); + } + + @Test + public void simple2() { + final AtomicInteger counter = new AtomicInteger(); + Observable.range(1, 5) + .concatMapCompletable(Functions.justFunction(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + counter.incrementAndGet(); + } + }))) + .test() + .assertResult(); + + assertEquals(5, counter.get()); + } + + @Test + public void simpleLongPrefetch() { + Observable.range(1, 1024) + .concatMapCompletable(Functions.justFunction(Completable.complete()), 32) + .test() + .assertResult(); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .concatMapCompletable(Functions.justFunction(Completable.complete())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Observable.just(1) + .concatMapCompletable(Functions.justFunction(Completable.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerErrorDelayed() { + Observable.range(1, 5) + .concatMapCompletableDelayError( + new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + } + ) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestObserver to) throws Exception { + assertEquals(5, ((CompositeException)to.errors().get(0)).getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + Observable.just(1) + .concatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void immediateError() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.concatMapCompletable( + Functions.justFunction(cs)).test(); + + to.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertFalse(cs.hasObservers()); + + ps.onNext(1); + + assertTrue(cs.hasObservers()); + + ps.onError(new TestException()); + + assertFalse(cs.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void immediateError2() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.concatMapCompletable( + Functions.justFunction(cs)).test(); + + to.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertFalse(cs.hasObservers()); + + ps.onNext(1); + + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + assertFalse(ps.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void boundaryError() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.concatMapCompletableDelayError( + Functions.justFunction(cs), false).test(); + + to.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertFalse(cs.hasObservers()); + + ps.onNext(1); + + assertTrue(cs.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(cs.hasObservers()); + + to.assertEmpty(); + + cs.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void endError() { + PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + final CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = ps.concatMapCompletableDelayError( + new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + if (v == 1) { + return cs; + } + return cs2; + } + }, true, 32 + ) + .test(); + + to.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertFalse(cs.hasObservers()); + + ps.onNext(1); + + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + assertTrue(ps.hasObservers()); + + ps.onNext(2); + + to.assertEmpty(); + + cs2.onComplete(); + + assertTrue(ps.hasObservers()); + + to.assertEmpty(); + + ps.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservableToCompletable( + new Function, Completable>() { + @Override + public Completable apply(Observable f) + throws Exception { + return f.concatMapCompletable( + Functions.justFunction(Completable.complete())); + } + } + ); + } + + @Test + public void disposed() { + TestHelper.checkDisposed( + Observable.never() + .concatMapCompletable( + Functions.justFunction(Completable.complete())) + ); + } + + @Test + public void immediateOuterInnerErrorRace() { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }) + .assertNotComplete(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void disposeInDrainLoop() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + final TestObserver to = ps.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onComplete(); + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } + + @Test + public void doneButNotEmpty() { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + final TestObserver to = ps.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + ps.onNext(1); + ps.onNext(2); + ps.onComplete(); + + cs.onComplete(); + + to.assertResult(); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java new file mode 100644 index 0000000000..154508bcf7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java @@ -0,0 +1,349 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.*; + +public class ObservableConcatMapMaybeTest { + + @Test + public void simple() { + Observable.range(1, 5) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleLong() { + Observable.range(1, 1024) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void empty() { + Observable.range(1, 10) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mixed() { + Observable.range(1, 10) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + } + }) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void mixedLong() { + Observable.range(1, 1024) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + return Maybe.empty().subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(512) + .assertNoErrors() + .assertComplete() + .assertOf(new Consumer>() { + @Override + public void accept(TestObserver ts) throws Exception { + for (int i = 0; i < 512; i ++) { + ts.assertValueAt(i, (i + 1) * 2); + } + } + }); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .concatMapMaybe(Functions.justFunction(Maybe.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Observable.just(1) + .concatMapMaybe(Functions.justFunction(Maybe.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainBoundaryErrorInnerSuccess() { + PublishSubject ps = PublishSubject.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestObserver ts = ps.concatMapMaybeDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mainBoundaryErrorInnerEmpty() { + PublishSubject ps = PublishSubject.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestObserver ts = ps.concatMapMaybeDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable( + new Function, Observable>() { + @Override + public Observable apply(Observable f) + throws Exception { + return f.concatMapMaybeDelayError( + Functions.justFunction(Maybe.empty())); + } + } + ); + } + + @Test + public void take() { + Observable.range(1, 5) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void cancel() { + Observable.range(1, 5).concatWith(Observable.never()) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertValues(1, 2, 3, 4, 5) + .assertNoErrors() + .assertNotComplete() + .cancel(); + } + + @Test + public void mainErrorAfterInnerError() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .concatMapMaybe( + Functions.justFunction(Maybe.error(new TestException("inner"))), 1 + ) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerErrorAfterMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final AtomicReference> obs = new AtomicReference>(); + + TestObserver ts = ps.concatMapMaybe( + new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return new Maybe() { + @Override + protected void subscribeActual( + MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + obs.set(observer); + } + }; + } + } + ).test(); + + ps.onNext(1); + + ps.onError(new TestException("outer")); + obs.get().onError(new TestException("inner")); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void delayAllErrors() { + Observable.range(1, 5) + .concatMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.error(new TestException()); + } + }) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestObserver ts) throws Exception { + CompositeException ce = (CompositeException)ts.errors().get(0); + assertEquals(5, ce.getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver ts = ps + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test(); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + ts.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.just(1) + .concatMapMaybe(Functions.justFunction(Maybe.never())) + ); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java new file mode 100644 index 0000000000..48a35df533 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java @@ -0,0 +1,286 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.*; + +public class ObservableConcatMapSingleTest { + + @Test + public void simple() { + Observable.range(1, 5) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleLong() { + Observable.range(1, 1024) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .concatMapSingle(Functions.justFunction(Single.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Observable.just(1) + .concatMapSingle(Functions.justFunction(Single.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainBoundaryErrorInnerSuccess() { + PublishSubject ps = PublishSubject.create(); + SingleSubject ms = SingleSubject.create(); + + TestObserver ts = ps.concatMapSingleDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable( + new Function, Observable>() { + @Override + public Observable apply(Observable f) + throws Exception { + return f.concatMapSingleDelayError( + Functions.justFunction(Single.never())); + } + } + ); + } + + @Test + public void take() { + Observable.range(1, 5) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void cancel() { + Observable.range(1, 5).concatWith(Observable.never()) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test() + .assertValues(1, 2, 3, 4, 5) + .assertNoErrors() + .assertNotComplete() + .cancel(); + } + + @Test + public void mainErrorAfterInnerError() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .concatMapSingle( + Functions.justFunction(Single.error(new TestException("inner"))), 1 + ) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerErrorAfterMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final AtomicReference> obs = new AtomicReference>(); + + TestObserver ts = ps.concatMapSingle( + new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return new Single() { + @Override + protected void subscribeActual( + SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + obs.set(observer); + } + }; + } + } + ).test(); + + ps.onNext(1); + + ps.onError(new TestException("outer")); + obs.get().onError(new TestException("inner")); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void delayAllErrors() { + Observable.range(1, 5) + .concatMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.error(new TestException()); + } + }) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestObserver ts) throws Exception { + CompositeException ce = (CompositeException)ts.errors().get(0); + assertEquals(5, ce.getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver ts = ps + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test(); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + ts.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.just(1) + .concatMapSingle(Functions.justFunction(Single.never())) + ); + } + + @Test + public void mainCompletesWhileInnerActive() { + PublishSubject ps = PublishSubject.create(); + SingleSubject ms = SingleSubject.create(); + + TestObserver ts = ps.concatMapSingleDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + ps.onNext(1); + ps.onNext(2); + ps.onComplete(); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertResult(1, 1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java new file mode 100644 index 0000000000..17edbf46e5 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java @@ -0,0 +1,387 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.*; + +public class ObservableSwitchMapCompletableTest { + + @Test + public void normal() { + Observable.range(1, 10) + .switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletable(Functions.justFunction(cs)) + .test(); + + assertTrue(ps.hasObservers()); + assertFalse(cs.hasObservers()); + + ps.onNext(1); + + assertTrue(cs.hasObservers()); + + to.assertEmpty(); + + cs.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + assertFalse(cs.hasObservers()); + } + + @Test + public void switchOver() { + final CompletableSubject[] css = { + CompletableSubject.create(), + CompletableSubject.create() + }; + + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return css[v]; + } + }) + .test(); + + to.assertEmpty(); + + ps.onNext(0); + + assertTrue(css[0].hasObservers()); + + ps.onNext(1); + + assertFalse(css[0].hasObservers()); + assertTrue(css[1].hasObservers()); + + ps.onComplete(); + + to.assertEmpty(); + + assertTrue(css[1].hasObservers()); + + css[1].onComplete(); + + to.assertResult(); + } + + @Test + public void dispose() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletable(Functions.justFunction(cs)) + .test(); + + ps.onNext(1); + + assertTrue(ps.hasObservers()); + assertTrue(cs.hasObservers()); + + to.dispose(); + + assertFalse(ps.hasObservers()); + assertFalse(cs.hasObservers()); + } + + @Test + public void checkDisposed() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestHelper.checkDisposed(ps.switchMapCompletable(Functions.justFunction(cs))); + } + + @Test + public void checkBadSource() { + TestHelper.checkDoubleOnSubscribeObservableToCompletable(new Function, Completable>() { + @Override + public Completable apply(Observable f) throws Exception { + return f.switchMapCompletable(Functions.justFunction(Completable.never())); + } + }); + } + + @Test + public void mapperCrash() { + Observable.range(1, 5).switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer f) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperCancels() { + final TestObserver to = new TestObserver(); + + Observable.range(1, 5).switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer f) throws Exception { + to.cancel(); + return Completable.complete(); + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void onNextInnerCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletable(Functions.justFunction(cs)).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } + + @Test + public void onNextInnerErrorRace() { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletable(Functions.justFunction(cs)).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onErrorInnerErrorRace() { + final TestException ex0 = new TestException(); + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletable(Functions.justFunction(cs)).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex0); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void innerErrorThenMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("main")); + } + } + .switchMapCompletable(Functions.justFunction(Completable.error(new TestException("inner")))) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "main"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerErrorDelayed() { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + ps.onNext(1); + + cs.onError(new TestException()); + + to.assertEmpty(); + + assertTrue(ps.hasObservers()); + + ps.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void mainCompletesinnerErrorDelayed() { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + ps.onNext(1); + ps.onComplete(); + + to.assertEmpty(); + + cs.onError(new TestException()); + + to.assertFailure(TestException.class); + } + + @Test + public void mainErrorDelayed() { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + ps.onNext(1); + + ps.onError(new TestException()); + + to.assertEmpty(); + + assertTrue(cs.hasObservers()); + + cs.onComplete(); + + to.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java new file mode 100644 index 0000000000..65f487a14e --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java @@ -0,0 +1,645 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.*; + +public class ObservableSwitchMapMaybeTest { + + @Test + public void simple() { + Observable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleEmpty() { + Observable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void simpleMixed() { + Observable.range(1, 10) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + } + }) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .switchMapMaybe(Functions.justFunction(Maybe.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Observable.just(1) + .switchMapMaybe(Functions.justFunction(Maybe.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>() { + @Override + public Observable apply(Observable f) + throws Exception { + return f + .switchMapMaybe(Functions.justFunction(Maybe.never())); + } + } + ); + } + + @Test + public void take() { + Observable.range(1, 5) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void switchOver() { + PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms1 = MaybeSubject.create(); + final MaybeSubject ms2 = MaybeSubject.create(); + + TestObserver ts = ps.switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + ps.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + assertFalse(ps.hasObservers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void switchOverDelayError() { + PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms1 = MaybeSubject.create(); + final MaybeSubject ms2 = MaybeSubject.create(); + + TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + ps.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + + ps.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerCompleteDelayError() { + PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerSuccessDelayError() { + PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mapperCrash() { + Observable.just(1) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void disposeBeforeSwitchInOnNext() { + final TestObserver ts = new TestObserver(); + + Observable.just(1) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + ts.cancel(); + return Maybe.just(1); + } + }).subscribe(ts); + + ts.assertEmpty(); + } + + @Test + public void disposeOnNextAfterFirst() { + final TestObserver ts = new TestObserver(); + + Observable.just(1, 2) + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 2) { + ts.cancel(); + } + return Maybe.just(1); + } + }).subscribe(ts); + + ts.assertValue(1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void cancel() { + PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(ps.hasObservers()); + assertFalse(ms.hasObservers()); + } + + @Test + public void mainErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.error(new TestException("inner")); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + + @Test + public void innerErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> moRef = new AtomicReference>(); + + TestObserver ts = new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return new Maybe() { + @Override + protected void subscribeActual( + MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + moRef.set(observer); + } + }; + } + }) + .test(); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + moRef.get().onError(new TestException("inner")); + moRef.get().onComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void nextInnerErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.never(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (ts.errorCount() != 0) { + assertTrue(errors.isEmpty()); + ts.assertFailure(TestException.class); + } else if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainErrorInnerErrorRace() { + final TestException ex = new TestException(); + final TestException ex2 = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.never(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + ts.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextInnerSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver ts = ps.switchMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Maybe.empty(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onSuccess(3); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void checkDisposed() { + PublishSubject ps = PublishSubject.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestHelper.checkDisposed(ps.switchMapMaybe(Functions.justFunction(ms))); + } + + @Test + public void drainReentrant() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(2); + } + } + }; + + ps.switchMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }).subscribe(to); + + ps.onNext(1); + ps.onComplete(); + + to.assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java new file mode 100644 index 0000000000..cafbf15e4d --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java @@ -0,0 +1,613 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.*; + +public class ObservableSwitchMapSingleTest { + + @Test + public void simple() { + Observable.range(1, 5) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void mainError() { + Observable.error(new TestException()) + .switchMapSingle(Functions.justFunction(Single.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Observable.just(1) + .switchMapSingle(Functions.justFunction(Single.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>() { + @Override + public Observable apply(Observable f) + throws Exception { + return f + .switchMapSingle(Functions.justFunction(Single.never())); + } + } + ); + } + + @Test + public void take() { + Observable.range(1, 5) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void switchOver() { + PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms1 = SingleSubject.create(); + final SingleSubject ms2 = SingleSubject.create(); + + TestObserver ts = ps.switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + ps.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + assertFalse(ps.hasObservers()); + + ts.assertFailure(TestException.class); + } + + @Test + public void switchOverDelayError() { + PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms1 = SingleSubject.create(); + final SingleSubject ms2 = SingleSubject.create(); + + TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms1; + } + return ms2; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms1.hasObservers()); + + ps.onNext(2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + + ps.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void mainErrorInnerCompleteDelayError() { + PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mainErrorInnerSuccessDelayError() { + PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mapperCrash() { + Observable.just(1) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void disposeBeforeSwitchInOnNext() { + final TestObserver ts = new TestObserver(); + + Observable.just(1) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + ts.cancel(); + return Single.just(1); + } + }).subscribe(ts); + + ts.assertEmpty(); + } + + @Test + public void disposeOnNextAfterFirst() { + final TestObserver ts = new TestObserver(); + + Observable.just(1, 2) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 2) { + ts.cancel(); + } + return Single.just(1); + } + }).subscribe(ts); + + ts.assertValue(1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void cancel() { + PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + ts.assertEmpty(); + + ps.onNext(1); + + ts.assertEmpty(); + + assertTrue(ps.hasObservers()); + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(ps.hasObservers()); + assertFalse(ms.hasObservers()); + } + + @Test + public void mainErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.error(new TestException("inner")); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "outer"); + } finally { + RxJavaPlugins.reset(); + } + } + + + @Test + public void innerErrorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> moRef = new AtomicReference>(); + + TestObserver ts = new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException("outer")); + } + } + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return new Single() { + @Override + protected void subscribeActual( + SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + moRef.set(observer); + } + }; + } + }) + .test(); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + moRef.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return ms; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void nextInnerErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (ts.errorCount() != 0) { + assertTrue(errors.isEmpty()); + ts.assertFailure(TestException.class); + } else if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainErrorInnerErrorRace() { + final TestException ex = new TestException(); + final TestException ex2 = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + ts.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextInnerSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final SingleSubject ms = SingleSubject.create(); + + final TestObserver ts = ps.switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + if (v == 1) { + return ms; + } + return Single.never(); + } + }).test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ms.onSuccess(3); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors() + .assertNotComplete(); + } + } + + @Test + public void checkDisposed() { + PublishSubject ps = PublishSubject.create(); + SingleSubject ms = SingleSubject.create(); + + TestHelper.checkDisposed(ps.switchMapSingle(Functions.justFunction(ms))); + } + + @Test + public void drainReentrant() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(2); + } + } + }; + + ps.switchMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }).subscribe(to); + + ps.onNext(1); + ps.onComplete(); + + to.assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/tck/ConcatMapMaybeTckTest.java b/src/test/java/io/reactivex/tck/ConcatMapMaybeTckTest.java new file mode 100644 index 0000000000..0e41d3f515 --- /dev/null +++ b/src/test/java/io/reactivex/tck/ConcatMapMaybeTckTest.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@Test +public class ConcatMapMaybeTckTest extends BaseTck { + + @Override + public Publisher createPublisher(long elements) { + return + Flowable.range(0, (int)elements) + .concatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + ; + } +} diff --git a/src/test/java/io/reactivex/tck/ConcatMapSingleTckTest.java b/src/test/java/io/reactivex/tck/ConcatMapSingleTckTest.java new file mode 100644 index 0000000000..dcb2564fba --- /dev/null +++ b/src/test/java/io/reactivex/tck/ConcatMapSingleTckTest.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@Test +public class ConcatMapSingleTckTest extends BaseTck { + + @Override + public Publisher createPublisher(long elements) { + return + Flowable.range(0, (int)elements) + .concatMapSingle(new Function>() { + @Override + public Single apply(Integer v) throws Exception { + return Single.just(v); + } + }) + ; + } +}