diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b40864ea6a..7fea0a1fb1 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -48,7 +48,6 @@ import rx.observers.SafeSubscriber; import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; -import rx.operators.OperationAsObservable; import rx.operators.OperationBuffer; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; @@ -93,6 +92,7 @@ import rx.operators.OperatorAll; import rx.operators.OperatorAmb; import rx.operators.OperatorAny; +import rx.operators.OperatorAsObservable; import rx.operators.OperatorCache; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; @@ -2952,7 +2952,7 @@ public final Observable all(Func1 predicate) { * @return an Observable that hides the identity of this Observable */ public final Observable asObservable() { - return create(new OperationAsObservable(this)); + return lift(new OperatorAsObservable()); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAsObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorAsObservable.java similarity index 61% rename from rxjava-core/src/main/java/rx/operators/OperationAsObservable.java rename to rxjava-core/src/main/java/rx/operators/OperatorAsObservable.java index d5c907ff5d..3c25e4b4f9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAsObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorAsObservable.java @@ -15,11 +15,8 @@ */ package rx.operators; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.observers.Subscribers; +import rx.Observable.Operator; +import rx.Subscriber; /** * Hides the identity of another observable. @@ -27,15 +24,11 @@ * @param * the return value type of the wrapped observable. */ -public final class OperationAsObservable implements OnSubscribeFunc { - private final Observable source; - - public OperationAsObservable(Observable source) { - this.source = source; - } +public final class OperatorAsObservable implements Operator { @Override - public Subscription onSubscribe(final Observer t1) { - return source.unsafeSubscribe(Subscribers.from(t1)); + public Subscriber call(Subscriber s) { + return s; } + } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorAsObservableTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAsObservableTest.java new file mode 100644 index 0000000000..c1773f1c4d --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorAsObservableTest.java @@ -0,0 +1,65 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import static org.junit.Assert.assertFalse; +import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.subjects.PublishSubject; + +public class OperatorAsObservableTest { + @Test + public void testHiding() { + PublishSubject src = PublishSubject.create(); + + Observable dst = src.asObservable(); + + assertFalse(dst instanceof PublishSubject); + + Observer o = mock(Observer.class); + + dst.subscribe(o); + + src.onNext(1); + src.onCompleted(); + + verify(o).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testHidingError() { + PublishSubject src = PublishSubject.create(); + + Observable dst = src.asObservable(); + + assertFalse(dst instanceof PublishSubject); + + Observer o = mock(Observer.class); + + dst.subscribe(o); + + src.onError(new OperationReduceTest.CustomException()); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } +}