Skip to content

Commit

Permalink
Operations doOnX.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 28, 2015
1 parent b2b252c commit 2359c94
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 0 deletions.
68 changes: 68 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1193,4 +1193,72 @@ public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorTimeInterval<>(unit, scheduler));
}

private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Runnable onAfterTerminate) {
Objects.requireNonNull(onNext);
Objects.requireNonNull(onError);
Objects.requireNonNull(onComplete);
Objects.requireNonNull(onAfterTerminate);
return lift(new OperatorDoOnEach<>(onNext, onError, onComplete, onAfterTerminate));
}

public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, e -> { }, () -> { }, () -> { });
}

public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
return doOnEach(v -> { }, onError, () -> { }, () -> { });
}

public final Observable<T> doOnComplete(Runnable onComplete) {
return doOnEach(v -> { }, e -> { }, onComplete, () -> { });
}

public final Observable<T> doOnTerminate(Runnable onTerminate) {
return doOnEach(v -> { }, e -> onTerminate.run(), onTerminate, () -> { });
}

public final Observable<T> finallyDo(Runnable onFinally) {
return doOnEach(v -> { }, e -> { }, () -> { }, onFinally);
}

public final Observable<T> doOnEach(Observer<? super T> observer) {
return doOnEach(observer::onNext, observer::onError, observer::onComplete, () -> { });
}

public final Observable<T> doOnEach(Consumer<? super Try<Optional<T>>> consumer) {
return doOnEach(
v -> consumer.accept(Try.ofValue(Optional.of(v))),
e -> consumer.accept(Try.ofError(e)),
() -> consumer.accept(Try.ofValue(Optional.empty())),
() -> { }
);
}

/**
*
* @deprecated use composition
*/
@Deprecated
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
return range(start, count).subscribeOn(scheduler);
}

/**
*
* @deprecated use composition
*/
@Deprecated
public final Observable<T> repeat(Scheduler scheduler) {
return repeat().subscribeOn(scheduler);
}

/**
*
* @deprecated use composition
*/
@Deprecated
public final Observable<T> repeat(long times, Scheduler scheduler) {
return repeat(times).subscribeOn(scheduler);
}
}
116 changes: 116 additions & 0 deletions src/main/java/io/reactivex/internal/operators/OperatorDoOnEach.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.function.Consumer;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.plugins.RxJavaPlugins;

public final class OperatorDoOnEach<T> implements Operator<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Runnable onComplete;
final Runnable onAfterTerminate;

public OperatorDoOnEach(Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete,
Runnable onAfterTerminate) {
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}

@Override
public Subscriber<? super T> apply(Subscriber<? super T> t) {
// TODO Auto-generated method stub
return null;
}

static final class DoOnEachSubscriber<T> implements Subscriber<T> {
final Subscriber<? super T> actual;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Runnable onComplete;
final Runnable onAfterTerminate;

public DoOnEachSubscriber(
Subscriber<? super T> actual,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete,
Runnable onAfterTerminate) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}

@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}

@Override
public void onNext(T t) {
try {
onNext.accept(t);
} catch (Throwable e) {
onError(e);
return;
}

actual.onNext(t);
}

@Override
public void onError(Throwable t) {
try {
onError.accept(t);
} catch (Throwable e) {
t.addSuppressed(e);
}
actual.onError(t);

try {
onAfterTerminate.run();
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
try {
onComplete.run();
} catch (Throwable e) {
onError(e);
return;
}

actual.onComplete();

try {
onAfterTerminate.run();
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
}
}
}

0 comments on commit 2359c94

Please sign in to comment.