From d99736a4f2dce15dd645173f6c397eb98313d459 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 16 Dec 2013 11:00:22 +0100 Subject: [PATCH 1/3] Added: BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable --- .../rx/observables/BlockingObservable.java | 19 ++ .../java/rx/operators/OperationLatest.java | 184 ++++++++++++++++++ .../rx/operators/OperationMostRecent.java | 12 +- .../main/java/rx/operators/OperationNext.java | 13 +- .../rx/operators/OperationToIterator.java | 20 +- .../observables/BlockingObservableTest.java | 54 +++++ .../rx/operators/OperationLatestTest.java | 162 +++++++++++++++ .../rx/operators/OperationMostRecentTest.java | 30 +++ .../java/rx/operators/OperationNextTest.java | 24 +++ 9 files changed, 495 insertions(+), 23 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationLatest.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationLatestTest.java diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 8fe13c9e88..74cc4bbed8 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -23,6 +23,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.operators.OperationLatest; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; import rx.operators.OperationToFuture; @@ -266,6 +267,24 @@ public Iterable next() { return OperationNext.next(o); } + /** + * Returns the latest item emitted by the underlying Observable, waiting if necessary + * for one to become available. + *

+ * If the underlying observable produces items faster than the Iterator.next() takes them + * onNext events might be skipped, but onError or onCompleted events are not. + *

+ * The difference between BlockingObservable.next() and BlockingObservable.latest() is that + * the former does not overwrite untaken values whereas the latter does. + *

+ * Note also that an onNext() directly followed by onCompleted() might hide the given onNext() event. + * + * @return the Iterable sequence + */ + public Iterable latest() { + return OperationLatest.latest(o); + } + /** * If the {@link Observable} completes after emitting a single item, return that item, * otherwise throw an exception. diff --git a/rxjava-core/src/main/java/rx/operators/OperationLatest.java b/rxjava-core/src/main/java/rx/operators/OperationLatest.java new file mode 100644 index 0000000000..57f0cafce0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationLatest.java @@ -0,0 +1,184 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.util.Exceptions; + +/** + * Wait for and iterate over the latest values of the source observable. + * If the source works faster than the iterator, values may be skipped, but + * not the onError or onCompleted events. + */ +public final class OperationLatest { + /** Utility class. */ + private OperationLatest() { throw new IllegalStateException("No instances!"); } + + public static Iterable latest(final Observable source) { + return new Iterable() { + @Override + public Iterator iterator() { + LatestObserverIterator lio = new LatestObserverIterator(); + source.subscribe(lio); + return lio; + } + }; + } + + /** Observer of source, iterator for output. */ + static final class LatestObserverIterator implements Observer, Iterator { + final Lock lock = new ReentrantLock(); + final Semaphore notify = new Semaphore(0); + // observer's values + boolean oHasValue; + Notification.Kind oKind; + T oValue; + Throwable oError; + @Override + public void onNext(T args) { + boolean wasntAvailable; + lock.lock(); + try { + wasntAvailable = !oHasValue; + oHasValue = true; + oValue = args; + oKind = Notification.Kind.OnNext; + } finally { + lock.unlock(); + } + if (wasntAvailable) { + notify.release(); + } + } + + @Override + public void onError(Throwable e) { + boolean wasntAvailable; + lock.lock(); + try { + wasntAvailable = !oHasValue; + oHasValue = true; + oValue = null; + oError = e; + oKind = Notification.Kind.OnError; + } finally { + lock.unlock(); + } + if (wasntAvailable) { + notify.release(); + } + } + + @Override + public void onCompleted() { + boolean wasntAvailable; + lock.lock(); + try { + wasntAvailable = !oHasValue; + oHasValue = true; + oValue = null; + oKind = Notification.Kind.OnCompleted; + } finally { + lock.unlock(); + } + if (wasntAvailable) { + notify.release(); + } + } + + // iterator's values + + boolean iDone; + boolean iHasValue; + T iValue; + Throwable iError; + Notification.Kind iKind; + + @Override + public boolean hasNext() { + if (iError != null) { + Exceptions.propagate(iError); + } + if (!iDone) { + if (!iHasValue) { + try { + notify.acquire(); + } catch (InterruptedException ex) { + iError = ex; + iHasValue = true; + iKind = Notification.Kind.OnError; + return true; + } + + lock.lock(); + try { + iKind = oKind; + switch (oKind) { + case OnNext: + iValue = oValue; + oValue = null; // handover + break; + case OnError: + iError = oError; + oError = null; // handover + if (iError != null) { + Exceptions.propagate(iError); + } + break; + case OnCompleted: + iDone = true; + break; + } + oHasValue = false; + } finally { + lock.unlock(); + } + iHasValue = true; + } + } + return !iDone; + } + + @Override + public T next() { + if (iKind == Notification.Kind.OnError) { + Exceptions.propagate(iError); + } + if (hasNext()) { + if (iKind == Notification.Kind.OnNext) { + T v = iValue; + iValue = null; // handover + iHasValue = false; + return v; + } + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator."); + } + + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java index 15406aeece..4e90d3e543 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java @@ -31,16 +31,16 @@ */ public final class OperationMostRecent { - public static Iterable mostRecent(final Observable source, T initialValue) { - - MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); - final MostRecentIterator nextIterator = new MostRecentIterator(mostRecentObserver); - - source.subscribe(mostRecentObserver); + public static Iterable mostRecent(final Observable source, final T initialValue) { return new Iterable() { @Override public Iterator iterator() { + MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); + final MostRecentIterator nextIterator = new MostRecentIterator(mostRecentObserver); + + source.subscribe(mostRecentObserver); + return nextIterator; } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 4f5e9dfd31..82e6587b20 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -34,19 +34,18 @@ public final class OperationNext { public static Iterable next(final Observable items) { - - NextObserver nextObserver = new NextObserver(); - final NextIterator nextIterator = new NextIterator(nextObserver); - - items.materialize().subscribe(nextObserver); - return new Iterable() { @Override public Iterator iterator() { + NextObserver nextObserver = new NextObserver(); + final NextIterator nextIterator = new NextIterator(nextObserver); + + items.materialize().subscribe(nextObserver); + return nextIterator; } }; - + } private static class NextIterator implements Iterator { diff --git a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java index 2fcd51872e..9760805fdc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java @@ -16,6 +16,7 @@ package rx.operators; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -63,27 +64,26 @@ public void onNext(Notification args) { return new Iterator() { private Notification buf; - + @Override public boolean hasNext() { if (buf == null) { buf = take(); } + if (buf.isOnError()) { + throw Exceptions.propagate(buf.getThrowable()); + } return !buf.isOnCompleted(); } @Override public T next() { - if (buf == null) { - buf = take(); + if (hasNext()) { + T result = buf.getValue(); + buf = null; + return result; } - if (buf.isOnError()) { - throw Exceptions.propagate(buf.getThrowable()); - } - - T result = buf.getValue(); - buf = null; - return result; + throw new NoSuchElementException(); } private Notification take() { diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index f3e05189c1..008aedc6dd 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.*; import java.util.Iterator; +import java.util.NoSuchElementException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -201,6 +203,58 @@ public void testToIterable() { assertEquals(false, it.hasNext()); } + @Test(expected = NoSuchElementException.class) + public void testToIterableNextOnly() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterator it = obs.toIterable().iterator(); + + Assert.assertEquals((Integer)1, it.next()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertEquals((Integer)3, it.next()); + + it.next(); + } + + @Test(expected = NoSuchElementException.class) + public void testToIterableNextOnlyTwice() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterator it = obs.toIterable().iterator(); + + Assert.assertEquals((Integer)1, it.next()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertEquals((Integer)3, it.next()); + + boolean exc = false; + try { + it.next(); + } catch (NoSuchElementException ex) { + exc = true; + } + Assert.assertEquals(true, exc); + + it.next(); + } + + @Test + public void testToIterableManyTimes() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterable iter = obs.toIterable(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)1, it.next()); + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)3, it.next()); + Assert.assertFalse(it.hasNext()); + } + } @Test(expected = TestException.class) public void testToIterableWithException() { diff --git a/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java new file mode 100644 index 0000000000..e7fd9645c0 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java @@ -0,0 +1,162 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import junit.framework.Assert; +import org.junit.Test; +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; + +public class OperationLatestTest { + @Test(timeout = 1000) + public void testSimple() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + Assert.assertEquals(false, it.hasNext()); + } + @Test(timeout = 1000) + public void testSameSourceMultipleIterators() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + Assert.assertEquals(false, it.hasNext()); + } + } + @Test(timeout = 1000, expected = NoSuchElementException.class) + public void testEmpty() { + BlockingObservable source = Observable.empty().toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + Assert.assertEquals(false, it.hasNext()); + + it.next(); + } + @Test(timeout = 1000, expected = NoSuchElementException.class) + public void testSimpleJustNext() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 10; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + } + @Test(timeout = 1000, expected = RuntimeException.class) + public void testHasNextThrows() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.error(new RuntimeException("Forced failure!"), scheduler).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + it.hasNext(); + } + @Test(timeout = 1000, expected = RuntimeException.class) + public void testNextThrows() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.error(new RuntimeException("Forced failure!"), scheduler).toBlockingObservable(); + + Iterable iter = source.latest(); + Iterator it = iter.iterator(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + it.next(); + } + @Test(timeout = 1000) + public void testFasterSource() { + PublishSubject source = PublishSubject.create(); + BlockingObservable blocker = source.toBlockingObservable(); + + Iterable iter = blocker.latest(); + Iterator it = iter.iterator(); + + source.onNext(1); + + Assert.assertEquals(Integer.valueOf(1), it.next()); + + source.onNext(2); + source.onNext(3); + + Assert.assertEquals(Integer.valueOf(3), it.next()); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + Assert.assertEquals(Integer.valueOf(6), it.next()); + + source.onNext(7); + source.onCompleted(); + + Assert.assertEquals(false, it.hasNext()); + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java b/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java index d20f79e819..35382273fc 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java @@ -19,8 +19,13 @@ import static rx.operators.OperationMostRecent.*; import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Test; +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.subjects.Subject; @@ -71,4 +76,29 @@ public void testMostRecentWithException() { private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } + + @Test(timeout = 1000) + public void testSingleSourceManyIterators() { + TestScheduler scheduler = new TestScheduler(); + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.mostRecent(-1L); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + Assert.assertEquals(Long.valueOf(-1), it.next()); + + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + Assert.assertEquals(Long.valueOf(i), it.next()); + } + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(false, it.hasNext()); + } + + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java index 8a5de26d1d..29b9ea3383 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java @@ -24,13 +24,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.observables.BlockingObservable; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -293,4 +296,25 @@ public void run() { System.out.println("a: " + a + " b: " + b + " c: " + c); } + @Test(timeout = 2000) + public void testSingleSourceManyIterators() throws InterruptedException { + BlockingObservable source = Observable.interval(50, TimeUnit.MILLISECONDS).take(10).toBlockingObservable(); + + Iterable iter = source.next(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + for (int i = 0; i < 9; i++) { + // hasNext has to set the waiting to true, otherwise, all onNext will be skipped + Assert.assertEquals(true, it.hasNext()); + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + Thread.sleep(100); + + Assert.assertEquals(false, it.hasNext()); + } + + } } From 8f1f86d5af809f208c6118cda07c8ded2a4e6628 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 Dec 2013 09:38:34 +0100 Subject: [PATCH 2/3] Fixes based on @zsxwing's suggestions. --- .../rx/observables/BlockingObservable.java | 5 +- .../java/rx/operators/OperationLatest.java | 120 ++++-------------- .../rx/operators/OperationLatestTest.java | 2 +- 3 files changed, 27 insertions(+), 100 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 74cc4bbed8..7427c24bdb 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -274,10 +274,7 @@ public Iterable next() { * If the underlying observable produces items faster than the Iterator.next() takes them * onNext events might be skipped, but onError or onCompleted events are not. *

- * The difference between BlockingObservable.next() and BlockingObservable.latest() is that - * the former does not overwrite untaken values whereas the latter does. - *

- * Note also that an onNext() directly followed by onCompleted() might hide the given onNext() event. + * Note also that an onNext() directly followed by onCompleted() might hide the onNext() event. * * @return the Iterable sequence */ diff --git a/rxjava-core/src/main/java/rx/operators/OperationLatest.java b/rxjava-core/src/main/java/rx/operators/OperationLatest.java index 57f0cafce0..556e007a4a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationLatest.java @@ -18,8 +18,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.Semaphore; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import rx.Notification; import rx.Observable; import rx.Observer; @@ -39,33 +38,20 @@ public static Iterable latest(final Observable source) { @Override public Iterator iterator() { LatestObserverIterator lio = new LatestObserverIterator(); - source.subscribe(lio); + source.materialize().subscribe(lio); return lio; } }; } /** Observer of source, iterator for output. */ - static final class LatestObserverIterator implements Observer, Iterator { - final Lock lock = new ReentrantLock(); + static final class LatestObserverIterator implements Observer>, Iterator { final Semaphore notify = new Semaphore(0); - // observer's values - boolean oHasValue; - Notification.Kind oKind; - T oValue; - Throwable oError; + // observer's notification + final AtomicReference> reference = new AtomicReference>(); @Override - public void onNext(T args) { - boolean wasntAvailable; - lock.lock(); - try { - wasntAvailable = !oHasValue; - oHasValue = true; - oValue = args; - oKind = Notification.Kind.OnNext; - } finally { - lock.unlock(); - } + public void onNext(Notification args) { + boolean wasntAvailable = reference.getAndSet(args) == null; if (wasntAvailable) { notify.release(); } @@ -73,102 +59,46 @@ public void onNext(T args) { @Override public void onError(Throwable e) { - boolean wasntAvailable; - lock.lock(); - try { - wasntAvailable = !oHasValue; - oHasValue = true; - oValue = null; - oError = e; - oKind = Notification.Kind.OnError; - } finally { - lock.unlock(); - } - if (wasntAvailable) { - notify.release(); - } + // not expected } @Override public void onCompleted() { - boolean wasntAvailable; - lock.lock(); - try { - wasntAvailable = !oHasValue; - oHasValue = true; - oValue = null; - oKind = Notification.Kind.OnCompleted; - } finally { - lock.unlock(); - } - if (wasntAvailable) { - notify.release(); - } + // not expected } - // iterator's values - - boolean iDone; - boolean iHasValue; - T iValue; - Throwable iError; - Notification.Kind iKind; - + // iterator's notification + Notification iNotif; @Override public boolean hasNext() { - if (iError != null) { - Exceptions.propagate(iError); + if (iNotif != null && iNotif.isOnError()) { + throw Exceptions.propagate(iNotif.getThrowable()); } - if (!iDone) { - if (!iHasValue) { + if (iNotif == null || !iNotif.isOnCompleted()) { + if (iNotif == null) { try { notify.acquire(); } catch (InterruptedException ex) { - iError = ex; - iHasValue = true; - iKind = Notification.Kind.OnError; - return true; + Thread.currentThread().interrupt(); + iNotif = new Notification(ex); + throw Exceptions.propagate(ex); } - lock.lock(); - try { - iKind = oKind; - switch (oKind) { - case OnNext: - iValue = oValue; - oValue = null; // handover - break; - case OnError: - iError = oError; - oError = null; // handover - if (iError != null) { - Exceptions.propagate(iError); - } - break; - case OnCompleted: - iDone = true; - break; - } - oHasValue = false; - } finally { - lock.unlock(); + iNotif = reference.getAndSet(null); + if (iNotif.isOnError()) { + throw Exceptions.propagate(iNotif.getThrowable()); } - iHasValue = true; } } - return !iDone; + return !iNotif.isOnCompleted(); } @Override public T next() { - if (iKind == Notification.Kind.OnError) { - Exceptions.propagate(iError); - } if (hasNext()) { - if (iKind == Notification.Kind.OnNext) { - T v = iValue; - iValue = null; // handover - iHasValue = false; + if (iNotif.isOnNext()) { + T v = iNotif.getValue(); + iNotif = null; return v; } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java index e7fd9645c0..3dde8610f9 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java @@ -104,7 +104,7 @@ public void testSimpleJustNext() { Assert.assertEquals(Long.valueOf(i), it.next()); } } - @Test(timeout = 1000, expected = RuntimeException.class) + @Test(/*timeout = 1000, */expected = RuntimeException.class) public void testHasNextThrows() { TestScheduler scheduler = new TestScheduler(); From 7a938be958ee502ee25edef939d9b789c35d596a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 Dec 2013 10:06:23 +0100 Subject: [PATCH 3/3] Increased time delay in test. --- .../src/test/java/rx/operators/OperationNextTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java index 29b9ea3383..acbf5ed52e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java @@ -296,9 +296,9 @@ public void run() { System.out.println("a: " + a + " b: " + b + " c: " + c); } - @Test(timeout = 2000) + @Test(timeout = 8000) public void testSingleSourceManyIterators() throws InterruptedException { - BlockingObservable source = Observable.interval(50, TimeUnit.MILLISECONDS).take(10).toBlockingObservable(); + BlockingObservable source = Observable.interval(200, TimeUnit.MILLISECONDS).take(10).toBlockingObservable(); Iterable iter = source.next(); @@ -311,7 +311,7 @@ public void testSingleSourceManyIterators() throws InterruptedException { Assert.assertEquals(Long.valueOf(i), it.next()); } - Thread.sleep(100); + Thread.sleep(400); Assert.assertEquals(false, it.hasNext()); }