diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c..fd14a42ce3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,7 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationFinally; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -1183,6 +1184,18 @@ public static Observable concat(Observable... source) { return _create(OperationConcat.concat(source)); } + /** + * Emits the same objects as the given Observable, calling the given action + * when it calls onComplete or onError. + * @param source an observable + * @param action an action to be called when the source completes or errors. + * @return an Observable that emits the same objects, then calls the action. + * @see MSDN: Observable.Finally Method + */ + public static Observable finallyDo(Observable source, Action0 action) { + return _create(OperationFinally.finallyDo(source, action)); + } + /** * Groups the elements of an observable and selects the resulting elements by using a specified function. * @@ -2443,6 +2456,17 @@ public Observable filter(Func1 predicate) { return filter(this, predicate); } + /** + * Registers an action to be called when this observable calls + * onComplete or onError. + * @param action an action to be called when this observable completes or errors. + * @return an Observable that emits the same objects as this observable, then calls the action. + * @see MSDN: Observable.Finally Method + */ + public Observable finallyDo(Action0 action) { + return _create(OperationFinally.finallyDo(this, action)); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java new file mode 100644 index 0000000000..636a8e61ae --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -0,0 +1,124 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.junit.Before; +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +public final class OperationFinally { + + /** + * Call a given action when a sequence completes (with or without an + * exception). The returned observable is exactly as threadsafe as the + * source observable. + *

+ * Note that "finally" is a Java reserved word and cannot be an identifier, + * so we use "finallyDo". + * + * @param sequence An observable sequence of elements + * @param action An action to be taken when the sequence is complete or throws an exception + * @return An observable sequence with the same elements as the input. + * After the last element is consumed (and {@link Observer#onCompleted} has been called), + * or after an exception is thrown (and {@link Observer#onError} has been called), + * the given action will be called. + * @see MSDN Observable.Finally method + */ + public static Func1, Subscription> finallyDo(final Observable sequence, final Action0 action) { + return new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + return new Finally(sequence, action).call(observer); + } + }; + } + + private static class Finally implements Func1, Subscription> { + private final Observable sequence; + private final Action0 finalAction; + + Finally(final Observable sequence, Action0 finalAction) { + this.sequence = sequence; + this.finalAction = finalAction; + } + + public Subscription call(Observer observer) { + return sequence.subscribe(new FinallyObserver(observer)); + } + + private class FinallyObserver implements Observer { + private final Observer observer; + + FinallyObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + try { + observer.onCompleted(); + } finally { + finalAction.call(); + } + } + + @Override + public void onError(Exception e) { + try { + observer.onError(e); + } finally { + finalAction.call(); + } + } + + @Override + public void onNext(T args) { + observer.onNext(args); + } + } + } + + public static class UnitTest { + private Action0 aAction0; + private Observer aObserver; + @Before + public void before() { + aAction0 = mock(Action0.class); + aObserver = mock(Observer.class); + } + private void checkActionCalled(Observable input) { + Observable.create(finallyDo(input, aAction0)).subscribe(aObserver); + verify(aAction0, times(1)).call(); + } + @Test + public void testFinallyCalledOnComplete() { + checkActionCalled(Observable.toObservable(new String[] {"1", "2", "3"})); + } + @Test + public void testFinallyCalledOnError() { + checkActionCalled(Observable.error(new RuntimeException("expected"))); + } + } +}