From 7e348cea61494e5e5b4ec64a9c8dbed97c25dc84 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 15 Oct 2013 14:11:31 +0800 Subject: [PATCH 1/5] Fixed testOnErrorViaHasNext in issue #383 --- .../src/main/java/rx/operators/OperationNext.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 73a3b3dc13..e04e41dd14 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -104,6 +104,7 @@ public void remove() { private static class NextObserver implements Observer> { private final BlockingQueue> buf = new ArrayBlockingQueue>(1); private final AtomicBoolean waiting = new AtomicBoolean(false); + private volatile boolean completed = false; @Override public void onCompleted() { @@ -139,7 +140,11 @@ public void await() { public boolean isCompleted(boolean rethrowExceptionIfExists) { Notification lastItem = buf.peek(); if (lastItem == null) { - return false; + // Fixed issue #383 testOnErrorViaHasNext fails sometimes. + // If the buf is empty, there are two cases: + // 1. The next item has not been emitted yet. + // 2. The error or completed notification is removed in takeNext method. + return completed; } if (lastItem.isOnError()) { @@ -157,10 +162,12 @@ public T takeNext() throws InterruptedException { Notification next = buf.take(); if (next.isOnError()) { + completed = true; throw Exceptions.propagate(next.getThrowable()); } if (next.isOnCompleted()) { + completed = true; throw new IllegalStateException("Observable is completed"); } From 7a6d6d76c961398d7d79a49fa003b8ba1deaf2b9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 16 Oct 2013 09:25:51 +0800 Subject: [PATCH 2/5] Removed the unnecessary 'catch' and 'fail' --- .../src/main/java/rx/operators/OperationNext.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index e04e41dd14..34d00954e9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -234,7 +234,7 @@ public void testOnError() throws Throwable { } @Test - public void testOnErrorViaHasNext() throws Throwable { + public void testOnErrorViaHasNext() throws InterruptedException, ExecutionException { Subject obs = PublishSubject.create(); Iterator it = next(obs).iterator(); @@ -253,15 +253,10 @@ public void testOnErrorViaHasNext() throws Throwable { obs.onError(new TestException()); // this should not throw an exception but instead just return false - try { - assertFalse(it.hasNext()); - } catch (Throwable e) { - fail("should not have received exception"); - e.printStackTrace(); - } + assertFalse(it.hasNext()); } - private Future nextAsync(final Iterator it) throws Throwable { + private Future nextAsync(final Iterator it) { return executor.submit(new Callable() { From eff612e599ddc0fc64ffcea454247a7fd2b90659 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 20 Oct 2013 00:58:09 +0800 Subject: [PATCH 3/5] Blocked 'hasNext' instead of 'next' until any notification arrives --- .../main/java/rx/operators/OperationNext.java | 229 +++++++++--------- 1 file changed, 120 insertions(+), 109 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 34d00954e9..1d2fc9f0b0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -15,17 +15,16 @@ */ package rx.operators; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +36,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -68,6 +68,8 @@ public Iterator iterator() { private static class NextIterator implements Iterator { private final NextObserver observer; + private T next; + private boolean hasNext = true; private NextIterator(NextObserver observer) { this.observer = observer; @@ -75,24 +77,35 @@ private NextIterator(NextObserver observer) { @Override public boolean hasNext() { - return !observer.isCompleted(false); - } - - @Override - public T next() { - if (observer.isCompleted(true)) { - throw new IllegalStateException("Observable is completed"); + // Since an iterator should not be used in different thread, + // so we do not need any synchronization. + if(hasNext == false) { + return false; } - - observer.await(); - try { - return observer.takeNext(); + Notification nextNotification = observer.takeNext(); + if(nextNotification.isOnNext()) { + next = nextNotification.getValue(); + return true; + } + // If an observable is completed or fails, + // next always return null and hasNext always return false. + next = null; + hasNext = false; + if(nextNotification.isOnCompleted()) { + return false; + } + // onError + throw Exceptions.propagate(nextNotification.getThrowable()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Exceptions.propagate(e); } + } + @Override + public T next() { + return next; } @Override @@ -104,7 +117,6 @@ public void remove() { private static class NextObserver implements Observer> { private final BlockingQueue> buf = new ArrayBlockingQueue>(1); private final AtomicBoolean waiting = new AtomicBoolean(false); - private volatile boolean completed = false; @Override public void onCompleted() { @@ -125,7 +137,7 @@ public void onNext(Notification args) { Notification concurrentItem = buf.poll(); // in case if we won race condition with onComplete/onError method - if (!concurrentItem.isOnNext()) { + if (concurrentItem != null && !concurrentItem.isOnNext()) { toOffer = concurrentItem; } } @@ -133,138 +145,137 @@ public void onNext(Notification args) { } - public void await() { + public Notification takeNext() throws InterruptedException { waiting.set(true); + return buf.take(); } - public boolean isCompleted(boolean rethrowExceptionIfExists) { - Notification lastItem = buf.peek(); - if (lastItem == null) { - // Fixed issue #383 testOnErrorViaHasNext fails sometimes. - // If the buf is empty, there are two cases: - // 1. The next item has not been emitted yet. - // 2. The error or completed notification is removed in takeNext method. - return completed; - } + } - if (lastItem.isOnError()) { - if (rethrowExceptionIfExists) { - throw Exceptions.propagate(lastItem.getThrowable()); - } else { - return true; - } - } + public static class UnitTest { - return lastItem.isOnCompleted(); + private void fireOnNextInNewThread(final Subject o, final String value) { + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + o.onNext(value); + } + }.start(); } - public T takeNext() throws InterruptedException { - Notification next = buf.take(); - - if (next.isOnError()) { - completed = true; - throw Exceptions.propagate(next.getThrowable()); - } - - if (next.isOnCompleted()) { - completed = true; - throw new IllegalStateException("Observable is completed"); - } - - return next.getValue(); - + private void fireOnErrorInNewThread(final Subject o) { + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + o.onError(new TestException()); + } + }.start(); } - } - - public static class UnitTest { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Test - public void testNext() throws Throwable { + public void testNext() { Subject obs = PublishSubject.create(); - Iterator it = next(obs).iterator(); - - assertTrue(it.hasNext()); - - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); - + fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); + assertEquals("one", it.next()); - next = nextAsync(it); - Thread.sleep(100); - obs.onNext("two"); - assertEquals("two", next.get()); - + fireOnNextInNewThread(obs, "two"); assertTrue(it.hasNext()); + assertEquals("two", it.next()); obs.onCompleted(); + assertFalse(it.hasNext()); + assertNull(it.next()); + // If the observable is completed, hasNext always returns false and next always returns null. assertFalse(it.hasNext()); + assertNull(it.next()); } - @Test(expected = TestException.class) - public void testOnError() throws Throwable { + @Test + public void testNextWithError() { Subject obs = PublishSubject.create(); - Iterator it = next(obs).iterator(); - + fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); + assertEquals("one", it.next()); - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); - - assertTrue(it.hasNext()); - - next = nextAsync(it); - Thread.sleep(100); - obs.onError(new TestException()); - + fireOnErrorInNewThread(obs); try { - next.get(); - } catch (ExecutionException e) { - throw e.getCause(); + it.hasNext(); + fail("Expected an TestException"); + } + catch(TestException e) { + // successful } + + // After the observable fails, hasNext always returns false and next always returns null. + assertFalse(it.hasNext()); + assertNull(it.next()); } @Test - public void testOnErrorViaHasNext() throws InterruptedException, ExecutionException { - Subject obs = PublishSubject.create(); + public void testNextWithEmpty() { + Observable obs = Observable.empty().observeOn(Schedulers.newThread()); + Iterator it = next(obs).iterator(); - Iterator it = next(obs).iterator(); - - assertTrue(it.hasNext()); + assertFalse(it.hasNext()); + assertNull(it.next()); - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); + // If the observable is completed, hasNext always returns false and next always returns null. + assertFalse(it.hasNext()); + assertNull(it.next()); + } - assertTrue(it.hasNext()); + @Test + public void testOnError() throws Throwable { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); - next = nextAsync(it); - Thread.sleep(100); obs.onError(new TestException()); + try { + it.hasNext(); + fail("Expected an TestException"); + } + catch(TestException e) { + // successful + } - // this should not throw an exception but instead just return false + // After the observable fails, hasNext always returns false and next always returns null. assertFalse(it.hasNext()); + assertNull(it.next()); } - private Future nextAsync(final Iterator it) { + @Test + public void testOnErrorInNewThread() { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); - return executor.submit(new Callable() { + fireOnErrorInNewThread(obs); - @Override - public String call() throws Exception { - return it.next(); - } - }); + try { + it.hasNext(); + fail("Expected an TestException"); + } + catch(TestException e) { + // successful + } + + // After the observable fails, hasNext always returns false and next always returns null. + assertFalse(it.hasNext()); + assertNull(it.next()); } @SuppressWarnings("serial") From 7b6cde9c6899ea38ba8c7267683d58602c99e71e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 23 Oct 2013 11:05:08 +0800 Subject: [PATCH 4/5] Followed the iterator contract --- .../main/java/rx/operators/OperationNext.java | 132 +++++++++++++++--- 1 file changed, 113 insertions(+), 19 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 1d2fc9f0b0..69ed2df778 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -17,11 +17,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -70,6 +70,7 @@ private static class NextIterator implements Iterator { private final NextObserver observer; private T next; private boolean hasNext = true; + private boolean isNextConsumed = true; private NextIterator(NextObserver observer) { this.observer = observer; @@ -80,23 +81,34 @@ public boolean hasNext() { // Since an iterator should not be used in different thread, // so we do not need any synchronization. if(hasNext == false) { + // the iterator has reached the end. return false; } + if(isNextConsumed == false) { + // next has not been used yet. + return true; + } + return moveToNext(); + } + + private boolean moveToNext() { try { Notification nextNotification = observer.takeNext(); if(nextNotification.isOnNext()) { + isNextConsumed = false; next = nextNotification.getValue(); return true; } // If an observable is completed or fails, - // next always return null and hasNext always return false. - next = null; + // hasNext() always return false. hasNext = false; if(nextNotification.isOnCompleted()) { return false; } - // onError - throw Exceptions.propagate(nextNotification.getThrowable()); + if(nextNotification.isOnError()) { + throw Exceptions.propagate(nextNotification.getThrowable()); + } + throw new IllegalStateException("Should not reach here"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Exceptions.propagate(e); @@ -105,7 +117,13 @@ public boolean hasNext() { @Override public T next() { - return next; + if(hasNext()) { + isNextConsumed = true; + return next; + } + else { + throw new NoSuchElementException("No more elements"); + } } @Override @@ -197,11 +215,21 @@ public void testNext() { obs.onCompleted(); assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } - // If the observable is completed, hasNext always returns false and next always returns null. + // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @Test @@ -221,9 +249,14 @@ public void testNextWithError() { // successful } - // After the observable fails, hasNext always returns false and next always returns null. + // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @Test @@ -232,11 +265,21 @@ public void testNextWithEmpty() { Iterator it = next(obs).iterator(); assertFalse(it.hasNext()); - assertNull(it.next()); - - // If the observable is completed, hasNext always returns false and next always returns null. + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + + // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @Test @@ -253,9 +296,14 @@ public void testOnError() throws Throwable { // successful } - // After the observable fails, hasNext always returns false and next always returns null. + // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @Test @@ -273,9 +321,53 @@ public void testOnErrorInNewThread() { // successful } - // After the observable fails, hasNext always returns false and next always returns null. + // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); - assertNull(it.next()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + } + + @Test + public void testNextWithOnlyUsingNextMethod() { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); + fireOnNextInNewThread(obs, "one"); + assertEquals("one", it.next()); + + fireOnNextInNewThread(obs, "two"); + assertEquals("two", it.next()); + + obs.onCompleted(); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + } + + @Test + public void testNextWithCallingHasNextMultipleTimes() { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); + fireOnNextInNewThread(obs, "one"); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertEquals("one", it.next()); + + obs.onCompleted(); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @SuppressWarnings("serial") @@ -342,6 +434,8 @@ public void run() { assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT); assertTrue(it.hasNext()); + int d = it.next(); + assertTrue(d > c); // shut down the thread running.set(false); From db6110ab4943fdb958a6a66a381dd4ad42c23c46 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 27 Oct 2013 22:39:50 +0800 Subject: [PATCH 5/5] Force 'hasNext' and 'next' throw the error once they have already thrown it before --- .../main/java/rx/operators/OperationNext.java | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 69ed2df778..43100e1096 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -71,6 +71,7 @@ private static class NextIterator implements Iterator { private T next; private boolean hasNext = true; private boolean isNextConsumed = true; + private Throwable error = null; private NextIterator(NextObserver observer) { this.observer = observer; @@ -78,6 +79,10 @@ private NextIterator(NextObserver observer) { @Override public boolean hasNext() { + if(error != null) { + // If any error has already been thrown, throw it again. + throw Exceptions.propagate(error); + } // Since an iterator should not be used in different thread, // so we do not need any synchronization. if(hasNext == false) { @@ -106,17 +111,23 @@ private boolean moveToNext() { return false; } if(nextNotification.isOnError()) { - throw Exceptions.propagate(nextNotification.getThrowable()); + error = nextNotification.getThrowable(); + throw Exceptions.propagate(error); } throw new IllegalStateException("Should not reach here"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw Exceptions.propagate(e); + error = e; + throw Exceptions.propagate(error); } } @Override public T next() { + if(error != null) { + // If any error has already been thrown, throw it again. + throw Exceptions.propagate(error); + } if(hasNext()) { isNextConsumed = true; return next; @@ -246,17 +257,9 @@ public void testNextWithError() { fail("Expected an TestException"); } catch(TestException e) { - // successful } - // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. - assertFalse(it.hasNext()); - try { - it.next(); - fail("At the end of an iterator should throw a NoSuchElementException"); - } - catch(NoSuchElementException e){ - } + assertErrorAfterObservableFail(it); } @Test @@ -296,14 +299,7 @@ public void testOnError() throws Throwable { // successful } - // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. - assertFalse(it.hasNext()); - try { - it.next(); - fail("At the end of an iterator should throw a NoSuchElementException"); - } - catch(NoSuchElementException e){ - } + assertErrorAfterObservableFail(it); } @Test @@ -321,13 +317,22 @@ public void testOnErrorInNewThread() { // successful } - // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException. - assertFalse(it.hasNext()); + assertErrorAfterObservableFail(it); + } + + private void assertErrorAfterObservableFail(Iterator it) { + // After the observable fails, hasNext and next always throw the exception. + try { + it.hasNext(); + fail("hasNext should throw a TestException"); + } + catch(TestException e){ + } try { it.next(); - fail("At the end of an iterator should throw a NoSuchElementException"); + fail("next should throw a TestException"); } - catch(NoSuchElementException e){ + catch(TestException e){ } }