Skip to content

Commit

Permalink
1.x: update map() and filter() to implement OnSubscribe directly (#4097)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 25, 2016
1 parent c60a08b commit c110f69
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5816,7 +5816,7 @@ public final Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
*/
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
return lift(new OperatorFilter<T>(predicate));
return create(new OnSubscribeFilter<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -6623,7 +6623,7 @@ public final Observable<T> limit(int count) {
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
*/
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
return create(new OnSubscribeMap<T, R>(this, func));
}

private <R> Observable<R> mapNotification(Func1<? super T, ? extends R> onNext, Func1<? super Throwable, ? extends R> onError, Func0<? extends R> onCompleted) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ public final <R> Observable<R> flatMapObservable(Func1<? super T, ? extends Obse
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
*/
public final <R> Single<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
return create(new SingleOnSubscribeMap<T, R>(this, func));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.internal.operators;

import rx.*;
import rx.Observable.Operator;
import rx.Observable.OnSubscribe;
import rx.exceptions.*;
import rx.functions.Func1;
import rx.internal.util.RxJavaPluginUtils;
Expand All @@ -27,19 +27,22 @@
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/filter.png" alt="">
* @param <T> the value type
*/
public final class OperatorFilter<T> implements Operator<T, T> {
public final class OnSubscribeFilter<T> implements OnSubscribe<T> {

final Observable<T> source;

final Func1<? super T, Boolean> predicate;

public OperatorFilter(Func1<? super T, Boolean> predicate) {
public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
this.source = source;
this.predicate = predicate;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
public void call(final Subscriber<? super T> child) {
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
child.add(parent);
return parent;
source.unsafeSubscribe(parent);
}

static final class FilterSubscriber<T> extends Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.internal.operators;

import rx.*;
import rx.Observable.Operator;
import rx.Observable.OnSubscribe;
import rx.exceptions.*;
import rx.functions.Func1;
import rx.internal.util.RxJavaPluginUtils;
Expand All @@ -30,19 +30,22 @@
* @param <T> the input value type
* @param <R> the return value type
*/
public final class OperatorMap<T, R> implements Operator<R, T> {
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

final Observable<T> source;

final Func1<? super T, ? extends R> transformer;

public OperatorMap(Func1<? super T, ? extends R> transformer) {
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
source.unsafeSubscribe(parent);
}

static final class MapSubscriber<T, R> extends Subscriber<T> {
Expand Down
90 changes: 90 additions & 0 deletions src/main/java/rx/internal/operators/SingleOnSubscribeMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import rx.*;
import rx.exceptions.*;
import rx.functions.Func1;
import rx.internal.util.RxJavaPluginUtils;

/**
* Applies a function of your choosing to every item emitted by an {@code Single}, and emits the results of
* this transformation as a new {@code Single}.
*
* @param <T> the input value type
* @param <R> the return value type
*/
public final class SingleOnSubscribeMap<T, R> implements Single.OnSubscribe<R> {

final Single<T> source;

final Func1<? super T, ? extends R> transformer;

public SingleOnSubscribeMap(Single<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}

@Override
public void call(final SingleSubscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.subscribe(parent);
}

static final class MapSubscriber<T, R> extends SingleSubscriber<T> {

final SingleSubscriber<? super R> actual;

final Func1<? super T, ? extends R> mapper;

boolean done;

public MapSubscriber(SingleSubscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void onSuccess(T t) {
R result;

try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}

actual.onSuccess(result);
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPluginUtils.handleException(e);
return;
}
done = true;

actual.onError(e);
}
}

}

15 changes: 2 additions & 13 deletions src/perf/java/rx/operators/OperatorMapPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,9 @@

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.*;

import rx.Observable.Operator;
import rx.functions.Func1;
import rx.internal.operators.OperatorMap;
import rx.jmh.InputWithIncrementingInteger;

@BenchmarkMode(Mode.Throughput)
Expand All @@ -49,7 +41,7 @@ public int getSize() {

@Benchmark
public void mapPassThruViaLift(Input input) throws InterruptedException {
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
input.observable.map(IDENTITY_FUNCTION).subscribe(input.observer);
}

@Benchmark
Expand All @@ -63,7 +55,4 @@ public Integer call(Integer value) {
return value;
}
};

private static final Operator<Integer, Integer> MAP_OPERATOR = new OperatorMap<Integer, Integer>(IDENTITY_FUNCTION);

}
8 changes: 4 additions & 4 deletions src/test/java/rx/ObservableConversionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OperatorFilter;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OnSubscribeFilter;
import rx.internal.operators.OnSubscribeMap;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

Expand Down Expand Up @@ -76,11 +76,11 @@ public <R> CylonDetectorObservable<? extends R> compose(Func1<CylonDetectorObser
}

public final CylonDetectorObservable<T> beep(Func1<? super T, Boolean> predicate) {
return lift(new OperatorFilter<T>(predicate));
return create(new OnSubscribeFilter<T>(Observable.create(onSubscribe), predicate));
}

public final <R> CylonDetectorObservable<R> boop(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
return create(new OnSubscribeMap<T, R>(Observable.create(onSubscribe), func));
}

public CylonDetectorObservable<String> DESTROY() {
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/rx/internal/operators/OperatorMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public void testMap() {
Map<String, String> m2 = getMap("Two");
Observable<Map<String, String>> observable = Observable.just(m1, m2);

Observable<String> m = observable.lift(new OperatorMap<Map<String, String>, String>(new Func1<Map<String, String>, String>() {
Observable<String> m = observable.map(new Func1<Map<String, String>, String>() {

@Override
public String call(Map<String, String> map) {
return map.get("firstName");
}

}));
});
m.subscribe(stringObserver);

verify(stringObserver, never()).onError(any(Throwable.class));
Expand Down Expand Up @@ -155,15 +155,15 @@ public String call(Map<String, String> map) {
@Test
public void testMapWithError() {
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
Observable<String> m = w.lift(new OperatorMap<String, String>(new Func1<String, String>() {
Observable<String> m = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
})).doOnError(new Action1<Throwable>() {
}).doOnError(new Action1<Throwable>() {

@Override
public void call(Throwable t1) {
Expand Down

0 comments on commit c110f69

Please sign in to comment.