Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flatMap overloads #1560

Merged
merged 1 commit into from
Aug 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 125 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4738,7 +4738,118 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.selectmany.aspx">MSDN: Observable.SelectMany</a>
*/
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return mergeMap(func);
return merge(map(func));
}

/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.nce.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the result type
* @param onNext
* a function that returns an Observable to merge for each item emitted by the source Observable
* @param onError
* a function that returns an Observable to merge for an onError notification from the source
* Observable
* @param onCompleted
* a function that returns an Observable to merge for an onCompleted notification from the source
* Observable
* @return an Observable that emits the results of merging the Observables returned from applying the
* specified functions to the emissions and notifications of the source Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
*/
public final <R> Observable<R> flatMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted) {
return merge(mapNotification(onNext, onError, onCompleted));
}

/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
* source Observable and a specified collection Observable.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.r.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of items emitted by the collection Observable
* @param <R>
* the type of items emitted by the resulting Observable
* @param collectionSelector
* a function that returns an Observable for each item emitted by the source Observable
* @param resultSelector
* a function that combines one item emitted by each of the source and collection Observables and
* returns an item to be emitted by the resulting Observable
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
* source Observable and the collection Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
*/
public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
final Func2<? super T, ? super U, ? extends R> resultSelector) {
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
* Iterable corresponding to that item that is generated by a selector.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of item emitted by the resulting Observable
* @param collectionSelector
* a function that returns an Iterable sequence of values for when given an item emitted by the
* source Observable
* @return an Observable that emits the results of merging the items emitted by the source Observable with
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
*/
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
}

/**
* Returns an Observable that emits the results of applying a function to the pair of values from the source
* Observable and an Iterable corresponding to that item that is generated by a selector.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the collection element type
* @param <R>
* the type of item emited by the resulting Observable
* @param collectionSelector
* a function that returns an Iterable sequence of values for each item emitted by the source
* Observable
* @param resultSelector
* a function that returns an item based on the item emitted by the source Observable and the
* Iterable returned for that item by the {@code collectionSelector}
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
* Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
*/
public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
}

/**
Expand Down Expand Up @@ -5225,7 +5336,9 @@ public final Observable<Notification<T>> materialize() {
* transformations
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
* @see #flatMap(Func1)
* @deprecated use flatMap
*/
@Deprecated
public final <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
}
Expand Down Expand Up @@ -5253,7 +5366,9 @@ public final <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<?
* @return an Observable that emits the results of merging the Observables returned from applying the
* specified functions to the emissions and notifications of the source Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
* @deprecated use flatMap
*/
@Deprecated
public final <R> Observable<R> mergeMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Expand Down Expand Up @@ -5283,10 +5398,12 @@ public final <R> Observable<R> mergeMap(
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
* source Observable and the collection Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
* @deprecated use flatMap
*/
@Deprecated
public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return lift(new OperatorMergeMapPair<T, U, R>(collectionSelector, resultSelector));
return flatMap(collectionSelector, resultSelector);
}

/**
Expand All @@ -5307,9 +5424,11 @@ public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable
* @return an Observable that emits the results of merging the items emitted by the source Observable with
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
* @deprecated use flatMapIterable
*/
@Deprecated
public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return merge(map(OperatorMergeMapPair.convertSelector(collectionSelector)));
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
}

/**
Expand All @@ -5335,10 +5454,12 @@ public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Itera
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
* Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
* @deprecated use flatMapIterable
*/
@Deprecated
public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector);
return mergeMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.functions.Func2;

/**
* An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items
* emitted by the {@code Observable} that is derived from each item by means of a selector, and emits the
* results of this pairing.
*
* @param <T>
* the type of items emitted by the source {@code Observable}
* @param <U>
* the type of items emitted by the derived {@code Observable}s
* @param <R>
* the type of items to be emitted by this {@code Operator}
*/
public final class OperatorMapPair<T, U, R> implements Operator<Observable<? extends R>, T> {

/**
* Creates the function that generates a {@code Observable} based on an item emitted by another {@code Observable}.
*
* @param selector
* a function that accepts an item and returns an {@code Iterable} of corresponding items
* @return a function that converts an item emitted by the source {@code Observable} into an {@code Observable} that emits the items generated by {@code selector} operating on that item
*/
public static <T, U> Func1<T, Observable<U>> convertSelector(final Func1<? super T, ? extends Iterable<? extends U>> selector) {
return new Func1<T, Observable<U>>() {
@Override
public Observable<U> call(T t1) {
return Observable.from(selector.call(t1));
}
};
}

final Func1<? super T, ? extends Observable<? extends U>> collectionSelector;
final Func2<? super T, ? super U, ? extends R> resultSelector;

public OperatorMapPair(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector, final Func2<? super T, ? super U, ? extends R> resultSelector) {
this.collectionSelector = collectionSelector;
this.resultSelector = resultSelector;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super Observable<? extends R>> o) {
return new Subscriber<T>(o) {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(final T outer) {
try {
o.onNext(collectionSelector.call(outer).map(new Func1<U, R>() {

@Override
public R call(U inner) {
return resultSelector.call(outer, inner);
}
}));
} catch (Throwable e) {
o.onError(OnErrorThrowable.addValueAsLastCause(e, outer));
}
}

};
}

}
Loading