Skip to content

Commit

Permalink
Merge pull request ReactiveX#626 from akarnokd/OperationLatestAndFixes
Browse files Browse the repository at this point in the history
Added: BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
  • Loading branch information
benjchristensen committed Dec 23, 2013
2 parents bbd6d45 + 7a938be commit ae0ca4a
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 23 deletions.
16 changes: 16 additions & 0 deletions rxjava-core/src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.OperationLatest;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
Expand Down Expand Up @@ -297,6 +298,21 @@ public Iterable<T> next() {
return OperationNext.next(o);
}

/**
* Returns the latest item emitted by the underlying Observable, waiting if necessary
* for one to become available.
* <p>
* If the underlying observable produces items faster than the Iterator.next() takes them
* onNext events might be skipped, but onError or onCompleted events are not.
* <p>
* Note also that an onNext() directly followed by onCompleted() might hide the onNext() event.
*
* @return the Iterable sequence
*/
public Iterable<T> latest() {
return OperationLatest.latest(o);
}

/**
* If the {@link Observable} completes after emitting a single item, return that item,
* otherwise throw an IllegalArgumentException.
Expand Down
114 changes: 114 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationLatest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.util.Exceptions;

/**
* Wait for and iterate over the latest values of the source observable.
* If the source works faster than the iterator, values may be skipped, but
* not the onError or onCompleted events.
*/
public final class OperationLatest {
/** Utility class. */
private OperationLatest() { throw new IllegalStateException("No instances!"); }

public static <T> Iterable<T> latest(final Observable<? extends T> source) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
source.materialize().subscribe(lio);
return lio;
}
};
}

/** Observer of source, iterator for output. */
static final class LatestObserverIterator<T> implements Observer<Notification<? extends T>>, Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
@Override
public void onNext(Notification<? extends T> args) {
boolean wasntAvailable = reference.getAndSet(args) == null;
if (wasntAvailable) {
notify.release();
}
}

@Override
public void onError(Throwable e) {
// not expected
}

@Override
public void onCompleted() {
// not expected
}

// iterator's notification
Notification<? extends T> iNotif;
@Override
public boolean hasNext() {
if (iNotif != null && iNotif.isOnError()) {
throw Exceptions.propagate(iNotif.getThrowable());
}
if (iNotif == null || !iNotif.isOnCompleted()) {
if (iNotif == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
iNotif = new Notification<T>(ex);
throw Exceptions.propagate(ex);
}

iNotif = reference.getAndSet(null);
if (iNotif.isOnError()) {
throw Exceptions.propagate(iNotif.getThrowable());
}
}
}
return !iNotif.isOnCompleted();
}

@Override
public T next() {
if (hasNext()) {
if (iNotif.isOnNext()) {
T v = iNotif.getValue();
iNotif = null;
return v;
}
}
throw new NoSuchElementException();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read-only iterator.");
}

}
}
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperationMostRecent.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
*/
public final class OperationMostRecent {

public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, T initialValue) {

MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

source.subscribe(mostRecentObserver);
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {

return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

source.subscribe(mostRecentObserver);

return nextIterator;
}
};
Expand Down
13 changes: 6 additions & 7 deletions rxjava-core/src/main/java/rx/operators/OperationNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,18 @@
public final class OperationNext {

public static <T> Iterable<T> next(final Observable<? extends T> items) {

NextObserver<T> nextObserver = new NextObserver<T>();
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);

items.materialize().subscribe(nextObserver);

return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
NextObserver<T> nextObserver = new NextObserver<T>();
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);

items.materialize().subscribe(nextObserver);

return nextIterator;
}
};

}

private static class NextIterator<T> implements Iterator<T> {
Expand Down
20 changes: 10 additions & 10 deletions rxjava-core/src/main/java/rx/operators/OperationToIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.operators;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand Down Expand Up @@ -63,27 +64,26 @@ public void onNext(Notification<? extends T> args) {

return new Iterator<T>() {
private Notification<? extends T> buf;

@Override
public boolean hasNext() {
if (buf == null) {
buf = take();
}
if (buf.isOnError()) {
throw Exceptions.propagate(buf.getThrowable());
}
return !buf.isOnCompleted();
}

@Override
public T next() {
if (buf == null) {
buf = take();
if (hasNext()) {
T result = buf.getValue();
buf = null;
return result;
}
if (buf.isOnError()) {
throw Exceptions.propagate(buf.getThrowable());
}

T result = buf.getValue();
buf = null;
return result;
throw new NoSuchElementException();
}

private Notification<? extends T> take() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.junit.Assert.*;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.junit.Assert;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -201,6 +203,58 @@ public void testToIterable() {
assertEquals(false, it.hasNext());

}
@Test(expected = NoSuchElementException.class)
public void testToIterableNextOnly() {
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));

Iterator<Integer> it = obs.toIterable().iterator();

Assert.assertEquals((Integer)1, it.next());
Assert.assertEquals((Integer)2, it.next());
Assert.assertEquals((Integer)3, it.next());

it.next();
}

@Test(expected = NoSuchElementException.class)
public void testToIterableNextOnlyTwice() {
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));

Iterator<Integer> it = obs.toIterable().iterator();

Assert.assertEquals((Integer)1, it.next());
Assert.assertEquals((Integer)2, it.next());
Assert.assertEquals((Integer)3, it.next());

boolean exc = false;
try {
it.next();
} catch (NoSuchElementException ex) {
exc = true;
}
Assert.assertEquals(true, exc);

it.next();
}

@Test
public void testToIterableManyTimes() {
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));

Iterable<Integer> iter = obs.toIterable();

for (int j = 0; j < 3; j++) {
Iterator<Integer> it = iter.iterator();

Assert.assertTrue(it.hasNext());
Assert.assertEquals((Integer)1, it.next());
Assert.assertTrue(it.hasNext());
Assert.assertEquals((Integer)2, it.next());
Assert.assertTrue(it.hasNext());
Assert.assertEquals((Integer)3, it.next());
Assert.assertFalse(it.hasNext());
}
}

@Test(expected = TestException.class)
public void testToIterableWithException() {
Expand Down
Loading

0 comments on commit ae0ca4a

Please sign in to comment.