diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java index 23e065bc66..db8f50f22d 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java @@ -15,6 +15,8 @@ */ package rx.internal.operators; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -22,7 +24,9 @@ import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; +import rx.exceptions.CompositeException; import rx.exceptions.MissingBackpressureException; +import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; import rx.internal.util.RxRingBuffer; import rx.internal.util.ScalarSynchronousObservable; @@ -33,17 +37,26 @@ *

* *

- * You can combine the items emitted by multiple {@code Observable}s so that they act like a single - * {@code Observable}, by using the merge operation. + * You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation. * * @param * the type of the items emitted by both the source and merged {@code Observable}s */ -public final class OperatorMerge implements Operator> { +public class OperatorMerge implements Operator> { + + public OperatorMerge() { + this.delayErrors = false; + } + + public OperatorMerge(boolean delayErrors) { + this.delayErrors = delayErrors; + } + + private final boolean delayErrors; @Override public Subscriber> call(final Subscriber child) { - return new MergeSubscriber(child); + return new MergeSubscriber(child, delayErrors); } @@ -53,6 +66,8 @@ private static final class MergeSubscriber extends Subscriber mergeProducer; private int wip; private boolean completed; + private final boolean delayErrors; + private ConcurrentLinkedQueue exceptions; private volatile SubscriptionIndexedRingBuffer> childrenSubscribers; @@ -77,10 +92,11 @@ private static final class MergeSubscriber extends Subscriber */ - public MergeSubscriber(Subscriber actual) { + public MergeSubscriber(Subscriber actual, boolean delayErrors) { super(actual); this.actual = actual; this.mergeProducer = new MergeProducer(this); + this.delayErrors = delayErrors; // decoupled the subscription chain because we need to decouple and control backpressure actual.add(this); actual.setProducer(mergeProducer); @@ -337,8 +353,26 @@ public Boolean call(InnerSubscriber s) { @Override public void onError(Throwable e) { - actual.onError(e); - unsubscribe(); + if (delayErrors) { + synchronized (this) { + if (exceptions == null) { + exceptions = new ConcurrentLinkedQueue(); + } + } + exceptions.add(e); + boolean sendOnComplete = false; + synchronized (this) { + wip--; + if (wip == 0 && completed) { + sendOnComplete = true; + } + } + if (sendOnComplete) { + drainAndComplete(); + } + } else { + actual.onError(e); + } } @Override @@ -372,7 +406,25 @@ void completeInner(InnerSubscriber s) { private void drainAndComplete() { drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not - actual.onCompleted(); + if (delayErrors) { + Queue es = null; + synchronized (this) { + es = exceptions; + } + if (es != null) { + if (es.isEmpty()) { + actual.onCompleted(); + } else if (es.size() == 1) { + actual.onError(es.poll()); + } else { + actual.onError(new CompositeException(es)); + } + } else { + actual.onCompleted(); + } + } else { + actual.onCompleted(); + } } } @@ -493,7 +545,12 @@ private void emit(T t, boolean complete) { if (complete) { parentSubscriber.completeInner(this); } else { - parentSubscriber.actual.onNext(t); + try { + parentSubscriber.actual.onNext(t); + } catch (Throwable e) { + // special error handling due to complexity of merge + onError(OnErrorThrowable.addValueAsLastCause(e, t)); + } emitted++; } } else { @@ -503,7 +560,12 @@ private void emit(T t, boolean complete) { if (complete) { parentSubscriber.completeInner(this); } else { - parentSubscriber.actual.onNext(t); + try { + parentSubscriber.actual.onNext(t); + } catch (Throwable e) { + // special error handling due to complexity of merge + onError(OnErrorThrowable.addValueAsLastCause(e, t)); + } emitted++; producer.REQUESTED.decrementAndGet(producer); } @@ -585,8 +647,13 @@ private int drainRequested() { } else if (q.isCompleted(o)) { parentSubscriber.completeInner(this); } else { - if (!q.accept(o, parentSubscriber.actual)) { - emitted++; + try { + if (!q.accept(o, parentSubscriber.actual)) { + emitted++; + } + } catch (Throwable e) { + // special error handling due to complexity of merge + onError(OnErrorThrowable.addValueAsLastCause(e, o)); } } } @@ -604,8 +671,13 @@ private int drainAll() { if (q.isCompleted(o)) { parentSubscriber.completeInner(this); } else { - if (!q.accept(o, parentSubscriber.actual)) { - emitted++; + try { + if (!q.accept(o, parentSubscriber.actual)) { + emitted++; + } + } catch (Throwable e) { + // special error handling due to complexity of merge + onError(OnErrorThrowable.addValueAsLastCause(e, o)); } } } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java index 58c3bd785b..169215f539 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java @@ -1,29 +1,20 @@ - /** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.internal.operators; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import rx.Observable; -import rx.Observable.Operator; -import rx.Subscriber; -import rx.exceptions.CompositeException; -import rx.observers.SerializedSubscriber; -import rx.subscriptions.CompositeSubscription; - /** * This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of * an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error @@ -37,113 +28,15 @@ * This operation allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. *

- * Note: If this is used on an Observable that never completes, it will never call - * {@code onError} and will effectively swallow errors. + * Note: If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors. * - * @param the source and result value type + * @param + * the source and result value type */ -public final class OperatorMergeDelayError implements Operator> { - - @Override - public Subscriber> call(Subscriber child) { - final SerializedSubscriber s = new SerializedSubscriber(child); - final CompositeSubscription csub = new CompositeSubscription(); - child.add(csub); - - return new MergeDelayErrorSubscriber(s, csub); - } - - static final class MergeDelayErrorSubscriber extends Subscriber> { - final Subscriber s; - final CompositeSubscription csub; - final ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue(); - - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(MergeDelayErrorSubscriber.class, "wip"); - - public MergeDelayErrorSubscriber(Subscriber s, CompositeSubscription csub) { - super(s); - this.s = s; - this.csub = csub; - this.wip = 1; - } - - @Override - public void onNext(Observable t) { - WIP_UPDATER.incrementAndGet(this); - - Subscriber itemSub = new Subscriber() { - /** Make sure terminal events are handled once to avoid wip problems. */ - boolean once = true; - @Override - public void onNext(T t) { - // prevent misbehaving source to emit past the error - if (once) { - try { - s.onNext(t); - } catch (Throwable e) { - // in case the source doesn't properly handle exceptions - onError(e); - } - } - } - - @Override - public void onError(Throwable e) { - if (once) { - once = false; - error(e); - } - } - - @Override - public void onCompleted() { - if (once) { - once = false; - try { - complete(); - } finally { - csub.remove(this); - } - } - } - - }; - csub.add(itemSub); - - t.unsafeSubscribe(itemSub); - } - - @Override - public void onError(Throwable e) { - error(e); - } - - @Override - public void onCompleted() { - complete(); - } +public final class OperatorMergeDelayError extends OperatorMerge { - void error(Throwable e) { - exceptions.add(e); - complete(); - } - - void complete() { - if (WIP_UPDATER.decrementAndGet(this) == 0) { - if (exceptions.isEmpty()) { - s.onCompleted(); - } else - if (exceptions.size() > 1) { - s.onError(new CompositeException(exceptions)); - } else { - s.onError(exceptions.peek()); - } - exceptions.clear(); - unsubscribe(); - } - } + public OperatorMergeDelayError() { + super(true); } + } diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index 1acb6a13b2..86ca42f8cf 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -107,7 +107,6 @@ public void onError(final Throwable e) { if (terminated) { return; } - terminated = true; if (emitting) { if (queue == null) { queue = new FastList(); @@ -121,6 +120,9 @@ public void onError(final Throwable e) { } drainQueue(list); actual.onError(e); + synchronized(this) { + emitting = false; + } } @Override diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index 29c2fdf1c9..2c60a89632 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -67,7 +67,8 @@ public void testErrorDelayed1() { verify(stringObserver, times(1)).onNext("three"); verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); - verify(stringObserver, times(0)).onNext("six"); + // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError + verify(stringObserver, times(1)).onNext("six"); } @Test @@ -87,7 +88,8 @@ public void testErrorDelayed2() { verify(stringObserver, times(1)).onNext("three"); verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); - verify(stringObserver, times(0)).onNext("six"); + // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError + verify(stringObserver, times(1)).onNext("six"); verify(stringObserver, times(1)).onNext("seven"); verify(stringObserver, times(1)).onNext("eight"); verify(stringObserver, times(1)).onNext("nine"); @@ -159,8 +161,6 @@ public void testErrorDelayed4WithThreading() { throw new RuntimeException(e); } - verify(stringObserver, times(1)).onError(any(NullPointerException.class)); - verify(stringObserver, never()).onCompleted(); verify(stringObserver, times(1)).onNext("one"); verify(stringObserver, times(1)).onNext("two"); verify(stringObserver, times(1)).onNext("three"); @@ -170,6 +170,8 @@ public void testErrorDelayed4WithThreading() { verify(stringObserver, times(1)).onNext("seven"); verify(stringObserver, times(1)).onNext("eight"); verify(stringObserver, times(1)).onNext("nine"); + verify(stringObserver, times(1)).onError(any(NullPointerException.class)); + verify(stringObserver, never()).onCompleted(); } @Test @@ -187,7 +189,8 @@ public void testCompositeErrorDelayed1() { verify(stringObserver, times(0)).onNext("three"); verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); - verify(stringObserver, times(0)).onNext("six"); + // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError + verify(stringObserver, times(1)).onNext("six"); } @Test diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorFlatMapTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorFlatMapTest.java index 263dbba3cf..f317c737d9 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorFlatMapTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorFlatMapTest.java @@ -86,4 +86,33 @@ public Observable call(OnErrorThrowable t) { ts.assertReceivedOnNext(Arrays.asList("Value=1", "Error=2", "Error=3", "Error=4", "Error=5", "Value=6")); } + @Test + public void testOnErrorFlatMapAfterFlatMap() { + TestSubscriber ts = new TestSubscriber(); + Observable.from(1, 2, 3).flatMap(new Func1>() { + + @Override + public Observable call(Integer i) { + System.out.println("i: " + i); + if (i == 1) { + return Observable.error(new RuntimeException("error")); + } else { + return Observable.just(i); + } + } + + }).onErrorFlatMap(new Func1>() { + + @Override + public Observable call(OnErrorThrowable t) { + System.err.println(t); + return Observable.just(-1); + } + + }).subscribe(ts); + // we won't receive a terminal event so don't wait for one + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(-1, 2, 3)); + } + }