Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Added zip function with Observable array. #4036

Merged
merged 1 commit into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3119,6 +3119,54 @@ public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
}

/**
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
* items emitted, in sequence, by an array of other Observables.
* <p>
* {@code zip} applies this function in strict sequence, so the first item emitted by the new Observable
* will be the result of the function applied to the first item emitted by each of the source Observables;
* the second item emitted by the new Observable will be the result of the function applied to the second
* item emitted by each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@code onNext} as many times as
* the number of {@code onNext} invocations of the source Observable that emits the fewest items.
* <p>
* The operator subscribes to its sources in order they are specified and completes eagerly if
* one of the sources is shorter than the rest while unsubscribing the other sources. Therefore, it
* is possible those other sources will never be able to run to completion (and thus not calling
* {@code doOnCompleted()}). This can also happen if the sources are exactly the same length; if
* source A completes and B has been consumed and is about to complete, the operator detects A won't
* be sending further values and it will unsubscribe B immediately. For example:
* <pre><code>zip(new Observable[]{range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)}, (a) -&gt;
* a)</code></pre>
* {@code action1} will be called but {@code action2} won't.
* <br>To work around this termination property,
* use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion
* or unsubscription.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param ws
* an array of source Observables
* @param zipFunction
* a function that, when applied to an item emitted by each of the source Observables, results in
* an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@Experimental
public static <R> Observable<R> zip(Observable<?>[] ws, FuncN<? extends R> zipFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add:

     * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical
     *        with the release number)
     */
    @Experimental

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :)

return Observable.just(ws).lift(new OperatorZip<R>(zipFunction));
}

/**
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
* <i>n</i> items emitted, in sequence, by the <i>n</i> Observables emitted by a specified Observable.
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/rx/internal/operators/OperatorZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,68 @@ public Object call(final Object... args) {
ts.assertReceivedOnNext(Collections.emptyList());
}

@Test
public void testZipEmptyArray() {
Observable<Integer>[] ws = new Observable[0];
Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
assertEquals("No argument should have been passed", 0, args.length);
return 0;
}
});

TestSubscriber<Object> ts = new TestSubscriber<Object>();
o.subscribe(ts);
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
ts.assertReceivedOnNext(Collections.emptyList());
}

@Test
public void testZipArraySingleItem() {
final Integer expected = 0;
Observable<Integer>[] ws = new Observable[]{ Observable.just(expected) };

Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
assertEquals("One argument should have been passed", 1, args.length);
return expected;
}
});

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
o.subscribe(ts);
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
ts.assertReceivedOnNext(Collections.singletonList(expected));
}

@Test
public void testZipBigArray() {
final int size = 20;
Integer expected = 0;
Observable<Integer>[] ws = new Observable[size];

for (int i = 0, wsLength = ws.length; i < wsLength; i++) {
ws[i] = Observable.just(i);
expected += i;
}

final Integer finalExpected = expected;
Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
assertEquals(size + " arguments should have been passed", size, args.length);
return finalExpected;
}
});

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
o.subscribe(ts);
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
ts.assertReceivedOnNext(Collections.singletonList(expected));
}

/**
* Expect NoSuchElementException instead of blocking forever as zip should emit onCompleted and no onNext
* and last() expects at least a single response.
Expand Down