Skip to content

Commit

Permalink
Merge pull request #1457 from benjchristensen/mergeDelayError-backpre…
Browse files Browse the repository at this point in the history
…ssure

MergeDelayError & OnErrorFlatMap w/ Merge
  • Loading branch information
benjchristensen committed Jul 17, 2014
2 parents cb20468 + cc11773 commit fc9dbd5
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 149 deletions.
100 changes: 86 additions & 14 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.ScalarSynchronousObservable;
Expand All @@ -33,17 +37,26 @@
* <p>
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/Netflix/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single
* {@code Observable}, by using the merge operation.
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
*
* @param <T>
* the type of the items emitted by both the source and merged {@code Observable}s
*/
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {

public OperatorMerge() {
this.delayErrors = false;
}

public OperatorMerge(boolean delayErrors) {
this.delayErrors = delayErrors;
}

private final boolean delayErrors;

@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
return new MergeSubscriber<T>(child);
return new MergeSubscriber<T>(child, delayErrors);

}

Expand All @@ -53,6 +66,8 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
private final MergeProducer<T> mergeProducer;
private int wip;
private boolean completed;
private final boolean delayErrors;
private ConcurrentLinkedQueue<Throwable> exceptions;

private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;

Expand All @@ -77,10 +92,11 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
* } </pre>
*/

public MergeSubscriber(Subscriber<? super T> actual) {
public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
super(actual);
this.actual = actual;
this.mergeProducer = new MergeProducer<T>(this);
this.delayErrors = delayErrors;
// decoupled the subscription chain because we need to decouple and control backpressure
actual.add(this);
actual.setProducer(mergeProducer);
Expand Down Expand Up @@ -337,8 +353,26 @@ public Boolean call(InnerSubscriber<T> s) {

@Override
public void onError(Throwable e) {
actual.onError(e);
unsubscribe();
if (delayErrors) {
synchronized (this) {
if (exceptions == null) {
exceptions = new ConcurrentLinkedQueue<Throwable>();
}
}
exceptions.add(e);
boolean sendOnComplete = false;
synchronized (this) {
wip--;
if (wip == 0 && completed) {
sendOnComplete = true;
}
}
if (sendOnComplete) {
drainAndComplete();
}
} else {
actual.onError(e);
}
}

@Override
Expand Down Expand Up @@ -372,7 +406,25 @@ void completeInner(InnerSubscriber<T> s) {

private void drainAndComplete() {
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
actual.onCompleted();
if (delayErrors) {
Queue<Throwable> es = null;
synchronized (this) {
es = exceptions;
}
if (es != null) {
if (es.isEmpty()) {
actual.onCompleted();
} else if (es.size() == 1) {
actual.onError(es.poll());
} else {
actual.onError(new CompositeException(es));
}
} else {
actual.onCompleted();
}
} else {
actual.onCompleted();
}
}

}
Expand Down Expand Up @@ -493,7 +545,12 @@ private void emit(T t, boolean complete) {
if (complete) {
parentSubscriber.completeInner(this);
} else {
parentSubscriber.actual.onNext(t);
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
}
} else {
Expand All @@ -503,7 +560,12 @@ private void emit(T t, boolean complete) {
if (complete) {
parentSubscriber.completeInner(this);
} else {
parentSubscriber.actual.onNext(t);
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
producer.REQUESTED.decrementAndGet(producer);
}
Expand Down Expand Up @@ -585,8 +647,13 @@ private int drainRequested() {
} else if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
try {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
Expand All @@ -604,8 +671,13 @@ private int drainAll() {
if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
try {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.observers.SerializedSubscriber;
import rx.subscriptions.CompositeSubscription;

/**
* This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
* an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
Expand All @@ -37,113 +28,15 @@
* This operation allows an Observer to receive all successfully emitted items from all of the
* source Observables without being interrupted by an error notification from one of them.
* <p>
* <em>Note:</em> If this is used on an Observable that never completes, it will never call
* {@code onError} and will effectively swallow errors.
* <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
*
* @param <T> the source and result value type
* @param <T>
* the source and result value type
*/
public final class OperatorMergeDelayError<T> implements Operator<T, Observable<? extends T>> {

@Override
public Subscriber<? super Observable<? extends T>> call(Subscriber<? super T> child) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
final CompositeSubscription csub = new CompositeSubscription();
child.add(csub);

return new MergeDelayErrorSubscriber<T>(s, csub);
}

static final class MergeDelayErrorSubscriber<T> extends Subscriber<Observable<? extends T>> {
final Subscriber<? super T> s;
final CompositeSubscription csub;
final ConcurrentLinkedQueue<Throwable> exceptions = new ConcurrentLinkedQueue<Throwable>();

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MergeDelayErrorSubscriber> WIP_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(MergeDelayErrorSubscriber.class, "wip");

public MergeDelayErrorSubscriber(Subscriber<? super T> s, CompositeSubscription csub) {
super(s);
this.s = s;
this.csub = csub;
this.wip = 1;
}

@Override
public void onNext(Observable<? extends T> t) {
WIP_UPDATER.incrementAndGet(this);

Subscriber<T> itemSub = new Subscriber<T>() {
/** Make sure terminal events are handled once to avoid wip problems. */
boolean once = true;
@Override
public void onNext(T t) {
// prevent misbehaving source to emit past the error
if (once) {
try {
s.onNext(t);
} catch (Throwable e) {
// in case the source doesn't properly handle exceptions
onError(e);
}
}
}

@Override
public void onError(Throwable e) {
if (once) {
once = false;
error(e);
}
}

@Override
public void onCompleted() {
if (once) {
once = false;
try {
complete();
} finally {
csub.remove(this);
}
}
}

};
csub.add(itemSub);

t.unsafeSubscribe(itemSub);
}

@Override
public void onError(Throwable e) {
error(e);
}

@Override
public void onCompleted() {
complete();
}
public final class OperatorMergeDelayError<T> extends OperatorMerge<T> {

void error(Throwable e) {
exceptions.add(e);
complete();
}

void complete() {
if (WIP_UPDATER.decrementAndGet(this) == 0) {
if (exceptions.isEmpty()) {
s.onCompleted();
} else
if (exceptions.size() > 1) {
s.onError(new CompositeException(exceptions));
} else {
s.onError(exceptions.peek());
}
exceptions.clear();
unsubscribe();
}
}
public OperatorMergeDelayError() {
super(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public void onError(final Throwable e) {
if (terminated) {
return;
}
terminated = true;
if (emitting) {
if (queue == null) {
queue = new FastList();
Expand All @@ -121,6 +120,9 @@ public void onError(final Throwable e) {
}
drainQueue(list);
actual.onError(e);
synchronized(this) {
emitting = false;
}
}

@Override
Expand Down
Loading

0 comments on commit fc9dbd5

Please sign in to comment.