Skip to content

Commit

Permalink
Merge branch 'upstream' into issue43
Browse files Browse the repository at this point in the history
  • Loading branch information
abliss committed Mar 29, 2013
2 parents 232612c + f1c54b5 commit fb0e8b0
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ def class ObservableTests {

}

@Test
public void testAll() {
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
verify(a, times(1)).received(true);
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}
Expand Down
48 changes: 48 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.mockito.MockitoAnnotations;

import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -1678,6 +1679,35 @@ public T call(T t1, T t2) {
});
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param sequence an observable sequence whose elements to apply the predicate to.
* @param predicate a function to test each element for a condition.
* @param <T> the type of observable.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, final Func1<T, Boolean> predicate) {
return _create(OperationAll.all(sequence, predicate));
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param sequence an observable sequence whose elements to apply the predicate to.
* @param predicate a function to test each element for a condition.
* @param <T> the type of observable.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
final FuncN _f = Functions.from(predicate);

return all(sequence, new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return (Boolean) _f.call(t);
}
});
}

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable. You can ignore the first <code>num</code> items emitted by an Observable and attend
Expand Down Expand Up @@ -2997,6 +3027,24 @@ public Observable<T> scan(final T initialValue, final Object accumulator) {
return scan(this, initialValue, accumulator);
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
return all(this, predicate);
}

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Object predicate) {
return all(this, predicate);
}

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable.
Expand Down
141 changes: 141 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAll.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class OperationAll {

public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
return new AllObservable<T>(sequence, predicate);
}

private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
private final Observable<T> sequence;
private final Func1<T, Boolean> predicate;

private final AtomicBoolean status = new AtomicBoolean(true);
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();


private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
this.sequence = sequence;
this.predicate = predicate;
}


@Override
public Subscription call(final Observer<Boolean> observer) {
return subscription.wrap(sequence.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
if (status.get()) {
observer.onNext(true);
observer.onCompleted();
}
}

@Override
public void onError(Exception e) {
observer.onError(e);
}

@Override
public void onNext(T args) {
boolean result = predicate.call(args);
boolean changed = status.compareAndSet(true, result);

if (changed && !result) {
observer.onNext(false);
observer.onCompleted();
subscription.unsubscribe();
}
}
}));
}
}

public static class UnitTest {

@Test
@SuppressWarnings("unchecked")
public void testAll() {
Observable<String> obs = Observable.from("one", "two", "six");

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(true);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testNotAll() {
Observable<String> obs = Observable.from("one", "two", "three", "six");

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(false);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testEmpty() {
Observable<String> obs = Observable.empty();

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onNext(true);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
@SuppressWarnings("unchecked")
public void testError() {
Exception error = new Exception();
Observable<String> obs = Observable.error(error);

Observer<Boolean> observer = mock(Observer.class);
Observable.create(all(obs, new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
})).subscribe(observer);

verify(observer).onError(error);
verifyNoMoreInteractions(observer);
}
}
}
Loading

0 comments on commit fb0e8b0

Please sign in to comment.