From be560b58da1c89e56091cd109f620d7eda3a24fb Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Fri, 29 Mar 2013 04:22:56 +0000 Subject: [PATCH] Incorporate review suggestions. - Changes finally0 to finallyDo. - Removes unnecessary subscription-wrapping. - Handle exceptions in onCompleted/onError --- rxjava-core/src/main/java/rx/Observable.java | 8 ++-- .../java/rx/operators/OperationFinally.java | 38 ++++++++----------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 631e175751..fd14a42ce3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1192,8 +1192,8 @@ public static Observable concat(Observable... source) { * @return an Observable that emits the same objects, then calls the action. * @see MSDN: Observable.Finally Method */ - public static Observable finally0(Observable source, Action0 action) { - return _create(OperationFinally.finally0(source, action)); + public static Observable finallyDo(Observable source, Action0 action) { + return _create(OperationFinally.finallyDo(source, action)); } /** @@ -2463,8 +2463,8 @@ public Observable filter(Func1 predicate) { * @return an Observable that emits the same objects as this observable, then calls the action. * @see MSDN: Observable.Finally Method */ - public Observable finally0(Action0 action) { - return _create(OperationFinally.finally0(this, action)); + public Observable finallyDo(Action0 action) { + return _create(OperationFinally.finallyDo(this, action)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index d90b0572a6..636a8e61ae 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -33,12 +33,10 @@ public final class OperationFinally { /** * Call a given action when a sequence completes (with or without an * exception). The returned observable is exactly as threadsafe as the - * source observable; in particular, any situation allowing the source to - * call onComplete or onError multiple times allows the returned observable - * to call the final action multiple times. + * source observable. *

* Note that "finally" is a Java reserved word and cannot be an identifier, - * so we use "finally0". + * so we use "finallyDo". * * @param sequence An observable sequence of elements * @param action An action to be taken when the sequence is complete or throws an exception @@ -48,7 +46,7 @@ public final class OperationFinally { * the given action will be called. * @see MSDN Observable.Finally method */ - public static Func1, Subscription> finally0(final Observable sequence, final Action0 action) { + public static Func1, Subscription> finallyDo(final Observable sequence, final Action0 action) { return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -60,26 +58,14 @@ public Subscription call(Observer observer) { private static class Finally implements Func1, Subscription> { private final Observable sequence; private final Action0 finalAction; - private Subscription s; Finally(final Observable sequence, Action0 finalAction) { this.sequence = sequence; this.finalAction = finalAction; } - private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription(); - - private final Subscription actualSubscription = new Subscription() { - @Override - public void unsubscribe() { - if (null != s) - s.unsubscribe(); - } - }; - public Subscription call(Observer observer) { - s = sequence.subscribe(new FinallyObserver(observer)); - return Subscription.wrap(actualSubscription); + return sequence.subscribe(new FinallyObserver(observer)); } private class FinallyObserver implements Observer { @@ -91,14 +77,20 @@ private class FinallyObserver implements Observer { @Override public void onCompleted() { - observer.onCompleted(); - finalAction.call(); + try { + observer.onCompleted(); + } finally { + finalAction.call(); + } } @Override public void onError(Exception e) { - observer.onError(e); - finalAction.call(); + try { + observer.onError(e); + } finally { + finalAction.call(); + } } @Override @@ -117,7 +109,7 @@ public void before() { aObserver = mock(Observer.class); } private void checkActionCalled(Observable input) { - Observable.create(finally0(input, aAction0)).subscribe(aObserver); + Observable.create(finallyDo(input, aAction0)).subscribe(aObserver); verify(aAction0, times(1)).call(); } @Test