Skip to content

Commit

Permalink
Added nullable annotations to subjects (#5890)
Browse files Browse the repository at this point in the history
  • Loading branch information
bangarharshit authored and akarnokd committed Mar 5, 2018
1 parent 3ba1d35 commit 51dd03b
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 33 deletions.
14 changes: 8 additions & 6 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -85,22 +86,22 @@
* AsyncSubject<Object> subject = AsyncSubject.create();
*
* TestObserver<Object> to1 = subject.test();
*
*
* to1.assertEmpty();
*
*
* subject.onNext(1);
*
*
* // AsyncSubject only emits when onComplete was called.
* to1.assertEmpty();
*
* subject.onNext(2);
* subject.onComplete();
*
*
* // onComplete triggers the emission of the last cached item and the onComplete event.
* to1.assertResult(2);
*
*
* TestObserver<Object> to2 = subject.test();
*
*
* // late Observers receive the last cached item too
* to2.assertResult(2);
* </code></pre>
Expand Down Expand Up @@ -313,6 +314,7 @@ public boolean hasValue() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
return subscribers.get() == TERMINATED ? value : null;
}
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.*;
Expand Down Expand Up @@ -63,19 +64,19 @@
* observable.onNext(1);
* // this will "clear" the cache
* observable.onNext(EMPTY);
*
*
* TestObserver&lt;Integer&gt; to2 = observable.test();
*
*
* subject.onNext(2);
* subject.onComplete();
*
*
* // to1 received both non-empty items
* to1.assertResult(1, 2);
*
*
* // to2 received only 2 even though the current item was EMPTY
* // when it got subscribed
* to2.assertResult(2);
*
*
* // Observers coming after the subject was terminated receive
* // no items and only the onComplete event in this case.
* observable.test().assertResult();
Expand Down Expand Up @@ -300,6 +301,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
Object o = value.get();
if (NotificationLite.isError(o)) {
Expand All @@ -313,6 +315,7 @@ public Throwable getThrowable() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
Object o = value.get();
if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/subjects/CompletableSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
Expand Down Expand Up @@ -65,12 +66,12 @@
* Example usage:
* <pre><code>
* CompletableSubject subject = CompletableSubject.create();
*
*
* TestObserver&lt;Void&gt; to1 = subject.test();
*
* // a fresh CompletableSubject is empty
* to1.assertEmpty();
*
*
* subject.onComplete();
*
* // a CompletableSubject is always void of items
Expand Down Expand Up @@ -213,6 +214,7 @@ void remove(CompletableDisposable inner) {
* Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
@Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
Expand Down
18 changes: 10 additions & 8 deletions src/main/java/io/reactivex/subjects/MaybeSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,20 @@
* Example usage:
* <pre><code>
* MaybeSubject&lt;Integer&gt; subject1 = MaybeSubject.create();
*
*
* TestObserver&lt;Integer&gt; to1 = subject1.test();
*
*
* // MaybeSubjects are empty by default
* to1.assertEmpty();
*
*
* subject1.onSuccess(1);
*
*
* // onSuccess is a terminal event with MaybeSubjects
* // TestObserver converts onSuccess into onNext + onComplete
* to1.assertResult(1);
*
* TestObserver&lt;Integer&gt; to2 = subject1.test();
*
*
* // late Observers receive the terminal signal (onSuccess) too
* to2.assertResult(1);
*
Expand All @@ -94,14 +94,14 @@
* MaybeSubject&lt;Integer&gt; subject2 = MaybeSubject.create();
*
* TestObserver&lt;Integer&gt; to3 = subject2.test();
*
*
* subject2.onComplete();
*
*
* // a completed MaybeSubject completes its MaybeObservers
* to3.assertResult();
*
* TestObserver&lt;Integer&gt; to4 = subject1.test();
*
*
* // late Observers receive the terminal signal (onComplete) too
* to4.assertResult();
* </code></pre>
Expand Down Expand Up @@ -263,6 +263,7 @@ void remove(MaybeDisposable<T> inner) {
* Returns the success value if this MaybeSubject was terminated with a success value.
* @return the success value or null
*/
@Nullable
public T getValue() {
if (observers.get() == TERMINATED) {
return value;
Expand All @@ -282,6 +283,7 @@ public boolean hasValue() {
* Returns the terminal error if this MaybeSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
@Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
Expand Down Expand Up @@ -263,6 +264,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
if (subscribers.get() == TERMINATED) {
return error;
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -395,6 +396,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
Object o = buffer.get();
if (NotificationLite.isError(o)) {
Expand All @@ -408,6 +410,7 @@ public Throwable getThrowable() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
return buffer.getValue();
}
Expand Down Expand Up @@ -542,6 +545,7 @@ interface ReplayBuffer<T> {

int size();

@Nullable
T getValue();

T[] getValues(T[] array);
Expand Down Expand Up @@ -620,6 +624,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
int s = size;
Expand Down Expand Up @@ -838,6 +843,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
Node<Object> prev = null;
Expand Down Expand Up @@ -1080,6 +1086,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
TimedNode<Object> prev = null;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.Observer;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.*;
import io.reactivex.internal.util.AppendOnlyLinkedArrayList.NonThrowingPredicate;
Expand Down Expand Up @@ -193,6 +194,7 @@ public boolean hasThrowable() {
}

@Override
@Nullable
public Throwable getThrowable() {
return actual.getThrowable();
}
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,37 @@
* Example usage:
* <pre><code>
* UnicastSubject&lt;Integer&gt; subject = UnicastSubject.create();
*
*
* TestObserver&lt;Integer&gt; to1 = subject.test();
*
*
* // fresh UnicastSubjects are empty
* to1.assertEmpty();
*
*
* TestObserver&lt;Integer&gt; to2 = subject.test();
*
*
* // A UnicastSubject only allows one Observer during its lifetime
* to2.assertFailure(IllegalStateException.class);
*
*
* subject.onNext(1);
* to1.assertValue(1);
*
*
* subject.onNext(2);
* to1.assertValues(1, 2);
*
*
* subject.onComplete();
* to1.assertResult(1, 2);
*
*
* // ----------------------------------------------------
*
*
* UnicastSubject&lt;Integer&gt; subject2 = UnicastSubject.create();
*
*
* // a UnicastSubject caches events util its single Observer subscribes
* subject.onNext(1);
* subject.onNext(2);
* subject.onComplete();
*
*
* TestObserver&lt;Integer&gt; to3 = subject2.test();
*
*
* // the cached events are emitted in order
* to3.assertResult(1, 2);
* </code></pre>
Expand Down Expand Up @@ -498,6 +498,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
if (done) {
return error;
Expand Down

0 comments on commit 51dd03b

Please sign in to comment.