diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index fce80c00a9..500f5a34a7 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -98,7 +98,9 @@ public void request(long n) { } if (!it.hasNext()) { - o.onCompleted(); + if (!o.isUnsubscribed()) { + o.onCompleted(); + } return; } if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) { diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 9f3f36390b..6584f4fac6 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -186,7 +186,8 @@ private void handleNewSource(Observable t) { InnerSubscriber i = new InnerSubscriber(this, producerIfNeeded); i.sindex = childrenSubscribers.add(i); t.unsafeSubscribe(i); - request(1); + if (!isUnsubscribed()) + request(1); } private void handleScalarSynchronousObservable(ScalarSynchronousObservable t) {