Skip to content

Commit

Permalink
3.x: Add onErrorComplete to Flowable, Observable and Single (#6867)
Browse files Browse the repository at this point in the history
* 3.x: Add onErrorComplete to F/O/S

* Add version tags.
  • Loading branch information
akarnokd authored Jan 24, 2020
1 parent 13473da commit ad1840b
Show file tree
Hide file tree
Showing 11 changed files with 793 additions and 4 deletions.
54 changes: 53 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
Expand All @@ -28,7 +29,7 @@
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.rxjava3.internal.operators.observable.*;
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -12308,6 +12309,57 @@ public final Flowable<T> onBackpressureLatest() {
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
}

/**
* Returns a {@code Flowable} instance that if the current {@code Flowable} emits an error, it will emit an {@code onComplete}
* and swallow the throwable.
* <p>
* <img width="640" height="372" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.onErrorComplete.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new {@code Flowable} instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@NonNull
public final Flowable<T> onErrorComplete() {
return onErrorComplete(Functions.alwaysTrue());
}

/**
* Returns a {@code Flowable} instance that if the current {@code Flowable} emits an error and the predicate returns
* {@code true}, it will emit an {@code onComplete} and swallow the throwable.
* <p>
* <img width="640" height="201" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.onErrorComplete.f.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true}
* if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code predicate} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable<T> onErrorComplete(@NonNull Predicate<? super Throwable> predicate) {
Objects.requireNonNull(predicate, "predicate is null");

return RxJavaPlugins.onAssembly(new FlowableOnErrorComplete<>(this, predicate));
}

/**
* Resumes the flow with a {@link Publisher} returned for the failure {@link Throwable} of the current {@code Flowable} by a
* function instead of signaling the error via {@code onError}.
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4042,6 +4042,8 @@ public final Single<T> toSingle() {
/**
* Returns a {@code Maybe} instance that if this {@code Maybe} emits an error, it will emit an {@code onComplete}
* and swallow the throwable.
* <p>
* <img width="640" height="372" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorComplete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -4058,6 +4060,8 @@ public final Maybe<T> onErrorComplete() {
/**
* Returns a {@code Maybe} instance that if this {@code Maybe} emits an error and the predicate returns
* {@code true}, it will emit an {@code onComplete} and swallow the throwable.
* <p>
* <img width="640" height="220" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorComplete.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10346,6 +10346,49 @@ public final <U> Observable<U> ofType(@NonNull Class<U> clazz) {
return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}

/**
* Returns an {@code Observable} instance that if the current {@code Observable} emits an error, it will emit an {@code onComplete}
* and swallow the throwable.
* <p>
* <img width="640" height="373" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Observable.onErrorComplete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new {@code Observable} instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Observable<T> onErrorComplete() {
return onErrorComplete(Functions.alwaysTrue());
}

/**
* Returns an {@code Observable} instance that if the current {@code Observable} emits an error and the predicate returns
* {@code true}, it will emit an {@code onComplete} and swallow the throwable.
* <p>
* <img width="640" height="215" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Observable.onErrorComplete.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true}
* if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}.
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code predicate} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> onErrorComplete(@NonNull Predicate<? super Throwable> predicate) {
Objects.requireNonNull(predicate, "predicate is null");

return RxJavaPlugins.onAssembly(new ObservableOnErrorComplete<>(this, predicate));
}

/**
* Resumes the flow with an {@link ObservableSource} returned for the failure {@link Throwable} of the current {@code Observable} by a
* function instead of signaling the error via {@code onError}.
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -3439,6 +3439,49 @@ public final Single<T> onErrorResumeWith(@NonNull SingleSource<? extends T> fall
return onErrorResumeNext(Functions.justFunction(fallback));
}

/**
* Returns a {@link Maybe} instance that if the current {@code Single} emits an error, it will emit an {@code onComplete}
* and swallow the throwable.
* <p>
* <img width="640" height="554" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorComplete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new {@code Maybe} instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Maybe<T> onErrorComplete() {
return onErrorComplete(Functions.alwaysTrue());
}

/**
* Returns a {@link Maybe} instance that if this {@code Single} emits an error and the predicate returns
* {@code true}, it will emit an {@code onComplete} and swallow the throwable.
* <p>
* <img width="640" height="220" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorComplete.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate to call when an {@link Throwable} is emitted which should return {@code true}
* if the {@code Throwable} should be swallowed and replaced with an {@code onComplete}.
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code predicate} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> onErrorComplete(@NonNull Predicate<? super Throwable> predicate) {
Objects.requireNonNull(predicate, "predicate is null");

return RxJavaPlugins.onAssembly(new SingleOnErrorComplete<>(this, predicate));
}

/**
* Resumes the flow with a {@link SingleSource} returned for the failure {@link Throwable} of the current {@code Single} by a
* function instead of signaling the error via {@code onError}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.flowable;

import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;

/**
* Emits an onComplete if the source emits an onError and the predicate returns true for
* that Throwable.
*
* @param <T> the value type
* @since 3.0.0
*/
public final class FlowableOnErrorComplete<T> extends AbstractFlowableWithUpstream<T, T> {

final Predicate<? super Throwable> predicate;

public FlowableOnErrorComplete(Flowable<T> source,
Predicate<? super Throwable> predicate) {
super(source);
this.predicate = predicate;
}

@Override
protected void subscribeActual(Subscriber<? super T> observer) {
source.subscribe(new OnErrorCompleteSubscriber<>(observer, predicate));
}

public static final class OnErrorCompleteSubscriber<T>
implements FlowableSubscriber<T>, Subscription {

final Subscriber<? super T> downstream;

final Predicate<? super Throwable> predicate;

Subscription upstream;

public OnErrorCompleteSubscriber(Subscriber<? super T> actual, Predicate<? super Throwable> predicate) {
this.downstream = actual;
this.predicate = predicate;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;

downstream.onSubscribe(this);
}
}

@Override
public void onNext(T value) {
downstream.onNext(value);
}

@Override
public void onError(Throwable e) {
boolean b;

try {
b = predicate.test(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}

if (b) {
downstream.onComplete();
} else {
downstream.onError(e);
}
}

@Override
public void onComplete() {
downstream.onComplete();
}

@Override
public void cancel() {
upstream.cancel();
}

@Override
public void request(long n) {
upstream.request(n);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ public MaybeOnErrorComplete(MaybeSource<T> source,

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new OnErrorCompleteMaybeObserver<>(observer, predicate));
source.subscribe(new OnErrorCompleteMultiObserver<>(observer, predicate));
}

static final class OnErrorCompleteMaybeObserver<T> implements MaybeObserver<T>, Disposable {
public static final class OnErrorCompleteMultiObserver<T>
implements MaybeObserver<T>, SingleObserver<T>, Disposable {

final MaybeObserver<? super T> downstream;

final Predicate<? super Throwable> predicate;

Disposable upstream;

OnErrorCompleteMaybeObserver(MaybeObserver<? super T> actual, Predicate<? super Throwable> predicate) {
public OnErrorCompleteMultiObserver(MaybeObserver<? super T> actual, Predicate<? super Throwable> predicate) {
this.downstream = actual;
this.predicate = predicate;
}
Expand Down
Loading

0 comments on commit ad1840b

Please sign in to comment.