From ede6906782963f31d0e47b7bb109d207ba59459d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 19 Jan 2014 17:01:41 +0800 Subject: [PATCH] Fixed the issue that 'zip' calls 'onCompleted' twice --- .../main/java/rx/operators/OperationZip.java | 22 +++++++-- .../java/rx/operators/OperationZipTest.java | 45 +++++++++++++++++++ 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 06a2f111e2..89ce7bf75c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -142,10 +142,24 @@ public ManyObservables( this.selector = selector; } + @SuppressWarnings("unchecked") @Override public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); final CompositeSubscription composite = new CompositeSubscription(); + subscription.wrap(composite); + + final SafeObserver safeObserver; + // prevent double-wrapping + if(observer instanceof SafeObserver) { + safeObserver = (SafeObserver) observer; + } + else { + // issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0 + // For an internal observer, we need to wrap it with a SafeObserver. + safeObserver = new SafeObserver(subscription, observer); + } final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); @@ -154,17 +168,17 @@ public Subscription onSubscribe(final Observer observer) { Observer> o2 = new Observer>() { @Override public void onCompleted() { - observer.onCompleted(); + safeObserver.onCompleted(); } @Override public void onError(Throwable t) { - observer.onError(t); + safeObserver.onError(t); } @Override public void onNext(List value) { - observer.onNext(selector.call(value.toArray(new Object[value.size()]))); + safeObserver.onNext(selector.call(value.toArray(new Object[value.size()]))); } }; @@ -180,7 +194,7 @@ public void onNext(List value) { io.connect(); } - return composite; + return subscription; } /** diff --git a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java index 2c7dd25dbb..892611f147 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java @@ -1018,4 +1018,49 @@ public void remove() { verify(o, never()).onCompleted(); } + + @Test + public void testZipWithOnCompletedTwice() { + // issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0 + // The problem is the original "zip" implementation does not wrap + // an internal observer with a SafeObserver. However, in the "zip", + // it may calls "onCompleted" twice. That breaks the Rx contract. + + // This test tries to emulate this case. + // As "mock(Observer.class)" will create an instance in the package "rx", + // we need to wrap "mock(Observer.class)" with an observer instance + // which is in the package "rx.operators". + @SuppressWarnings("unchecked") + final Observer observer = mock(Observer.class); + + Observable.zip(Observable.from(1), + Observable.from(1), new Func2() { + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(Integer args) { + observer.onNext(args); + } + + }); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } }