diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 4507c4480f..e028984dfe 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5816,7 +5816,7 @@ public final Observable exists(Func1 predicate) { * @see ReactiveX operators documentation: Filter */ public final Observable filter(Func1 predicate) { - return lift(new OperatorFilter(predicate)); + return create(new OnSubscribeFilter(this, predicate)); } /** @@ -6623,7 +6623,7 @@ public final Observable limit(int count) { * @see ReactiveX operators documentation: Map */ public final Observable map(Func1 func) { - return lift(new OperatorMap(func)); + return create(new OnSubscribeMap(this, func)); } private Observable mapNotification(Func1 onNext, Func1 onError, Func0 onCompleted) { diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 183f869fd0..e9c2890320 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1413,7 +1413,7 @@ public final Observable flatMapObservable(Func1ReactiveX operators documentation: Map */ public final Single map(Func1 func) { - return lift(new OperatorMap(func)); + return create(new SingleOnSubscribeMap(this, func)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorFilter.java b/src/main/java/rx/internal/operators/OnSubscribeFilter.java similarity index 88% rename from src/main/java/rx/internal/operators/OperatorFilter.java rename to src/main/java/rx/internal/operators/OnSubscribeFilter.java index 6d489b0e67..2fe3abcde0 100644 --- a/src/main/java/rx/internal/operators/OperatorFilter.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFilter.java @@ -16,7 +16,7 @@ package rx.internal.operators; import rx.*; -import rx.Observable.Operator; +import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.Func1; import rx.internal.util.RxJavaPluginUtils; @@ -27,19 +27,22 @@ * * @param the value type */ -public final class OperatorFilter implements Operator { +public final class OnSubscribeFilter implements OnSubscribe { + final Observable source; + final Func1 predicate; - public OperatorFilter(Func1 predicate) { + public OnSubscribeFilter(Observable source, Func1 predicate) { + this.source = source; this.predicate = predicate; } @Override - public Subscriber call(final Subscriber child) { + public void call(final Subscriber child) { FilterSubscriber parent = new FilterSubscriber(child, predicate); child.add(parent); - return parent; + source.unsafeSubscribe(parent); } static final class FilterSubscriber extends Subscriber { diff --git a/src/main/java/rx/internal/operators/OperatorMap.java b/src/main/java/rx/internal/operators/OnSubscribeMap.java similarity index 88% rename from src/main/java/rx/internal/operators/OperatorMap.java rename to src/main/java/rx/internal/operators/OnSubscribeMap.java index a8a33178ca..0ad3ea39e8 100644 --- a/src/main/java/rx/internal/operators/OperatorMap.java +++ b/src/main/java/rx/internal/operators/OnSubscribeMap.java @@ -16,7 +16,7 @@ package rx.internal.operators; import rx.*; -import rx.Observable.Operator; +import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.Func1; import rx.internal.util.RxJavaPluginUtils; @@ -30,19 +30,22 @@ * @param the input value type * @param the return value type */ -public final class OperatorMap implements Operator { +public final class OnSubscribeMap implements OnSubscribe { + final Observable source; + final Func1 transformer; - public OperatorMap(Func1 transformer) { + public OnSubscribeMap(Observable source, Func1 transformer) { + this.source = source; this.transformer = transformer; } - + @Override - public Subscriber call(final Subscriber o) { + public void call(final Subscriber o) { MapSubscriber parent = new MapSubscriber(o, transformer); o.add(parent); - return parent; + source.unsafeSubscribe(parent); } static final class MapSubscriber extends Subscriber { diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeMap.java b/src/main/java/rx/internal/operators/SingleOnSubscribeMap.java new file mode 100644 index 0000000000..78ff4936ff --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOnSubscribeMap.java @@ -0,0 +1,90 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.*; +import rx.exceptions.*; +import rx.functions.Func1; +import rx.internal.util.RxJavaPluginUtils; + +/** + * Applies a function of your choosing to every item emitted by an {@code Single}, and emits the results of + * this transformation as a new {@code Single}. + * + * @param the input value type + * @param the return value type + */ +public final class SingleOnSubscribeMap implements Single.OnSubscribe { + + final Single source; + + final Func1 transformer; + + public SingleOnSubscribeMap(Single source, Func1 transformer) { + this.source = source; + this.transformer = transformer; + } + + @Override + public void call(final SingleSubscriber o) { + MapSubscriber parent = new MapSubscriber(o, transformer); + o.add(parent); + source.subscribe(parent); + } + + static final class MapSubscriber extends SingleSubscriber { + + final SingleSubscriber actual; + + final Func1 mapper; + + boolean done; + + public MapSubscriber(SingleSubscriber actual, Func1 mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onSuccess(T t) { + R result; + + try { + result = mapper.call(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(ex, t)); + return; + } + + actual.onSuccess(result); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; + } + done = true; + + actual.onError(e); + } + } + +} + diff --git a/src/perf/java/rx/operators/OperatorMapPerf.java b/src/perf/java/rx/operators/OperatorMapPerf.java index 124aee65e4..81ee0ae702 100644 --- a/src/perf/java/rx/operators/OperatorMapPerf.java +++ b/src/perf/java/rx/operators/OperatorMapPerf.java @@ -17,17 +17,9 @@ import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.*; -import rx.Observable.Operator; import rx.functions.Func1; -import rx.internal.operators.OperatorMap; import rx.jmh.InputWithIncrementingInteger; @BenchmarkMode(Mode.Throughput) @@ -49,7 +41,7 @@ public int getSize() { @Benchmark public void mapPassThruViaLift(Input input) throws InterruptedException { - input.observable.lift(MAP_OPERATOR).subscribe(input.observer); + input.observable.map(IDENTITY_FUNCTION).subscribe(input.observer); } @Benchmark @@ -63,7 +55,4 @@ public Integer call(Integer value) { return value; } }; - - private static final Operator MAP_OPERATOR = new OperatorMap(IDENTITY_FUNCTION); - } diff --git a/src/test/java/rx/ObservableConversionTest.java b/src/test/java/rx/ObservableConversionTest.java index 56f3ee2cd5..b90132fed7 100644 --- a/src/test/java/rx/ObservableConversionTest.java +++ b/src/test/java/rx/ObservableConversionTest.java @@ -31,8 +31,8 @@ import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; import rx.functions.Func2; -import rx.internal.operators.OperatorFilter; -import rx.internal.operators.OperatorMap; +import rx.internal.operators.OnSubscribeFilter; +import rx.internal.operators.OnSubscribeMap; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -76,11 +76,11 @@ public CylonDetectorObservable compose(Func1 beep(Func1 predicate) { - return lift(new OperatorFilter(predicate)); + return create(new OnSubscribeFilter(Observable.create(onSubscribe), predicate)); } public final CylonDetectorObservable boop(Func1 func) { - return lift(new OperatorMap(func)); + return create(new OnSubscribeMap(Observable.create(onSubscribe), func)); } public CylonDetectorObservable DESTROY() { diff --git a/src/test/java/rx/internal/operators/OperatorMapTest.java b/src/test/java/rx/internal/operators/OperatorMapTest.java index 18e3e523e3..d13439b5de 100644 --- a/src/test/java/rx/internal/operators/OperatorMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorMapTest.java @@ -57,14 +57,14 @@ public void testMap() { Map m2 = getMap("Two"); Observable> observable = Observable.just(m1, m2); - Observable m = observable.lift(new OperatorMap, String>(new Func1, String>() { + Observable m = observable.map(new Func1, String>() { @Override public String call(Map map) { return map.get("firstName"); } - })); + }); m.subscribe(stringObserver); verify(stringObserver, never()).onError(any(Throwable.class)); @@ -155,7 +155,7 @@ public String call(Map map) { @Test public void testMapWithError() { Observable w = Observable.just("one", "fail", "two", "three", "fail"); - Observable m = w.lift(new OperatorMap(new Func1() { + Observable m = w.map(new Func1() { @Override public String call(String s) { if ("fail".equals(s)) { @@ -163,7 +163,7 @@ public String call(String s) { } return s; } - })).doOnError(new Action1() { + }).doOnError(new Action1() { @Override public void call(Throwable t1) {