Skip to content

Commit

Permalink
Common approach for IllegalArgumentException on subscribe methods
Browse files Browse the repository at this point in the history
As brought up in ReactiveX#278 there were inconsistencies in how arguments were being validated on subscribe methods.

All arguments to subscribe are now validated correctly at the beginning of method invocation and IllegalArgumentException thrown if null arguments are injected.

According to Rx Guideline 6.5 a null argument is considered "a catastrophic error occurs that should bring down the whole program anyway" and thus we immediately throw. A null argument should be caught immediately in development and has nothing to do with subscribe invocation which is what guideline 6.5 is talking about (since a null Observer can't have onError called on it anyways).
  • Loading branch information
benjchristensen committed Jun 3, 2013
1 parent d17a5e2 commit 94165ca
Showing 1 changed file with 53 additions and 32 deletions.
85 changes: 53 additions & 32 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ protected Observable() {
* @param observer
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
* @throws IllegalArgumentException
* if null argument provided
*/
public Subscription subscribe(Observer<T> observer) {
// allow the hook to intercept and/or decorate
Func1<Observer<T>, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
// validate and proceed
if (observer == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribeFunction == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
Expand Down Expand Up @@ -224,6 +229,8 @@ public Subscription subscribe(Observer<T> observer) {
* The {@link Scheduler} that the sequence is subscribed to on.
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
* @throws IllegalArgumentException
* if null argument provided
*/
public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
Expand All @@ -241,11 +248,14 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map<String, Object> callbacks) {
// lookup and memoize onNext
if (callbacks == null) {
throw new RuntimeException("callbacks map can not be null");
}
Object _onNext = callbacks.get("onNext");
if (_onNext == null) {
throw new RuntimeException("onNext must be implemented");
throw new RuntimeException("'onNext' key must contain an implementation");
}
// lookup and memoize onNext
final FuncN onNext = Functions.from(_onNext);

/**
Expand Down Expand Up @@ -291,10 +301,11 @@ public Subscription subscribe(final Object o) {
return subscribe((Observer) o);
}

// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
throw new IllegalArgumentException("onNext can not be null");
}

// lookup and memoize onNext
final FuncN onNext = Functions.from(o);

/**
Expand Down Expand Up @@ -328,6 +339,9 @@ public Subscription subscribe(final Object o, Scheduler scheduler) {
}

public Subscription subscribe(final Action1<T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
Expand All @@ -349,9 +363,6 @@ public void onError(Exception e) {

@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}

Expand All @@ -364,10 +375,14 @@ public Subscription subscribe(final Action1<T> onNext, Scheduler scheduler) {

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
// lookup and memoize onNext
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

// lookup and memoize onNext
final FuncN onNextFunction = Functions.from(onNext);

/**
Expand All @@ -385,9 +400,7 @@ public void onCompleted() {
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
Functions.from(onError).call(e);
}
Functions.from(onError).call(e);
}

@Override
Expand All @@ -403,6 +416,12 @@ public Subscription subscribe(final Object onNext, final Object onError, Schedul
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
Expand All @@ -419,16 +438,11 @@ public void onCompleted() {
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
onError.call(e);
}
onError.call(e);
}

@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}

Expand All @@ -441,10 +455,17 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
// lookup and memoize onNext
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

// lookup and memoize onNext
final FuncN onNextFunction = Functions.from(onNext);

/**
Expand All @@ -456,17 +477,13 @@ public Subscription subscribe(final Object onNext, final Object onError, final O

@Override
public void onCompleted() {
if (onComplete != null) {
Functions.from(onComplete).call();
}
Functions.from(onComplete).call();
}

@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
Functions.from(onError).call(e);
}
Functions.from(onError).call(e);
}

@Override
Expand All @@ -482,6 +499,15 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
Expand All @@ -498,16 +524,11 @@ public void onCompleted() {
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
onError.call(e);
}
onError.call(e);
}

@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}

Expand Down

0 comments on commit 94165ca

Please sign in to comment.