Skip to content

Commit

Permalink
Observable.merge: refactor varargs to overloads
Browse files Browse the repository at this point in the history
ReactiveX#359 Varargs cause compiler warnings
  • Loading branch information
benjchristensen committed Sep 9, 2013
1 parent 1a0fcdc commit 969b3a9
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 7 deletions.
226 changes: 220 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,11 @@ public static <T> Observable<T> just(T value) {
* Observable, by using the <code>merge</code> method.
*
* @param source
* a list of Observables
* an Iterable of Observables
* @return an Observable that emits items that are the result of flattening the {@code source} list of Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(List<? extends Observable<? extends T>> source) {
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> source) {
return create(OperationMerge.merge(source));
}

Expand All @@ -954,6 +954,28 @@ public static <T> Observable<T> merge(List<? extends Observable<? extends T>> so
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return create(OperationMerge.merge(t1, t2));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
Expand All @@ -963,14 +985,206 @@ public static <T> Observable<T> merge(Observable<? extends Observable<? extends
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param source
* a series of Observables
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<? extends T>... source) {
return create(OperationMerge.merge(source));
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
return create(OperationMerge.merge(t1, t2, t3));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
return create(OperationMerge.merge(t1, t2, t3, t4));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @param t5
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @param t5
* an Observable to be merged
* @param t6
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @param t5
* an Observable to be merged
* @param t6
* an Observable to be merged
* @param t7
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @param t5
* an Observable to be merged
* @param t6
* an Observable to be merged
* @param t7
* an Observable to be merged
* @param t8
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8));
}

/**
* Flattens a series of Observables into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param t1
* an Observable to be merged
* @param t2
* an Observable to be merged
* @param t3
* an Observable to be merged
* @param t4
* an Observable to be merged
* @param t5
* an Observable to be merged
* @param t6
* an Observable to be merged
* @param t7
* an Observable to be merged
* @param t8
* an Observable to be merged
* @param t9
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8, t9));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static <T> OnSubscribeFunc<T> merge(final Observable<? extends T>... sequ
return merge(Arrays.asList(sequences));
}

public static <T> OnSubscribeFunc<T> merge(final List<? extends Observable<? extends T>> sequences) {
public static <T> OnSubscribeFunc<T> merge(final Iterable<? extends Observable<? extends T>> sequences) {
return merge(Observable.create(new OnSubscribeFunc<Observable<? extends T>>() {

private volatile boolean unsubscribed = false;
Expand Down
59 changes: 59 additions & 0 deletions rxjava-core/src/test/java/rx/CovarianceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,65 @@ public Subscription onSubscribe(Observer<? super Movie> o) {
assertTrue(values.get(2) instanceof Media);
assertTrue(values.get(3) instanceof HorrorMovie);
}


@Test
public void testMergeCovariance() {
Observable<Media> o1 = Observable.<Media> from(new HorrorMovie(), new Movie());
Observable<Media> o2 = Observable.from(new Media(), new HorrorMovie());

Observable<Observable<Media>> os = Observable.from(o1, o2);

List<Media> values = Observable.merge(os).toList().toBlockingObservable().single();
}

@Test
public void testMergeCovariance2() {
Observable<Media> o1 = Observable.from(new HorrorMovie(), new Movie(), new Media());
Observable<Media> o2 = Observable.from(new Media(), new HorrorMovie());

Observable<Observable<Media>> os = Observable.from(o1, o2);

List<Media> values = Observable.merge(os).toList().toBlockingObservable().single();
}

@Test
public void testMergeCovariance3() {
Observable<Movie> o1 = Observable.from(new HorrorMovie(), new Movie());
Observable<Media> o2 = Observable.from(new Media(), new HorrorMovie());

List<Media> values = Observable.merge(o1, o2).toList().toBlockingObservable().single();

assertTrue(values.get(0) instanceof HorrorMovie);
assertTrue(values.get(1) instanceof Movie);
assertTrue(values.get(2) instanceof Media);
assertTrue(values.get(3) instanceof HorrorMovie);
}

@Test
public void testMergeCovariance4() {

Observable<Movie> o1 = Observable.create(new OnSubscribeFunc<Movie>() {

@Override
public Subscription onSubscribe(Observer<? super Movie> o) {
o.onNext(new HorrorMovie());
o.onNext(new Movie());
// o.onNext(new Media()); // correctly doesn't compile
o.onCompleted();
return Subscriptions.empty();
}
});

Observable<Media> o2 = Observable.from(new Media(), new HorrorMovie());

List<Media> values = Observable.merge(o1, o2).toList().toBlockingObservable().single();

assertTrue(values.get(0) instanceof HorrorMovie);
assertTrue(values.get(1) instanceof Movie);
assertTrue(values.get(2) instanceof Media);
assertTrue(values.get(3) instanceof HorrorMovie);
}

Func2<Media, Rating, ExtendedResult> combine = new Func2<Media, Rating, ExtendedResult>() {
@Override
Expand Down

0 comments on commit 969b3a9

Please sign in to comment.