diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 926df1ad2c..537dd838e0 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1193,4 +1193,72 @@ public final Observable> timeInterval(TimeUnit unit) { public final Observable> timeInterval(TimeUnit unit, Scheduler scheduler) { return lift(new OperatorTimeInterval<>(unit, scheduler)); } + + private Observable doOnEach(Consumer onNext, Consumer onError, Runnable onComplete, Runnable onAfterTerminate) { + Objects.requireNonNull(onNext); + Objects.requireNonNull(onError); + Objects.requireNonNull(onComplete); + Objects.requireNonNull(onAfterTerminate); + return lift(new OperatorDoOnEach<>(onNext, onError, onComplete, onAfterTerminate)); + } + + public final Observable doOnNext(Consumer onNext) { + return doOnEach(onNext, e -> { }, () -> { }, () -> { }); + } + + public final Observable doOnError(Consumer onError) { + return doOnEach(v -> { }, onError, () -> { }, () -> { }); + } + + public final Observable doOnComplete(Runnable onComplete) { + return doOnEach(v -> { }, e -> { }, onComplete, () -> { }); + } + + public final Observable doOnTerminate(Runnable onTerminate) { + return doOnEach(v -> { }, e -> onTerminate.run(), onTerminate, () -> { }); + } + + public final Observable finallyDo(Runnable onFinally) { + return doOnEach(v -> { }, e -> { }, () -> { }, onFinally); + } + + public final Observable doOnEach(Observer observer) { + return doOnEach(observer::onNext, observer::onError, observer::onComplete, () -> { }); + } + + public final Observable doOnEach(Consumer>> consumer) { + return doOnEach( + v -> consumer.accept(Try.ofValue(Optional.of(v))), + e -> consumer.accept(Try.ofError(e)), + () -> consumer.accept(Try.ofValue(Optional.empty())), + () -> { } + ); + } + + /** + * + * @deprecated use composition + */ + @Deprecated + public static Observable range(int start, int count, Scheduler scheduler) { + return range(start, count).subscribeOn(scheduler); + } + + /** + * + * @deprecated use composition + */ + @Deprecated + public final Observable repeat(Scheduler scheduler) { + return repeat().subscribeOn(scheduler); + } + + /** + * + * @deprecated use composition + */ + @Deprecated + public final Observable repeat(long times, Scheduler scheduler) { + return repeat(times).subscribeOn(scheduler); + } } diff --git a/src/main/java/io/reactivex/internal/operators/OperatorDoOnEach.java b/src/main/java/io/reactivex/internal/operators/OperatorDoOnEach.java new file mode 100644 index 0000000000..42c8edf6a2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorDoOnEach.java @@ -0,0 +1,116 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.function.Consumer; + +import org.reactivestreams.*; + +import io.reactivex.Observable.Operator; +import io.reactivex.plugins.RxJavaPlugins; + +public final class OperatorDoOnEach implements Operator { + final Consumer onNext; + final Consumer onError; + final Runnable onComplete; + final Runnable onAfterTerminate; + + public OperatorDoOnEach(Consumer onNext, + Consumer onError, + Runnable onComplete, + Runnable onAfterTerminate) { + this.onNext = onNext; + this.onError = onError; + this.onComplete = onComplete; + this.onAfterTerminate = onAfterTerminate; + } + + @Override + public Subscriber apply(Subscriber t) { + // TODO Auto-generated method stub + return null; + } + + static final class DoOnEachSubscriber implements Subscriber { + final Subscriber actual; + final Consumer onNext; + final Consumer onError; + final Runnable onComplete; + final Runnable onAfterTerminate; + + public DoOnEachSubscriber( + Subscriber actual, + Consumer onNext, + Consumer onError, + Runnable onComplete, + Runnable onAfterTerminate) { + this.actual = actual; + this.onNext = onNext; + this.onError = onError; + this.onComplete = onComplete; + this.onAfterTerminate = onAfterTerminate; + } + + @Override + public void onSubscribe(Subscription s) { + actual.onSubscribe(s); + } + + @Override + public void onNext(T t) { + try { + onNext.accept(t); + } catch (Throwable e) { + onError(e); + return; + } + + actual.onNext(t); + } + + @Override + public void onError(Throwable t) { + try { + onError.accept(t); + } catch (Throwable e) { + t.addSuppressed(e); + } + actual.onError(t); + + try { + onAfterTerminate.run(); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + try { + onComplete.run(); + } catch (Throwable e) { + onError(e); + return; + } + + actual.onComplete(); + + try { + onAfterTerminate.run(); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + } +}