diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 10dda41461..d0ee285ae4 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -275,6 +275,12 @@ def class ObservableTests { } + @Test + public void testAll() { + Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) }); + verify(a, times(1)).received(true); + } + def class AsyncObservable implements Func1, Subscription> { public Subscription call(final Observer observer) { diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5d11c7b9ed..d24e48106c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,6 +36,7 @@ import org.mockito.MockitoAnnotations; import rx.observables.GroupedObservable; +import rx.operators.OperationAll; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; @@ -1665,6 +1666,35 @@ public T call(T t1, T t2) { }); } + /** + * Determines whether all elements of an observable sequence satisfies a condition. + * @param sequence an observable sequence whose elements to apply the predicate to. + * @param predicate a function to test each element for a condition. + * @param the type of observable. + * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. + */ + public static Observable all(final Observable sequence, final Func1 predicate) { + return _create(OperationAll.all(sequence, predicate)); + } + + /** + * Determines whether all elements of an observable sequence satisfies a condition. + * @param sequence an observable sequence whose elements to apply the predicate to. + * @param predicate a function to test each element for a condition. + * @param the type of observable. + * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. + */ + public static Observable all(final Observable sequence, Object predicate) { + final FuncN _f = Functions.from(predicate); + + return all(sequence, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + /** * Returns an Observable that skips the first num items emitted by the source * Observable. You can ignore the first num items emitted by an Observable and attend @@ -2973,6 +3003,24 @@ public Observable scan(final T initialValue, final Object accumulator) { return scan(this, initialValue, accumulator); } + /** + * Determines whether all elements of an observable sequence satisfies a condition. + * @param predicate a function to test each element for a condition. + * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. + */ + public Observable all(Func1 predicate) { + return all(this, predicate); + } + + /** + * Determines whether all elements of an observable sequence satisfies a condition. + * @param predicate a function to test each element for a condition. + * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. + */ + public Observable all(Object predicate) { + return all(this, predicate); + } + /** * Returns an Observable that skips the first num items emitted by the source * Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java new file mode 100644 index 0000000000..4ce29069a2 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationAll.java @@ -0,0 +1,141 @@ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class OperationAll { + + public static Func1, Subscription> all(Observable sequence, Func1 predicate) { + return new AllObservable(sequence, predicate); + } + + private static class AllObservable implements Func1, Subscription> { + private final Observable sequence; + private final Func1 predicate; + + private final AtomicBoolean status = new AtomicBoolean(true); + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + + private AllObservable(Observable sequence, Func1 predicate) { + this.sequence = sequence; + this.predicate = predicate; + } + + + @Override + public Subscription call(final Observer observer) { + return subscription.wrap(sequence.subscribe(new Observer() { + @Override + public void onCompleted() { + if (status.get()) { + observer.onNext(true); + observer.onCompleted(); + } + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + boolean result = predicate.call(args); + boolean changed = status.compareAndSet(true, result); + + if (changed && !result) { + observer.onNext(false); + observer.onCompleted(); + subscription.unsubscribe(); + } + } + })); + } + } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testAll() { + Observable obs = Observable.from("one", "two", "six"); + + Observer observer = mock(Observer.class); + Observable.create(all(obs, new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })).subscribe(observer); + + verify(observer).onNext(true); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } + + @Test + @SuppressWarnings("unchecked") + public void testNotAll() { + Observable obs = Observable.from("one", "two", "three", "six"); + + Observer observer = mock(Observer.class); + Observable.create(all(obs, new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })).subscribe(observer); + + verify(observer).onNext(false); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } + + @Test + @SuppressWarnings("unchecked") + public void testEmpty() { + Observable obs = Observable.empty(); + + Observer observer = mock(Observer.class); + Observable.create(all(obs, new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })).subscribe(observer); + + verify(observer).onNext(true); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } + + @Test + @SuppressWarnings("unchecked") + public void testError() { + Exception error = new Exception(); + Observable obs = Observable.error(error); + + Observer observer = mock(Observer.class); + Observable.create(all(obs, new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })).subscribe(observer); + + verify(observer).onError(error); + verifyNoMoreInteractions(observer); + } + } +}