Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Remove vararg overloads for combineLatest in Observable + Flowable #6635

Merged
merged 1 commit into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 16 additions & 155 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java

Large diffs are not rendered by default.

104 changes: 8 additions & 96 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,49 +168,6 @@ public static int bufferSize() {
return Flowable.bufferSize();
}

/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
* <p>
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
* <p>
* If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
* any items and without any calls to the combiner function.
*
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T>
* the common base type of source values
* @param <R>
* the result type
* @param sources
* the collection of source ObservableSources
* @param combiner
* the aggregation function used to combine the items emitted by the source ObservableSources
* @param bufferSize
* the internal buffer size and prefetch amount applied to every source Observable
* @return an Observable that emits items that are the result of combining the items emitted by the source
* ObservableSources by means of the given aggregation function
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources) {
return combineLatest(sources, combiner, bufferSize);
}

/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
Expand Down Expand Up @@ -437,7 +394,7 @@ public static <T1, T2, R> Observable<R> combineLatest(
BiFunction<? super T1, ? super T2, ? extends R> combiner) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2);
return combineLatest(new ObservableSource[] { source1, source2 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -482,7 +439,7 @@ public static <T1, T2, T3, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3);
return combineLatest(new ObservableSource[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -531,7 +488,7 @@ public static <T1, T2, T3, T4, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -585,7 +542,7 @@ public static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -643,7 +600,7 @@ public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -706,7 +663,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -773,7 +730,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -845,7 +802,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
ObjectHelper.requireNonNull(source9, "source9 is null");
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9);
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize());
}

/**
Expand Down Expand Up @@ -890,51 +847,6 @@ public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? ex
return combineLatestDelayError(sources, combiner, bufferSize());
}

/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source ObservableSources terminate.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatestDelayError.png" alt="">
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
* <p>
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
* <p>
* If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
* any items and without any calls to the combiner function.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T>
* the common base type of source values
* @param <R>
* the result type
* @param sources
* the collection of source ObservableSources
* @param combiner
* the aggregation function used to combine the items emitted by the source ObservableSources
* @param bufferSize
* the internal buffer size and prefetch amount applied to every source Observable
* @return an Observable that emits items that are the result of combining the items emitted by the source
* ObservableSources by means of the given aggregation function
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
int bufferSize, ObservableSource<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize);
}

/**
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
Expand Down
76 changes: 0 additions & 76 deletions src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,6 @@ public void ambIterableOneIsNull() {
.assertError(NullPointerException.class);
}

@Test(expected = NullPointerException.class)
public void combineLatestVarargsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return 1;
}
}, (Publisher<Object>[])null);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsOneIsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return 1;
}
}, Flowable.never(), null).blockingLast();
}

@Test(expected = NullPointerException.class)
public void combineLatestIterableNull() {
Flowable.combineLatestDelayError((Iterable<Publisher<Object>>)null, new Function<Object[], Object>() {
Expand Down Expand Up @@ -133,23 +112,6 @@ public Object apply(Object[] v) {
}).blockingLast();
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsFunctionNull() {
Flowable.combineLatestDelayError(null, Flowable.never());
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsFunctionReturnsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return null;
}
}, just1).blockingLast();
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableFunctionNull() {
Expand Down Expand Up @@ -2759,12 +2721,6 @@ public void combineLatestDelayErrorIterableFunctionNull() {
Flowable.combineLatestDelayError(Arrays.asList(just1), null, 128);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionNull() {
Flowable.combineLatestDelayError(null, 128, Flowable.never());
}

@Test(expected = NullPointerException.class)
public void zipFlowableNull() {
Flowable.zip((Flowable<Flowable<Object>>)null, new Function<Object[], Object>() {
Expand Down Expand Up @@ -2795,27 +2751,6 @@ public void concatFlowableNull() {
Flowable.concat((Flowable<Flowable<Object>>)null);
}

@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return 1;
}
}, 128, (Flowable<Object>[])null);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsOneIsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return 1;
}
}, 128, Flowable.never(), null).blockingLast();
}

@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableNull() {
Flowable.combineLatestDelayError((Iterable<Flowable<Object>>)null, new Function<Object[], Object>() {
Expand Down Expand Up @@ -2876,15 +2811,4 @@ public void delaySubscriptionOtherNull() {
public void sampleFlowableNull() {
just1.sample(null);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
@Override
public Object apply(Object[] v) {
return null;
}
}, 128, just1).blockingLast();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1305,15 +1305,14 @@ public Object apply(Object a, Object b) throws Exception {
@Test
public void errorDelayed() {
Flowable.combineLatestDelayError(
new Publisher[] { Flowable.error(new TestException()), Flowable.just(1) },
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
128,
Flowable.error(new TestException()),
Flowable.just(1)
128
)
.test()
.assertFailure(TestException.class);
Expand All @@ -1323,15 +1322,14 @@ public Object apply(Object[] a) throws Exception {
@Test
public void errorDelayed2() {
Flowable.combineLatestDelayError(
new Publisher[] { Flowable.error(new TestException()).startWithItem(1), Flowable.empty() },
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
128,
Flowable.error(new TestException()).startWithItem(1),
Flowable.empty()
128
)
.test()
.assertFailure(TestException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,15 +926,14 @@ public Object apply(Object a, Object b) throws Exception {
@Test
public void errorDelayed() {
Observable.combineLatestDelayError(
new ObservableSource[] { Observable.error(new TestException()), Observable.just(1) },
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
128,
Observable.error(new TestException()),
Observable.just(1)
128
)
.test()
.assertFailure(TestException.class);
Expand All @@ -944,15 +943,14 @@ public Object apply(Object[] a) throws Exception {
@Test
public void errorDelayed2() {
Observable.combineLatestDelayError(
new ObservableSource[] { Observable.error(new TestException()).startWithItem(1), Observable.empty() },
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return a;
}
},
128,
Observable.error(new TestException()).startWithItem(1),
Observable.empty()
128
)
.test()
.assertFailure(TestException.class);
Expand Down
Loading