Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Zip Operator Using Lift [Preview] #785

Merged
merged 4 commits into from
Feb 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperationZip;
import rx.operators.OperatorZip;
import rx.operators.OperatorCast;
import rx.operators.OperatorFromIterable;
import rx.operators.OperatorGroupBy;
Expand All @@ -108,6 +108,7 @@
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorZipIterable;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -1645,11 +1646,9 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
* the type of that item
* @return an Observable that emits {@code value} as a single item and then completes
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
* @deprecated Use {@link #from(T)}
*/
@Deprecated
public final static <T> Observable<T> just(T value) {
return from(Arrays.asList((value)));
return from(Arrays.asList(value));
}

/**
Expand Down Expand Up @@ -3058,7 +3057,11 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
return create(OperationZip.zip(ws, zipFunction));
List<Observable<?>> os = new ArrayList<Observable<?>>();
for (Observable<?> o : ws) {
os.add(o);
}
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3087,12 +3090,14 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {

@Override
public final Observable<R> call(List<? extends Observable<?>> wsList) {
return create(OperationZip.zip(wsList, zipFunction));
public Observable<?>[] call(List<? extends Observable<?>> o) {
return o.toArray(new Observable<?>[o.size()]);
}
});

}).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand All @@ -3118,8 +3123,8 @@ public final Observable<R> call(List<? extends Observable<?>> wsList) {
* @return an Observable that emits the zipped results
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, zipFunction));
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3149,7 +3154,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, zipFunction));
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3181,7 +3186,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3215,7 +3220,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3251,7 +3256,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
*/
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3289,7 +3294,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
*/
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3329,7 +3334,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -3371,7 +3376,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
}

/**
Expand Down Expand Up @@ -8403,7 +8408,7 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
*/
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zipIterable(this, other, zipFunction));
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
}

/**
Expand Down
21 changes: 0 additions & 21 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,6 @@ protected Subscriber(Subscriber<?> op) {
this(op.cs);
}

public static <T> Subscriber<T> from(final Observer<? super T> o) {
return new Subscriber<T>() {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
o.onNext(t);
}

};
}

/**
* Used to register an unsubscribe callback.
*/
Expand Down
151 changes: 151 additions & 0 deletions rxjava-core/src/main/java/rx/observers/Subscribers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;
import rx.util.OnErrorNotImplementedException;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

public class Subscribers {

public static <T> Subscriber<T> from(final Observer<? super T> o) {
return new Subscriber<T>() {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
o.onNext(t);
}

};
}

/**
* Create an empty Subscriber that ignores all events.
*/
public static final <T> Subscriber<T> create() {
return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
// do nothing
}

};
}

/**
* Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`.
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

/**
* Create an Subscriber that receives `onNext` and `onError` and ignores `onCompleted`.
*
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

/**
* Create an Subscriber that receives `onNext`, `onError` and `onCompleted`.
*
*/
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

return new Subscriber<T>() {

@Override
public final void onCompleted() {
onComplete.call();
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

};
}

}
9 changes: 9 additions & 0 deletions rxjava-core/src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.observers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -73,6 +74,14 @@ public List<T> getOnNextEvents() {
return Collections.unmodifiableList(onNextEvents);
}

public List<Object> getEvents() {
ArrayList<Object> events = new ArrayList<Object>();
events.add(onNextEvents);
events.add(onErrorEvents);
events.add(onCompletedEvents);
return Collections.unmodifiableList(events);
}

public void assertReceivedOnNext(List<T> items) {
if (onNextEvents.size() != items.size()) {
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + onNextEvents.size());
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public class TestSubscriber<T> extends Subscriber<T> {

private final Subscriber<Object> EMPTY = Subscriber.from(new EmptyObserver<Object>());
private final Subscriber<Object> EMPTY = Subscribers.create();

private final TestObserver<T> testObserver;

Expand Down
Loading