diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index ac1abea4f5..edd5c620ef 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -86,7 +86,7 @@ * given {@code Subscription} being cancelled immediately. *

* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} - * is still required to be serialized (called from the same thread or called non-overlappingly from different threads + * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively). diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index 0561144387..581ec9e8c9 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -24,14 +24,86 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * An Subject that emits the very last value followed by a completion event or the received error to Observers. - * - *

The implementation of onXXX methods are technically thread-safe but non-serialized calls + * A Subject that emits the very last value followed by a completion event or the received error to Observers. + *

+ * This subject does not have a public constructor by design; a new empty instance of this + * {@code AsyncSubject} can be created via the {@link #create()} method. + *

+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since an {@code AsyncSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + *

+ * When this {@code AsyncSubject} is terminated via {@link #onError(Throwable)}, the + * last observed item (if any) is cleared and late {@link io.reactivex.Observer}s only receive + * the {@code onError} event. + *

+ * The {@code AsyncSubject} caches the latest item internally and it emits this item only when {@code onComplete} is called. + * Therefore, it is not recommended to use this {@code Subject} with infinite or never-completing sources. + *

+ * Even though {@code AsyncSubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code AsyncSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + * The implementation of onXXX methods are technically thread-safe but non-serialized calls * to them may lead to undefined state in the currently subscribed Observers. + *

+ * This {@code AsyncSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the very last observed value - + * after this {@code AsyncSubject} has been completed - in a non-blocking and thread-safe + * manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}. + *

+ *
Scheduler:
+ *
{@code AsyncSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Observer}s get notified on the thread where the terminating {@code onError} or {@code onComplete} + * methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code AsyncSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, + * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s + * cancel at once). + * If there were no {@code Observer}s subscribed to this {@code AsyncSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

+ * Example usage: + *


+ * AsyncSubject<Object> subject = AsyncSubject.create();
+ * 
+ * TestObserver<Object> to1 = subject.test();
+ * 
+ * to1.assertEmpty();
+ * 
+ * subject.onNext(1);
+ * 
+ * // AsyncSubject only emits when onComplete was called.
+ * to1.assertEmpty();
  *
+ * subject.onNext(2);
+ * subject.onComplete();
+ * 
+ * // onComplete triggers the emission of the last cached item and the onComplete event.
+ * to1.assertResult(2);
+ * 
+ * TestObserver<Object> to2 = subject.test();
+ * 
+ * // late Observers receive the last cached item too
+ * to2.assertResult(2);
+ * 
* @param the value type */ - public final class AsyncSubject extends Subject { @SuppressWarnings("rawtypes") diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index f7ac689b34..42878adee2 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -36,10 +36,11 @@ * a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid * overload resolution conflict with {@code Observable.create} that creates an Observable, not a {@code BehaviorSubject}). *

- * Since the {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, * {@code null}s are not allowed (Rule 2.13) as * default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and - * {@link #onError(Throwable)}. + * {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. *

* Since a {@code BehaviorSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. *

@@ -83,11 +84,11 @@ * Even though {@code BehaviorSubject} implements the {@code Observer} interface, calling * {@code onSubscribe} is not required (Rule 2.12) * if the subject is used as a standalone source. However, calling {@code onSubscribe} - * after the {@code BehaviorSubjecct} reached its terminal state will result in the + * after the {@code BehaviorSubject} reached its terminal state will result in the * given {@code Disposable} being disposed immediately. *

* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} - * is still required to be serialized (called from the same thread or called non-overlappingly from different threads + * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 2f0e209624..c1862a6f51 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -24,12 +24,61 @@ /** * Represents a hot Completable-like source and consumer of events similar to Subjects. *

- * All methods are thread safe. Calling onComplete multiple - * times has no effect. Calling onError multiple times relays the Throwable to - * the RxJavaPlugins' error handler. + * This subject does not have a public constructor by design; a new non-terminated instance of this + * {@code CompletableSubject} can be created via the {@link #create()} method. *

- * The CompletableSubject doesn't store the Disposables coming through onSubscribe but - * disposes them once the other onXXX methods were called (terminal state reached). + * Since the {@code CompletableSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onError(Throwable)}. + *

+ * Even though {@code CompletableSubject} implements the {@code CompletableObserver} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code CompletableSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * All methods are thread safe. Calling {@link #onComplete()} multiple + * times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to + * the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler. + *

+ * This {@code CompletableSubject} supports the standard state-peeking methods {@link #hasComplete()}, + * {@link #hasThrowable()}, {@link #getThrowable()} and {@link #hasObservers()}. + *

+ *
Scheduler:
+ *
{@code CompletableSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code CompletableObserver}s get notified on the thread where the terminating {@code onError} or {@code onComplete} + * methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code CompletableSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code CompletableObserver}s. During this emission, + * if one or more {@code CompletableObserver}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code CompletableObserver}s + * cancel at once). + * If there were no {@code CompletableObserver}s subscribed to this {@code CompletableSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

+ * Example usage: + *


+ * CompletableSubject subject = CompletableSubject.create();
+ * 
+ * TestObserver<Void> to1 = subject.test();
+ *
+ * // a fresh CompletableSubject is empty
+ * to1.assertEmpty();
+ * 
+ * subject.onComplete();
+ *
+ * // a CompletableSubject is always void of items
+ * to1.assertResult();
+ *
+ * TestObserver<Void> to2 = subject.test()
+ *
+ * // late CompletableObservers receive the terminal event
+ * to2.assertResult();
+ * 
*

History: 2.0.5 - experimental * @since 2.1 */ diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java index 7f303deefc..d6f84254f2 100644 --- a/src/main/java/io/reactivex/subjects/MaybeSubject.java +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -24,12 +24,85 @@ /** * Represents a hot Maybe-like source and consumer of events similar to Subjects. *

- * All methods are thread safe. Calling onSuccess or onComplete multiple - * times has no effect. Calling onError multiple times relays the Throwable to - * the RxJavaPlugins' error handler. + * This subject does not have a public constructor by design; a new non-terminated instance of this + * {@code MaybeSubject} can be created via the {@link #create()} method. *

- * The MaybeSubject doesn't store the Disposables coming through onSubscribe but - * disposes them once the other onXXX methods were called (terminal state reached). + * Since the {@code MaybeSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onSuccess(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since a {@code MaybeSubject} is a {@link io.reactivex.Maybe}, calling {@code onSuccess}, {@code onError} + * or {@code onComplete} will move this {@code MaybeSubject} into its terminal state atomically. + *

+ * All methods are thread safe. Calling {@link #onSuccess(Object)} or {@link #onComplete()} multiple + * times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to + * the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler. + *

+ * Even though {@code MaybeSubject} implements the {@code MaybeObserver} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code MaybeSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * This {@code MaybeSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read any success item in a non-blocking + * and thread-safe manner via {@link #hasValue()} and {@link #getValue()}. + *

+ * The {@code MaybeSubject} does not support clearing its cached {@code onSuccess} value. + *

+ *
Scheduler:
+ *
{@code MaybeSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code MaybeObserver}s get notified on the thread where the terminating {@code onSuccess}, {@code onError} or {@code onComplete} + * methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code MaybeSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code MaybeObserver}s. During this emission, + * if one or more {@code MaybeObserver}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code MaybeObserver}s + * cancel at once). + * If there were no {@code MaybeObserver}s subscribed to this {@code MaybeSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

+ * Example usage: + *


+ * MaybeSubject<Integer> subject1 = MaybeSubject.create();
+ * 
+ * TestObserver<Integer> to1 = subject1.test();
+ * 
+ * // MaybeSubjects are empty by default
+ * to1.assertEmpty();
+ * 
+ * subject1.onSuccess(1);
+ * 
+ * // onSuccess is a terminal event with MaybeSubjects
+ * // TestObserver converts onSuccess into onNext + onComplete
+ * to1.assertResult(1);
+ *
+ * TestObserver<Integer> to2 = subject1.test();
+ * 
+ * // late Observers receive the terminal signal (onSuccess) too
+ * to2.assertResult(1);
+ *
+ * // -----------------------------------------------------
+ *
+ * MaybeSubject<Integer> subject2 = MaybeSubject.create();
+ *
+ * TestObserver<Integer> to3 = subject2.test();
+ * 
+ * subject2.onComplete();
+ * 
+ * // a completed MaybeSubject completes its MaybeObservers
+ * to3.assertResult();
+ *
+ * TestObserver<Integer> to4 = subject1.test();
+ * 
+ * // late Observers receive the terminal signal (onComplete) too
+ * to4.assertResult();
+ * 
*

History: 2.0.5 - experimental * @param the value type received and emitted * @since 2.1 diff --git a/src/main/java/io/reactivex/subjects/PublishSubject.java b/src/main/java/io/reactivex/subjects/PublishSubject.java index ad0f66d14c..957b2a0ac6 100644 --- a/src/main/java/io/reactivex/subjects/PublishSubject.java +++ b/src/main/java/io/reactivex/subjects/PublishSubject.java @@ -22,11 +22,57 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the - * subscriber. + * A Subject that emits (multicasts) items to currently subscribed {@link Observer}s and terminal events to current + * or late {@code Observer}s. *

* *

+ * This subject does not have a public constructor by design; a new empty instance of this + * {@code PublishSubject} can be created via the {@link #create()} method. + *

+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) as + * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since a {@code PublishSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + *

+ * When this {@code PublishSubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, + * late {@link io.reactivex.Observer}s only receive the respective terminal event. + *

+ * Unlike a {@link BehaviorSubject}, a {@code PublishSubject} doesn't retain/cache items, therefore, a new + * {@code Observer} won't receive any past items. + *

+ * Even though {@code PublishSubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code PublishSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + *

+ * This {@code PublishSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()}. + *

+ *
Scheduler:
+ *
{@code PublishSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code PublishSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, + * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s + * cancel at once). + * If there were no {@code Observer}s subscribed to this {@code PublishSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

* Example usage: *

 {@code
 
@@ -40,6 +86,8 @@
   subject.onNext("three");
   subject.onComplete();
 
+  // late Observers only receive the terminal event
+  subject.test().assertEmpty();
   } 
* * @param diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 52d0616884..d23756b785 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -27,14 +27,80 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Replays events to Observers. + * Replays events (in a configurable bounded or unbounded manner) to current and late {@link Observer}s. *

* *

+ * This subject does not have a public constructor by design; a new empty instance of this + * {@code ReplaySubject} can be created via the following {@code create} methods that + * allow specifying the retention policy for items: + *

    + *
  • {@link #create()} - creates an empty, unbounded {@code ReplaySubject} that + * caches all items and the terminal event it receives.
  • + *
  • {@link #create(int)} - creates an empty, unbounded {@code ReplaySubject} + * with a hint about how many total items one expects to retain.
  • + *
  • {@link #createWithSize(int)} - creates an empty, size-bound {@code ReplaySubject} + * that retains at most the given number of the latest item it receives.
  • + *
  • {@link #createWithTime(long, TimeUnit, Scheduler)} - creates an empty, time-bound + * {@code ReplaySubject} that retains items no older than the specified time amount.
  • + *
  • {@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)} - creates an empty, + * time- and size-bound {@code ReplaySubject} that retains at most the given number + * items that are also not older than the specified time amount.
  • + *
+ *

+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) as + * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since a {@code ReplaySubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + *

+ * When this {@code ReplaySubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, + * late {@link io.reactivex.Observer}s will receive the retained/cached items first (if any) followed by the respective + * terminal event. If the {@code ReplaySubject} has a time-bound, the age of the retained/cached items are still considered + * when replaying and thus it may result in no items being emitted before the terminal event. + *

+ * Once an {@code Observer} has subscribed, it will receive items continuously from that point on. Bounds only affect how + * many past items a new {@code Observer} will receive before it catches up with the live event feed. + *

+ * Even though {@code ReplaySubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code ReplaySubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + *

+ * This {@code ReplaySubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the retained/cached items + * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()}, + * {@link #getValues()} or {@link #getValues(Object[])}. + *

+ *
Scheduler:
+ *
{@code ReplaySubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked. + * Time-bound {@code ReplaySubject}s use the given {@code Scheduler} in their {@code create} methods + * as time source to timestamp of items received for the age checks.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code ReplaySubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, + * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s + * cancel at once). + * If there were no {@code Observer}s subscribed to this {@code ReplaySubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

* Example usage: *

 {@code
 
-  ReplaySubject subject = new ReplaySubject<>();
+  ReplaySubject subject = ReplaySubject.create();
   subject.onNext("one");
   subject.onNext("two");
   subject.onNext("three");
diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java
index d09d1f436c..d3b4cf63c3 100644
--- a/src/main/java/io/reactivex/subjects/SingleSubject.java
+++ b/src/main/java/io/reactivex/subjects/SingleSubject.java
@@ -24,12 +24,69 @@
 /**
  * Represents a hot Single-like source and consumer of events similar to Subjects.
  * 

- * All methods are thread safe. Calling onSuccess multiple - * times has no effect. Calling onError multiple times relays the Throwable to - * the RxJavaPlugins' error handler. + * This subject does not have a public constructor by design; a new non-terminated instance of this + * {@code SingleSubject} can be created via the {@link #create()} method. *

- * The SingleSubject doesn't store the Disposables coming through onSubscribe but - * disposes them once the other onXXX methods were called (terminal state reached). + * Since the {@code SingleSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onSuccess(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since a {@code SingleSubject} is a {@link io.reactivex.Maybe}, calling {@code onSuccess}, {@code onError} + * or {@code onComplete} will move this {@code SingleSubject} into its terminal state atomically. + *

+ * All methods are thread safe. Calling {@link #onSuccess(Object)} multiple + * times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to + * the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler. + *

+ * Even though {@code SingleSubject} implements the {@code SingleObserver} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code SingleSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * This {@code SingleSubject} supports the standard state-peeking methods {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read any success item in a non-blocking + * and thread-safe manner via {@link #hasValue()} and {@link #getValue()}. + *

+ * The {@code SingleSubject} does not support clearing its cached {@code onSuccess} value. + *

+ *
Scheduler:
+ *
{@code SingleSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code SingleObserver}s get notified on the thread where the terminating {@code onSuccess}, {@code onError} + * or {@code onComplete} methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code SingleSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code SingleObserver}s. During this emission, + * if one or more {@code SingleObserver}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code SingleObserver}s + * cancel at once). + * If there were no {@code SingleObserver}s subscribed to this {@code SingleSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

+ * Example usage: + *


+ * SingleSubject<Integer> subject1 = SingleSubject.create();
+ * 
+ * TestObserver<Integer> to1 = subject1.test();
+ * 
+ * // SingleSubjects are empty by default
+ * to1.assertEmpty();
+ * 
+ * subject1.onSuccess(1);
+ * 
+ * // onSuccess is a terminal event with SingleSubjects
+ * // TestObserver converts onSuccess into onNext + onComplete
+ * to1.assertResult(1);
+ *
+ * TestObserver<Integer> to2 = subject1.test();
+ * 
+ * // late Observers receive the terminal signal (onSuccess) too
+ * to2.assertResult(1);
+ * 
*

History: 2.0.5 - experimental * @param the value type received and emitted * @since 2.1 diff --git a/src/main/java/io/reactivex/subjects/Subject.java b/src/main/java/io/reactivex/subjects/Subject.java index 6bd105be85..4a57a4a77d 100644 --- a/src/main/java/io/reactivex/subjects/Subject.java +++ b/src/main/java/io/reactivex/subjects/Subject.java @@ -17,9 +17,11 @@ import io.reactivex.annotations.*; /** - * Represents an Observer and an Observable at the same time, allowing - * multicasting events from a single source to multiple child Subscribers. - *

All methods except the onSubscribe, onNext, onError and onComplete are thread-safe. + * Represents an {@link Observer} and an {@link Observable} at the same time, allowing + * multicasting events from a single source to multiple child {@code Observer}s. + *

+ * All methods except the {@link #onSubscribe(io.reactivex.disposables.Disposable)}, {@link #onNext(Object)}, + * {@link #onError(Throwable)} and {@link #onComplete()} are thread-safe. * Use {@link #toSerialized()} to make these methods thread-safe as well. * * @param the item value type diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index a254eaa853..2f5d77f6ba 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -16,9 +16,10 @@ import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; import io.reactivex.plugins.RxJavaPlugins; + import java.util.concurrent.atomic.*; -import io.reactivex.Observer; +import io.reactivex.*; import io.reactivex.annotations.CheckReturnValue; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.EmptyDisposable; @@ -28,19 +29,116 @@ import io.reactivex.internal.queue.SpscLinkedArrayQueue; /** - * Subject that allows only a single Subscriber to subscribe to it during its lifetime. - * - *

This subject buffers notifications and replays them to the Subscriber as requested. - * - *

This subject holds an unbounded internal buffer. - * - *

If more than one Subscriber attempts to subscribe to this Subject, they - * will receive an IllegalStateException if this Subject hasn't terminated yet, - * or the Subscribers receive the terminal event (error or completion) if this - * Subject has terminated. + * A Subject that queues up events until a single {@link Observer} subscribes to it, replays + * those events to it until the {@code Observer} catches up and then switches to relaying events live to + * this single {@code Observer} until this {@code UnicastSubject} terminates or the {@code Observer} unsubscribes. *

* - * + *

+ * Note that {@code UnicastSubject} holds an unbounded internal buffer. + *

+ * This subject does not have a public constructor by design; a new empty instance of this + * {@code UnicastSubject} can be created via the following {@code create} methods that + * allow specifying the retention policy for items: + *

    + *
  • {@link #create()} - creates an empty, unbounded {@code UnicastSubject} that + * caches all items and the terminal event it receives.
  • + *
  • {@link #create(int)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many total items one expects to retain.
  • + *
  • {@link #create(boolean)} - creates an empty, unbounded {@code UnicastSubject} that + * optionally delays an error it receives and replays it after the regular items have been emitted.
  • + *
  • {@link #create(int, Runnable)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many total items one expects to retain and a callback that will be + * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes.
  • + *
  • {@link #create(int, Runnable, boolean)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many total items one expects to retain and a callback that will be + * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes + * and optionally delays an error it receives and replays it after the regular items have been emitted.
  • + *
+ *

+ * If more than one {@code Observer} attempts to subscribe to this {@code UnicastSubject}, they + * will receive an {@code IllegalStateException} indicating the single-use-only nature of this {@code UnicastSubject}, + * even if the {@code UnicastSubject} already terminated with an error. + *

+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) as + * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *

+ * Since a {@code UnicastSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + *

+ * When this {@code UnicastSubject} is terminated via {@link #onError(Throwable)} the current or late single {@code Observer} + * may receive the {@code Throwable} before any available items could be emitted. To make sure an onError event is delivered + * to the {@code Observer} after the normal items, create a {@code UnicastSubject} with the {@link #create(boolean)} or + * {@link #create(int, Runnable, boolean)} factory methods. + *

+ * Even though {@code UnicastSubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code UnicastSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *

+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + *

+ * This {@code UnicastSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()}. + *

+ *
Scheduler:
+ *
{@code UnicastSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.
+ *
Error handling:
+ *
When the {@link #onError(Throwable)} is called, the {@code UnicastSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, + * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s + * cancel at once). + * If there were no {@code Observer}s subscribed to this {@code UnicastSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + *
+ *
+ *

+ * Example usage: + *


+ * UnicastSubject<Integer> subject = UnicastSubject.create();
+ * 
+ * TestObserver<Integer> to1 = subject.test();
+ * 
+ * // fresh UnicastSubjects are empty
+ * to1.assertEmpty();
+ * 
+ * TestObserver<Integer> to2 = subject.test();
+ * 
+ * // A UnicastSubject only allows one Observer during its lifetime
+ * to2.assertFailure(IllegalStateException.class);
+ * 
+ * subject.onNext(1);
+ * to1.assertValue(1);
+ * 
+ * subject.onNext(2);
+ * to1.assertValues(1, 2);
+ * 
+ * subject.onComplete();
+ * to1.assertResult(1, 2);
+ * 
+ * // ----------------------------------------------------
+ * 
+ * UnicastSubject<Integer> subject2 = UnicastSubject.create();
+ * 
+ * // a UnicastSubject caches events util its single Observer subscribes
+ * subject.onNext(1);
+ * subject.onNext(2);
+ * subject.onComplete();
+ * 
+ * TestObserver<Integer> to3 = subject2.test();
+ * 
+ * // the cached events are emitted in order
+ * to3.assertResult(1, 2);
+ * 
* @param the value type received and emitted by this Subject subclass * @since 2.0 */ diff --git a/src/main/java/io/reactivex/subjects/package-info.java b/src/main/java/io/reactivex/subjects/package-info.java index e209f5db24..8bd3b06ac2 100644 --- a/src/main/java/io/reactivex/subjects/package-info.java +++ b/src/main/java/io/reactivex/subjects/package-info.java @@ -15,7 +15,44 @@ */ /** - * Classes extending the Observable base reactive class and implementing - * the Observer interface at the same time (aka hot Observables). + * Classes representing so-called hot sources, aka subjects, that implement a base reactive class and + * the respective consumer type at once to allow forms of multicasting events to multiple + * consumers as well as consuming another base reactive type of their kind. + *

+ * Available subject classes with their respective base classes and consumer interfaces: + *
+ * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Subject typeBase classConsumer interface
{@link io.reactivex.subjects.Subject Subject} + *
   {@link io.reactivex.subjects.AsyncSubject AsyncSubject} + *
   {@link io.reactivex.subjects.BehaviorSubject BehaviorSubject} + *
   {@link io.reactivex.subjects.PublishSubject PublishSubject} + *
   {@link io.reactivex.subjects.ReplaySubject ReplaySubject} + *
   {@link io.reactivex.subjects.UnicastSubject UnicastSubjectSubject} + *
{@link io.reactivex.Observable Observable}{@link io.reactivex.Observer Observer}
{@link io.reactivex.subjects.SingleSubject SingleSubject}{@link io.reactivex.Single Single}{@link io.reactivex.SingleObserver SingleObserver}
{@link io.reactivex.subjects.MaybeSubject MaybeSubject}{@link io.reactivex.Maybe Maybe}{@link io.reactivex.MaybeObserver MaybeObserver}
{@link io.reactivex.subjects.CompletableSubject CompletableSubject}{@link io.reactivex.Completable Completable}{@link io.reactivex.CompletableObserver CompletableObserver}
+ *

+ * The backpressure-aware variants of the {@code Subject} class are called + * {@link org.reactivestreams.Processor}s and reside in the {@code io.reactivex.processors} package. + * @see io.reactivex.processors */ package io.reactivex.subjects;