Skip to content

Commit

Permalink
Incorporate review suggestions.
Browse files Browse the repository at this point in the history
- Changes finally0 to finallyDo.
- Removes unnecessary subscription-wrapping.
- Handle exceptions in onCompleted/onError
  • Loading branch information
abliss committed Mar 29, 2013
1 parent fb0e8b0 commit be560b5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 27 deletions.
8 changes: 4 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1192,8 +1192,8 @@ public static <T> Observable<T> concat(Observable<T>... source) {
* @return an Observable that emits the same objects, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public static <T> Observable<T> finally0(Observable source, Action0 action) {
return _create(OperationFinally.finally0(source, action));
public static <T> Observable<T> finallyDo(Observable source, Action0 action) {
return _create(OperationFinally.finallyDo(source, action));
}

/**
Expand Down Expand Up @@ -2463,8 +2463,8 @@ public Observable<T> filter(Func1<T, Boolean> predicate) {
* @return an Observable that emits the same objects as this observable, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public Observable<T> finally0(Action0 action) {
return _create(OperationFinally.finally0(this, action));
public Observable<T> finallyDo(Action0 action) {
return _create(OperationFinally.finallyDo(this, action));
}

/**
Expand Down
38 changes: 15 additions & 23 deletions rxjava-core/src/main/java/rx/operators/OperationFinally.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ public final class OperationFinally {
/**
* Call a given action when a sequence completes (with or without an
* exception). The returned observable is exactly as threadsafe as the
* source observable; in particular, any situation allowing the source to
* call onComplete or onError multiple times allows the returned observable
* to call the final action multiple times.
* source observable.
* <p/>
* Note that "finally" is a Java reserved word and cannot be an identifier,
* so we use "finally0".
* so we use "finallyDo".
*
* @param sequence An observable sequence of elements
* @param action An action to be taken when the sequence is complete or throws an exception
Expand All @@ -48,7 +46,7 @@ public final class OperationFinally {
* the given action will be called.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN Observable.Finally method</a>
*/
public static <T> Func1<Observer<T>, Subscription> finally0(final Observable<T> sequence, final Action0 action) {
public static <T> Func1<Observer<T>, Subscription> finallyDo(final Observable<T> sequence, final Action0 action) {
return new Func1<Observer<T>, Subscription>() {
@Override
public Subscription call(Observer<T> observer) {
Expand All @@ -60,26 +58,14 @@ public Subscription call(Observer<T> observer) {
private static class Finally<T> implements Func1<Observer<T>, Subscription> {
private final Observable<T> sequence;
private final Action0 finalAction;
private Subscription s;

Finally(final Observable<T> sequence, Action0 finalAction) {
this.sequence = sequence;
this.finalAction = finalAction;
}

private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription();

private final Subscription actualSubscription = new Subscription() {
@Override
public void unsubscribe() {
if (null != s)
s.unsubscribe();
}
};

public Subscription call(Observer<T> observer) {
s = sequence.subscribe(new FinallyObserver(observer));
return Subscription.wrap(actualSubscription);
return sequence.subscribe(new FinallyObserver(observer));
}

private class FinallyObserver implements Observer<T> {
Expand All @@ -91,14 +77,20 @@ private class FinallyObserver implements Observer<T> {

@Override
public void onCompleted() {
observer.onCompleted();
finalAction.call();
try {
observer.onCompleted();
} finally {
finalAction.call();
}
}

@Override
public void onError(Exception e) {
observer.onError(e);
finalAction.call();
try {
observer.onError(e);
} finally {
finalAction.call();
}
}

@Override
Expand All @@ -117,7 +109,7 @@ public void before() {
aObserver = mock(Observer.class);
}
private void checkActionCalled(Observable<String> input) {
Observable.create(finally0(input, aAction0)).subscribe(aObserver);
Observable.create(finallyDo(input, aAction0)).subscribe(aObserver);
verify(aAction0, times(1)).call();
}
@Test
Expand Down

0 comments on commit be560b5

Please sign in to comment.