Skip to content

Commit

Permalink
Merge pull request #785 from benjchristensen/operator-zip
Browse files Browse the repository at this point in the history
Reimplement Zip Operator Using Lift [Preview]
  • Loading branch information
benjchristensen committed Feb 5, 2014
2 parents 267d569 + 3d5474f commit dc15e2b
Show file tree
Hide file tree
Showing 10 changed files with 1,080 additions and 294 deletions.
43 changes: 24 additions & 19 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperationZip;
import rx.operators.OperatorZip;
import rx.operators.OperatorCast;
import rx.operators.OperatorFromIterable;
import rx.operators.OperatorGroupBy;
Expand All @@ -108,6 +108,7 @@
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorZipIterable;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -1645,11 +1646,9 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
* the type of that item
* @return an Observable that emits {@code value} as a single item and then completes
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
* @deprecated Use {@link #from(T)}
*/
@Deprecated
public final static <T> Observable<T> just(T value) {
return from(Arrays.asList((value)));
return from(Arrays.asList(value));
}

/**
Expand Down Expand Up @@ -3058,7 +3057,11 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
return create(OperationZip.zip(ws, zipFunction));
List<Observable<?>> os = new ArrayList<Observable<?>>();
for (Observable<?> o : ws) {
os.add(o);
}
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3087,12 +3090,14 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {

@Override
public final Observable<R> call(List<? extends Observable<?>> wsList) {
return create(OperationZip.zip(wsList, zipFunction));
public Observable<?>[] call(List<? extends Observable<?>> o) {
return o.toArray(new Observable<?>[o.size()]);
}
});

}).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand All @@ -3118,8 +3123,8 @@ public final Observable<R> call(List<? extends Observable<?>> wsList) {
* @return an Observable that emits the zipped results
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, zipFunction));
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3149,7 +3154,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, zipFunction));
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3181,7 +3186,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3215,7 +3220,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3251,7 +3256,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
*/
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3289,7 +3294,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
*/
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3329,7 +3334,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3371,7 +3376,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -8403,7 +8408,7 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
*/
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zipIterable(this, other, zipFunction));
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
}

/**
Expand Down
21 changes: 0 additions & 21 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,6 @@ protected Subscriber(Subscriber<?> op) {
this(op.cs);
}

public static <T> Subscriber<T> from(final Observer<? super T> o) {
return new Subscriber<T>() {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
o.onNext(t);
}

};
}

/**
* Used to register an unsubscribe callback.
*/
Expand Down
151 changes: 151 additions & 0 deletions rxjava-core/src/main/java/rx/observers/Subscribers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;
import rx.util.OnErrorNotImplementedException;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

public class Subscribers {

public static <T> Subscriber<T> from(final Observer<? super T> o) {
return new Subscriber<T>() {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
o.onNext(t);
}

};
}

/**
* Create an empty Subscriber that ignores all events.
*/
public static final <T> Subscriber<T> create() {
return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
// do nothing
}

};
}

/**
* Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`.
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

/**
* Create an Subscriber that receives `onNext` and `onError` and ignores `onCompleted`.
*
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

/**
* Create an Subscriber that receives `onNext`, `onError` and `onCompleted`.
*
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
onComplete.call();
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

}
9 changes: 9 additions & 0 deletions rxjava-core/src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.observers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -73,6 +74,14 @@ public List<T> getOnNextEvents() {
return Collections.unmodifiableList(onNextEvents);
}

public List<Object> getEvents() {
ArrayList<Object> events = new ArrayList<Object>();
events.add(onNextEvents);
events.add(onErrorEvents);
events.add(onCompletedEvents);
return Collections.unmodifiableList(events);
}

public void assertReceivedOnNext(List<T> items) {
if (onNextEvents.size() != items.size()) {
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + onNextEvents.size());
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public class TestSubscriber<T> extends Subscriber<T> {

private final Subscriber<Object> EMPTY = Subscriber.from(new EmptyObserver<Object>());
private final Subscriber<Object> EMPTY = Subscribers.create();

private final TestObserver<T> testObserver;

Expand Down
Loading

0 comments on commit dc15e2b

Please sign in to comment.