Skip to content

Commit

Permalink
Merge pull request #203 from mairbek/all-operation
Browse files Browse the repository at this point in the history
All Operation Implemented
  • Loading branch information
benjchristensen committed Mar 26, 2013
2 parents 89dbc93 + a326072 commit bf5056f
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ def class ObservableTests {

}

@Test
public void testAll() {
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
verify(a, times(1)).received(true);
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
Expand Down
48 changes: 48 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.mockito.MockitoAnnotations;

import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -1665,6 +1666,35 @@ public T call(T t1, T t2) {
});
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param sequence an observable sequence whose elements to apply the predicate to.
* @param predicate a function to test each element for a condition.
* @param <T> the type of observable.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, final Func1<T, Boolean> predicate) {
return _create(OperationAll.all(sequence, predicate));
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param sequence an observable sequence whose elements to apply the predicate to.
* @param predicate a function to test each element for a condition.
* @param <T> the type of observable.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
final FuncN _f = Functions.from(predicate);

return all(sequence, new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return (Boolean) _f.call(t);
}
});
}

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable. You can ignore the first <code>num</code> items emitted by an Observable and attend
Expand Down Expand Up @@ -2973,6 +3003,24 @@ public Observable<T> scan(final T initialValue, final Object accumulator) {
return scan(this, initialValue, accumulator);
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
return all(this, predicate);
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Object predicate) {
return all(this, predicate);
}

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable.
Expand Down
141 changes: 141 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAll.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class OperationAll {

public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
return new AllObservable<T>(sequence, predicate);
}

private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
private final Observable<T> sequence;
private final Func1<T, Boolean> predicate;

private final AtomicBoolean status = new AtomicBoolean(true);
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();


private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
this.sequence = sequence;
this.predicate = predicate;
}


@Override
public Subscription call(final Observer<Boolean> observer) {
return subscription.wrap(sequence.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
if (status.get()) {
observer.onNext(true);
observer.onCompleted();
}
}

@Override
public void onError(Exception e) {
observer.onError(e);
}

@Override
public void onNext(T args) {
boolean result = predicate.call(args);
boolean changed = status.compareAndSet(true, result);

if (changed && !result) {
observer.onNext(false);
observer.onCompleted();
subscription.unsubscribe();
}
}
}));
}
}

public static class UnitTest {

@Test
@SuppressWarnings("unchecked")
public void testAll() {
Observable<String> obs = Observable.from("one", "two", "six");

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(true);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testNotAll() {
Observable<String> obs = Observable.from("one", "two", "three", "six");

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(false);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testEmpty() {
Observable<String> obs = Observable.empty();

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(true);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testError() {
Exception error = new Exception();
Observable<String> obs = Observable.error(error);

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onError(error);
verifyNoMoreInteractions(observer);
}
}
}

0 comments on commit bf5056f

Please sign in to comment.