diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index ac1abea4f5..edd5c620ef 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -86,7 +86,7 @@ * given {@code Subscription} being cancelled immediately. *
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} - * is still required to be serialized (called from the same thread or called non-overlappingly from different threads + * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively). diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index 0561144387..581ec9e8c9 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -24,14 +24,86 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * An Subject that emits the very last value followed by a completion event or the received error to Observers. - * - *
The implementation of onXXX methods are technically thread-safe but non-serialized calls + * A Subject that emits the very last value followed by a completion event or the received error to Observers. + *
+ * This subject does not have a public constructor by design; a new empty instance of this + * {@code AsyncSubject} can be created via the {@link #create()} method. + *
+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *
+ * Since an {@code AsyncSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + *
+ * When this {@code AsyncSubject} is terminated via {@link #onError(Throwable)}, the + * last observed item (if any) is cleared and late {@link io.reactivex.Observer}s only receive + * the {@code onError} event. + *
+ * The {@code AsyncSubject} caches the latest item internally and it emits this item only when {@code onComplete} is called. + * Therefore, it is not recommended to use this {@code Subject} with infinite or never-completing sources. + *
+ * Even though {@code AsyncSubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code AsyncSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *
+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + * The implementation of onXXX methods are technically thread-safe but non-serialized calls * to them may lead to undefined state in the currently subscribed Observers. + *
+ * This {@code AsyncSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the very last observed value - + * after this {@code AsyncSubject} has been completed - in a non-blocking and thread-safe + * manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}. + *
+ * Example usage: + *
+ * AsyncSubject<Object> subject = AsyncSubject.create();
+ *
+ * TestObserver<Object> to1 = subject.test();
+ *
+ * to1.assertEmpty();
+ *
+ * subject.onNext(1);
+ *
+ * // AsyncSubject only emits when onComplete was called.
+ * to1.assertEmpty();
*
+ * subject.onNext(2);
+ * subject.onComplete();
+ *
+ * // onComplete triggers the emission of the last cached item and the onComplete event.
+ * to1.assertResult(2);
+ *
+ * TestObserver<Object> to2 = subject.test();
+ *
+ * // late Observers receive the last cached item too
+ * to2.assertResult(2);
+ *
* @param - * Since the {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, * {@code null}s are not allowed (Rule 2.13) as * default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and - * {@link #onError(Throwable)}. + * {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. *
* Since a {@code BehaviorSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. *
@@ -83,11 +84,11 @@ * Even though {@code BehaviorSubject} implements the {@code Observer} interface, calling * {@code onSubscribe} is not required (Rule 2.12) * if the subject is used as a standalone source. However, calling {@code onSubscribe} - * after the {@code BehaviorSubjecct} reached its terminal state will result in the + * after the {@code BehaviorSubject} reached its terminal state will result in the * given {@code Disposable} being disposed immediately. *
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} - * is still required to be serialized (called from the same thread or called non-overlappingly from different threads + * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 2f0e209624..c1862a6f51 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -24,12 +24,61 @@ /** * Represents a hot Completable-like source and consumer of events similar to Subjects. *
- * All methods are thread safe. Calling onComplete multiple - * times has no effect. Calling onError multiple times relays the Throwable to - * the RxJavaPlugins' error handler. + * This subject does not have a public constructor by design; a new non-terminated instance of this + * {@code CompletableSubject} can be created via the {@link #create()} method. *
- * The CompletableSubject doesn't store the Disposables coming through onSubscribe but - * disposes them once the other onXXX methods were called (terminal state reached). + * Since the {@code CompletableSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onError(Throwable)}. + *
+ * Even though {@code CompletableSubject} implements the {@code CompletableObserver} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code CompletableSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *
+ * All methods are thread safe. Calling {@link #onComplete()} multiple + * times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to + * the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler. + *
+ * This {@code CompletableSubject} supports the standard state-peeking methods {@link #hasComplete()}, + * {@link #hasThrowable()}, {@link #getThrowable()} and {@link #hasObservers()}. + *
+ * Example usage: + *
+ * CompletableSubject subject = CompletableSubject.create();
+ *
+ * TestObserver<Void> to1 = subject.test();
+ *
+ * // a fresh CompletableSubject is empty
+ * to1.assertEmpty();
+ *
+ * subject.onComplete();
+ *
+ * // a CompletableSubject is always void of items
+ * to1.assertResult();
+ *
+ * TestObserver<Void> to2 = subject.test()
+ *
+ * // late CompletableObservers receive the terminal event
+ * to2.assertResult();
+ *
* History: 2.0.5 - experimental * @since 2.1 */ diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java index 7f303deefc..d6f84254f2 100644 --- a/src/main/java/io/reactivex/subjects/MaybeSubject.java +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -24,12 +24,85 @@ /** * Represents a hot Maybe-like source and consumer of events similar to Subjects. *
- * All methods are thread safe. Calling onSuccess or onComplete multiple - * times has no effect. Calling onError multiple times relays the Throwable to - * the RxJavaPlugins' error handler. + * This subject does not have a public constructor by design; a new non-terminated instance of this + * {@code MaybeSubject} can be created via the {@link #create()} method. *
- * The MaybeSubject doesn't store the Disposables coming through onSubscribe but - * disposes them once the other onXXX methods were called (terminal state reached). + * Since the {@code MaybeSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) + * as parameters to {@link #onSuccess(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + *
+ * Since a {@code MaybeSubject} is a {@link io.reactivex.Maybe}, calling {@code onSuccess}, {@code onError} + * or {@code onComplete} will move this {@code MaybeSubject} into its terminal state atomically. + *
+ * All methods are thread safe. Calling {@link #onSuccess(Object)} or {@link #onComplete()} multiple + * times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to + * the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler. + *
+ * Even though {@code MaybeSubject} implements the {@code MaybeObserver} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code MaybeSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + *
+ * This {@code MaybeSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read any success item in a non-blocking + * and thread-safe manner via {@link #hasValue()} and {@link #getValue()}. + *
+ * The {@code MaybeSubject} does not support clearing its cached {@code onSuccess} value. + *
+ * Example usage: + *
+ * MaybeSubject<Integer> subject1 = MaybeSubject.create();
+ *
+ * TestObserver<Integer> to1 = subject1.test();
+ *
+ * // MaybeSubjects are empty by default
+ * to1.assertEmpty();
+ *
+ * subject1.onSuccess(1);
+ *
+ * // onSuccess is a terminal event with MaybeSubjects
+ * // TestObserver converts onSuccess into onNext + onComplete
+ * to1.assertResult(1);
+ *
+ * TestObserver<Integer> to2 = subject1.test();
+ *
+ * // late Observers receive the terminal signal (onSuccess) too
+ * to2.assertResult(1);
+ *
+ * // -----------------------------------------------------
+ *
+ * MaybeSubject<Integer> subject2 = MaybeSubject.create();
+ *
+ * TestObserver<Integer> to3 = subject2.test();
+ *
+ * subject2.onComplete();
+ *
+ * // a completed MaybeSubject completes its MaybeObservers
+ * to3.assertResult();
+ *
+ * TestObserver<Integer> to4 = subject1.test();
+ *
+ * // late Observers receive the terminal signal (onComplete) too
+ * to4.assertResult();
+ *
* History: 2.0.5 - experimental
* @param
*
+ * This subject does not have a public constructor by design; a new empty instance of this
+ * {@code PublishSubject} can be created via the {@link #create()} method.
+ *
+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
+ * {@code null}s are not allowed (Rule 2.13) as
+ * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
+ * {@link NullPointerException} being thrown and the subject's state is not changed.
+ *
+ * Since a {@code PublishSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
+ *
+ * When this {@code PublishSubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
+ * late {@link io.reactivex.Observer}s only receive the respective terminal event.
+ *
+ * Unlike a {@link BehaviorSubject}, a {@code PublishSubject} doesn't retain/cache items, therefore, a new
+ * {@code Observer} won't receive any past items.
+ *
+ * Even though {@code PublishSubject} implements the {@code Observer} interface, calling
+ * {@code onSubscribe} is not required (Rule 2.12)
+ * if the subject is used as a standalone source. However, calling {@code onSubscribe}
+ * after the {@code PublishSubject} reached its terminal state will result in the
+ * given {@code Disposable} being disposed immediately.
+ *
+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
+ * is required to be serialized (called from the same thread or called non-overlappingly from different threads
+ * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
+ * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
+ * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
+ *
+ * This {@code PublishSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
+ * {@link #getThrowable()} and {@link #hasObservers()}.
+ *
* Example usage:
*
*
+ * This subject does not have a public constructor by design; a new empty instance of this
+ * {@code ReplaySubject} can be created via the following {@code create} methods that
+ * allow specifying the retention policy for items:
+ *
+ * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
+ * {@code null}s are not allowed (Rule 2.13) as
+ * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
+ * {@link NullPointerException} being thrown and the subject's state is not changed.
+ *
+ * Since a {@code ReplaySubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
+ *
+ * When this {@code ReplaySubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
+ * late {@link io.reactivex.Observer}s will receive the retained/cached items first (if any) followed by the respective
+ * terminal event. If the {@code ReplaySubject} has a time-bound, the age of the retained/cached items are still considered
+ * when replaying and thus it may result in no items being emitted before the terminal event.
+ *
+ * Once an {@code Observer} has subscribed, it will receive items continuously from that point on. Bounds only affect how
+ * many past items a new {@code Observer} will receive before it catches up with the live event feed.
+ *
+ * Even though {@code ReplaySubject} implements the {@code Observer} interface, calling
+ * {@code onSubscribe} is not required (Rule 2.12)
+ * if the subject is used as a standalone source. However, calling {@code onSubscribe}
+ * after the {@code ReplaySubject} reached its terminal state will result in the
+ * given {@code Disposable} being disposed immediately.
+ *
+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
+ * is required to be serialized (called from the same thread or called non-overlappingly from different threads
+ * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
+ * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
+ * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
+ *
+ * This {@code ReplaySubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
+ * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the retained/cached items
+ * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
+ * {@link #getValues()} or {@link #getValues(Object[])}.
+ *
* Example usage:
*
*
+ *
+ * {@code
@@ -40,6 +86,8 @@
subject.onNext("three");
subject.onComplete();
+ // late Observers only receive the terminal event
+ subject.test().assertEmpty();
}
*
* @param
*
+ *
+ *
+ *
+ * {@code
- ReplaySubject