From 75f9631dfbe9eecc79428e7cd796ffda1f22250f Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Sun, 19 Jun 2016 18:40:50 +0200 Subject: [PATCH] Added zip function with Observable array. --- src/main/java/rx/Observable.java | 48 ++++++++++++++ .../internal/operators/OperatorZipTest.java | 62 +++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 1babda6639..04d55897b7 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3119,6 +3119,54 @@ public static Observable zip(Iterable> ws, FuncN< return Observable.just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction)); } + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * items emitted, in sequence, by an array of other Observables. + *

+ * {@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each of the source Observables; + * the second item emitted by the new Observable will be the result of the function applied to the second + * item emitted by each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@code onNext} as many times as + * the number of {@code onNext} invocations of the source Observable that emits the fewest items. + *

+ * The operator subscribes to its sources in order they are specified and completes eagerly if + * one of the sources is shorter than the rest while unsubscribing the other sources. Therefore, it + * is possible those other sources will never be able to run to completion (and thus not calling + * {@code doOnCompleted()}). This can also happen if the sources are exactly the same length; if + * source A completes and B has been consumed and is about to complete, the operator detects A won't + * be sending further values and it will unsubscribe B immediately. For example: + *

zip(new Observable[]{range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)}, (a) ->
+     * a)
+ * {@code action1} will be called but {@code action2} won't. + *
To work around this termination property, + * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion + * or unsubscription. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects backpressure from the sources and honors backpressure from the downstream. + * (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use + * one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param ws + * an array of source Observables + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + @Experimental + public static Observable zip(Observable[] ws, FuncN zipFunction) { + return Observable.just(ws).lift(new OperatorZip(zipFunction)); + } + /** * Returns an Observable that emits the results of a specified combiner function applied to combinations of * n items emitted, in sequence, by the n Observables emitted by a specified Observable. diff --git a/src/test/java/rx/internal/operators/OperatorZipTest.java b/src/test/java/rx/internal/operators/OperatorZipTest.java index 75f83f3e67..5ba0adaa45 100644 --- a/src/test/java/rx/internal/operators/OperatorZipTest.java +++ b/src/test/java/rx/internal/operators/OperatorZipTest.java @@ -998,6 +998,68 @@ public Object call(final Object... args) { ts.assertReceivedOnNext(Collections.emptyList()); } + @Test + public void testZipEmptyArray() { + Observable[] ws = new Observable[0]; + Observable o = Observable.zip(ws, new FuncN() { + @Override + public Integer call(Object... args) { + assertEquals("No argument should have been passed", 0, args.length); + return 0; + } + }); + + TestSubscriber ts = new TestSubscriber(); + o.subscribe(ts); + ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS); + ts.assertReceivedOnNext(Collections.emptyList()); + } + + @Test + public void testZipArraySingleItem() { + final Integer expected = 0; + Observable[] ws = new Observable[]{ Observable.just(expected) }; + + Observable o = Observable.zip(ws, new FuncN() { + @Override + public Integer call(Object... args) { + assertEquals("One argument should have been passed", 1, args.length); + return expected; + } + }); + + TestSubscriber ts = new TestSubscriber(); + o.subscribe(ts); + ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS); + ts.assertReceivedOnNext(Collections.singletonList(expected)); + } + + @Test + public void testZipBigArray() { + final int size = 20; + Integer expected = 0; + Observable[] ws = new Observable[size]; + + for (int i = 0, wsLength = ws.length; i < wsLength; i++) { + ws[i] = Observable.just(i); + expected += i; + } + + final Integer finalExpected = expected; + Observable o = Observable.zip(ws, new FuncN() { + @Override + public Integer call(Object... args) { + assertEquals(size + " arguments should have been passed", size, args.length); + return finalExpected; + } + }); + + TestSubscriber ts = new TestSubscriber(); + o.subscribe(ts); + ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS); + ts.assertReceivedOnNext(Collections.singletonList(expected)); + } + /** * Expect NoSuchElementException instead of blocking forever as zip should emit onCompleted and no onNext * and last() expects at least a single response.