Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various Changes While Fixing GroupBy #847

Merged
merged 15 commits into from
Feb 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.observers.SafeSubscriber;
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
Expand All @@ -51,7 +52,6 @@
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupByUntil;
Expand Down Expand Up @@ -96,10 +96,11 @@
import rx.operators.OperationWindow;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OperatorFilter;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
Expand Down Expand Up @@ -1791,7 +1792,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211914.aspx">MSDN: Observable.Merge</a>
*/
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return source.lift(new OperatorMerge(maxConcurrent)); // any idea how to get these generics working?!
return Observable.create(OperationMergeMaxConcurrent.merge(source, maxConcurrent));
}

/**
Expand Down Expand Up @@ -2361,7 +2362,7 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
*
* @return
*/
private final Observable<Observable<T>> nest() {
public final Observable<Observable<T>> nest() {
return from(this);
}

Expand Down Expand Up @@ -2439,8 +2440,8 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
*/
public final static Observable<Integer> range(int start, int count) {
if (count < 1) {
throw new IllegalArgumentException("Count must be positive");
if (count < 0) {
throw new IllegalArgumentException("Count can not be negative");
}
if ((start + count) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
Expand Down Expand Up @@ -4440,7 +4441,7 @@ public final Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-filter-or-where">RxJava Wiki: filter()</a>
*/
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
return create(OperationFilter.filter(this, predicate));
return lift(new OperatorFilter<T>(predicate));
}

/**
Expand Down
30 changes: 28 additions & 2 deletions rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.observers;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Notification;
import rx.Observer;
Expand All @@ -27,22 +29,25 @@
public class TestSubscriber<T> extends Subscriber<T> {

private final TestObserver<T> testObserver;
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Thread lastSeenThread;

public TestSubscriber(Subscriber<T> delegate) {
this.testObserver = new TestObserver<T>(delegate);
}

public TestSubscriber(Observer<T> delegate) {
this.testObserver = new TestObserver<T>(delegate);
}

public TestSubscriber() {
this.testObserver = new TestObserver<T>(Subscribers.<T>empty());
this.testObserver = new TestObserver<T>(Subscribers.<T> empty());
}

@Override
public void onCompleted() {
testObserver.onCompleted();
latch.countDown();
}

public List<Notification<T>> getOnCompletedEvents() {
Expand All @@ -52,6 +57,7 @@ public List<Notification<T>> getOnCompletedEvents() {
@Override
public void onError(Throwable e) {
testObserver.onError(e);
latch.countDown();
}

public List<Throwable> getOnErrorEvents() {
Expand All @@ -60,6 +66,7 @@ public List<Throwable> getOnErrorEvents() {

@Override
public void onNext(T t) {
lastSeenThread = Thread.currentThread();
testObserver.onNext(t);
}

Expand All @@ -78,4 +85,23 @@ public void assertTerminalEvent() {
testObserver.assertTerminalEvent();
}

public void awaitTerminalEvent() {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}

public void awaitTerminalEvent(long timeout, TimeUnit unit) {
try {
latch.await(timeout, unit);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}

public Thread getLastSeenThread() {
return lastSeenThread;
}
}
71 changes: 0 additions & 71 deletions rxjava-core/src/main/java/rx/operators/OperationFilter.java

This file was deleted.

Loading