From 13473da61366945f49aed5a29729841836abf2ab Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 24 Jan 2020 17:36:18 +0100 Subject: [PATCH] 3.x: Fix many marbles in Maybe (#6866) --- .../java/io/reactivex/rxjava3/core/Maybe.java | 80 ++++++++++++------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 0d0d6f5473..375a439e65 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -562,6 +562,8 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends /** * Provides an API (via a cold {@code Maybe}) that bridges the reactive world with the callback-style world. *

+ * + *

* Example: *


      * Maybe.<Event>create(emitter -> {
@@ -615,6 +617,8 @@ public static  Maybe create(@NonNull MaybeOnSubscribe onSubscribe) {
     /**
      * Calls a {@link Supplier} for each individual {@link MaybeObserver} to return the actual {@link MaybeSource} source to
      * be subscribed to.
+     * 

+ * *

*
Scheduler:
*
{@code defer} does not operate by default on a particular {@link Scheduler}.
@@ -683,7 +687,7 @@ public static Maybe error(@NonNull Throwable throwable) { * Returns a {@code Maybe} that invokes a {@link MaybeObserver}'s {@link MaybeObserver#onError onError} method when the * {@code MaybeObserver} subscribes to it. *

- * + * *

*
Scheduler:
*
{@code error} does not operate by default on a particular {@link Scheduler}.
@@ -708,6 +712,8 @@ public static Maybe error(@NonNull Supplier supplier /** * Returns a {@code Maybe} instance that runs the given {@link Action} for each subscriber and * emits either its exception or simply completes. + *

+ * *

*
Scheduler:
*
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
@@ -734,7 +740,8 @@ public static Maybe fromAction(@NonNull Action action) { /** * Wraps a {@link CompletableSource} into a {@code Maybe}. - * + *

+ * *

*
Scheduler:
*
{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.
@@ -754,7 +761,8 @@ public static Maybe fromCompletable(@NonNull CompletableSource completabl /** * Wraps a {@link SingleSource} into a {@code Maybe}. - * + *

+ * *

*
Scheduler:
*
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
@@ -778,6 +786,8 @@ public static Maybe fromSingle(@NonNull SingleSource single) { * considering a {@code null} result from the {@code Callable} as indication for valueless completion * via {@code onComplete}. *

+ * + *

* This operator allows you to defer the execution of the given {@code Callable} until a {@code MaybeObserver} * subscribes to the returned {@code Maybe}. In other terms, this source operator evaluates the given * {@code Callable} "lazily". @@ -820,7 +830,7 @@ public static Maybe fromSingle(@NonNull SingleSource single) { /** * Converts a {@link Future} into a {@code Maybe}, treating a {@code null} result as an indication of emptiness. *

- * + * *

* The operator calls {@link Future#get()}, which is a blocking method, on the subscription thread. * It is recommended applying {@link #subscribeOn(Scheduler)} to move this blocking wait to a @@ -854,7 +864,7 @@ public static Maybe fromSingle(@NonNull SingleSource single) { /** * Converts a {@link Future} into a {@code Maybe}, with a timeout on the {@code Future}. *

- * + * *

* The operator calls {@link Future#get(long, TimeUnit)}, which is a blocking method, on the subscription thread. * It is recommended applying {@link #subscribeOn(Scheduler)} to move this blocking wait to a @@ -893,6 +903,8 @@ public static Maybe fromSingle(@NonNull SingleSource single) { /** * Returns a {@code Maybe} instance that runs the given {@link Runnable} for each subscriber and * emits either its exception or simply completes. + *

+ * *

*
Scheduler:
*
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
@@ -989,6 +1001,8 @@ public static Maybe fromRunnable(@NonNull Runnable run) { /** * Merges an {@link Iterable} sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence, * running all {@code MaybeSource}s at once. + *

+ * *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -1025,6 +1039,8 @@ public static Flowable merge(@NonNull Iterable<@NonNull ? extends MaybeSo /** * Merges a {@link Publisher} sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence, * running all {@code MaybeSource}s at once. + *

+ * *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -1061,6 +1077,8 @@ public static Flowable merge(@NonNull Publisher<@NonNull ? extends MaybeS /** * Merges a {@link Publisher} sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence, * running at most maxConcurrency {@code MaybeSource}s at once. + *

+ * *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -1134,7 +1152,7 @@ public static Maybe merge(@NonNull MaybeSource - * + * *

* You can combine items emitted by multiple {@code MaybeSource}s so that they appear as a single {@code Flowable}, by * using the {@code merge} method. @@ -1183,7 +1201,7 @@ public static Flowable merge( /** * Flattens three {@link MaybeSource}s into a single {@link Flowable}, without any transformation. *

- * + * *

* You can combine items emitted by multiple {@code MaybeSource}s so that they appear as a single {@code Flowable}, by using * the {@code merge} method. @@ -1236,7 +1254,7 @@ public static Flowable merge( /** * Flattens four {@link MaybeSource}s into a single {@link Flowable}, without any transformation. *

- * + * *

* You can combine items emitted by multiple {@code MaybeSource}s so that they appear as a single {@code Flowable}, by using * the {@code merge} method. @@ -1292,6 +1310,8 @@ public static Flowable merge( /** * Merges an array sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence, * running all {@code MaybeSource}s at once. + *

+ * *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -1340,12 +1360,12 @@ public static Flowable mergeArray(MaybeSource... sources) { * successfully emitted items from each of the source {@code MaybeSource}s without being interrupted by an error * notification from one of them. *

+ * + *

* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged {@code MaybeSource}s have finished emitting items. *

- * - *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1381,11 +1401,13 @@ public static Flowable mergeArrayDelayError(@NonNull MaybeSource + * + *

* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged {@code MaybeSource}s have finished emitting items. *

- * + * *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. @@ -1417,12 +1439,12 @@ public static Flowable mergeDelayError(@NonNull Iterable<@NonNull ? exten * receive all successfully emitted items from all of the source {@code MaybeSource}s without being interrupted by * an error notification from one of them or even the main {@code Publisher}. *

+ * + *

* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged {@code MaybeSource}s and the main {@code Publisher} have finished emitting items. *

- * - *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1453,12 +1475,12 @@ public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? exte * receive all successfully emitted items from all of the source {@code MaybeSource}s without being interrupted by * an error notification from one of them or even the main {@code Publisher} as well as limiting the total number of active {@code MaybeSource}s. *

+ * + *

* This behaves like {@link #merge(Publisher, int)} except that if any of the merged {@code MaybeSource}s notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged {@code MaybeSource}s and the main {@code Publisher} have finished emitting items. *

- * - *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1495,12 +1517,12 @@ public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? exte * successfully emitted items from each of the source {@code MaybeSource}s without being interrupted by an error * notification from one of them. *

+ * + *

* This behaves like {@link #merge(MaybeSource, MaybeSource)} except that if any of the merged {@code MaybeSource}s * notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from * propagating that error notification until all of the merged {@code MaybeSource}s have finished emitting items. *

- * - *

* Even if both merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1534,13 +1556,13 @@ public static Flowable mergeDelayError(@NonNull MaybeSource * successfully emitted items from all of the source {@code MaybeSource}s without being interrupted by an error * notification from one of them. *

+ * + *

* This behaves like {@link #merge(MaybeSource, MaybeSource, MaybeSource)} except that if any of the merged * {@code MaybeSource}s notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain * from propagating that error notification until all of the merged {@code MaybeSource}s have finished emitting * items. *

- * - *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1578,13 +1600,13 @@ public static Flowable mergeDelayError(@NonNull MaybeSource * successfully emitted items from all of the source {@code MaybeSource}s without being interrupted by an error * notification from one of them. *

+ * + *

* This behaves like {@link #merge(MaybeSource, MaybeSource, MaybeSource, MaybeSource)} except that if any of * the merged {@code MaybeSource}s notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} * will refrain from propagating that error notification until all of the merged {@code MaybeSource}s have finished * emitting items. *

- * - *

* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its subscribers once. *

@@ -1649,7 +1671,7 @@ public static Maybe never() { * Returns a {@link Single} that emits a {@link Boolean} value that indicates whether two {@link MaybeSource} sequences are the * same by comparing the items emitted by each {@code MaybeSource} pairwise. *

- * + * *

*
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
@@ -1677,7 +1699,7 @@ public static Single sequenceEqual(@NonNull MaybeSource - * + * *
*
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
@@ -1709,7 +1731,7 @@ public static Single sequenceEqual(@NonNull MaybeSource - * + * *
*
Scheduler:
*
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
@@ -1733,7 +1755,7 @@ public static Maybe timer(long delay, @NonNull TimeUnit unit) { /** * Returns a {@code Maybe} that emits {@code 0L} after a specified delay on a specified {@link Scheduler}. *

- * + * *

*
Scheduler:
*
You specify which {@code Scheduler} this operator will use.
@@ -1762,6 +1784,8 @@ public static Maybe timer(long delay, @NonNull TimeUnit unit, @NonNull Sch /** * Advanced use only: creates a {@code Maybe} instance without * any safeguards by using a callback that is called with a {@link MaybeObserver}. + *

+ * *

*
Scheduler:
*
{@code unsafeCreate} does not operate by default on a particular {@link Scheduler}.
@@ -1787,7 +1811,7 @@ public static Maybe unsafeCreate(@NonNull MaybeSource onSubscribe) { * Constructs a {@code Maybe} that creates a dependent resource object which is disposed of when the * generated {@link MaybeSource} terminates or the downstream calls dispose(). *

- * + * *

*
Scheduler:
*
{@code using} does not operate by default on a particular {@link Scheduler}.
@@ -1818,7 +1842,7 @@ public static Maybe using(@NonNull Supplier resourceSuppl * Constructs a {@code Maybe} that creates a dependent resource object which is disposed first ({code eager == true}) * when the generated {@link MaybeSource} terminates or the downstream disposes; or after ({code eager == false}). *

- * + * *

* Eager disposal is particularly appropriate for a synchronous {@code Maybe} that reuses resources. {@code disposeAction} will * only be called once per subscription. @@ -1859,6 +1883,8 @@ public static Maybe using(@NonNull Supplier resourceSuppl /** * Wraps a {@link MaybeSource} instance into a new {@code Maybe} instance if not already a {@code Maybe} * instance. + *

+ * *

*
Scheduler:
*
{@code wrap} does not operate by default on a particular {@link Scheduler}.