diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9f88e15fda..962a52700e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4738,7 +4738,118 @@ public final Observable firstOrDefault(T defaultValue, Func1MSDN: Observable.SelectMany */ public final Observable flatMap(Func1> func) { - return mergeMap(func); + return merge(map(func)); + } + + /** + * Returns an Observable that applies a function to each item emitted or notification raised by the source + * Observable and then flattens the Observables returned from these functions and emits the resulting items. + *

+ * + *

+ *
Scheduler:
+ *
{@code mergeMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the result type + * @param onNext + * a function that returns an Observable to merge for each item emitted by the source Observable + * @param onError + * a function that returns an Observable to merge for an onError notification from the source + * Observable + * @param onCompleted + * a function that returns an Observable to merge for an onCompleted notification from the source + * Observable + * @return an Observable that emits the results of merging the Observables returned from applying the + * specified functions to the emissions and notifications of the source Observable + * @see RxJava wiki: mergeMap + */ + public final Observable flatMap( + Func1> onNext, + Func1> onError, + Func0> onCompleted) { + return merge(mapNotification(onNext, onError, onCompleted)); + } + + /** + * Returns an Observable that emits the results of a specified function to the pair of values emitted by the + * source Observable and a specified collection Observable. + *

+ * + *

+ *
Scheduler:
+ *
{@code mergeMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of items emitted by the collection Observable + * @param + * the type of items emitted by the resulting Observable + * @param collectionSelector + * a function that returns an Observable for each item emitted by the source Observable + * @param resultSelector + * a function that combines one item emitted by each of the source and collection Observables and + * returns an item to be emitted by the resulting Observable + * @return an Observable that emits the results of applying a function to a pair of values emitted by the + * source Observable and the collection Observable + * @see RxJava wiki: mergeMap + */ + public final Observable flatMap(final Func1> collectionSelector, + final Func2 resultSelector) { + return merge(lift(new OperatorMapPair(collectionSelector, resultSelector))); + } + + /** + * Returns an Observable that merges each item emitted by the source Observable with the values in an + * Iterable corresponding to that item that is generated by a selector. + *

+ * + *

+ *
Scheduler:
+ *
{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Observable + * @param collectionSelector + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Observable + * @return an Observable that emits the results of merging the items emitted by the source Observable with + * the values in the Iterables corresponding to those items, as generated by {@code collectionSelector} + * @see RxJava wiki: mergeMapIterable + */ + public final Observable flatMapIterable(Func1> collectionSelector) { + return merge(map(OperatorMapPair.convertSelector(collectionSelector))); + } + + /** + * Returns an Observable that emits the results of applying a function to the pair of values from the source + * Observable and an Iterable corresponding to that item that is generated by a selector. + *

+ * + *

+ *
Scheduler:
+ *
{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the collection element type + * @param + * the type of item emited by the resulting Observable + * @param collectionSelector + * a function that returns an Iterable sequence of values for each item emitted by the source + * Observable + * @param resultSelector + * a function that returns an item based on the item emitted by the source Observable and the + * Iterable returned for that item by the {@code collectionSelector} + * @return an Observable that emits the items returned by {@code resultSelector} for each item in the source + * Observable + * @see RxJava wiki: mergeMapIterable + */ + public final Observable flatMapIterable(Func1> collectionSelector, + Func2 resultSelector) { + return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector); } /** @@ -5225,7 +5336,9 @@ public final Observable> materialize() { * transformations * @see RxJava wiki: mergeMap * @see #flatMap(Func1) + * @deprecated use flatMap */ + @Deprecated public final Observable mergeMap(Func1> func) { return merge(map(func)); } @@ -5253,7 +5366,9 @@ public final Observable mergeMap(Func1RxJava wiki: mergeMap + * @deprecated use flatMap */ + @Deprecated public final Observable mergeMap( Func1> onNext, Func1> onError, @@ -5283,10 +5398,12 @@ public final Observable mergeMap( * @return an Observable that emits the results of applying a function to a pair of values emitted by the * source Observable and the collection Observable * @see RxJava wiki: mergeMap + * @deprecated use flatMap */ + @Deprecated public final Observable mergeMap(Func1> collectionSelector, Func2 resultSelector) { - return lift(new OperatorMergeMapPair(collectionSelector, resultSelector)); + return flatMap(collectionSelector, resultSelector); } /** @@ -5307,9 +5424,11 @@ public final Observable mergeMap(Func1RxJava wiki: mergeMapIterable + * @deprecated use flatMapIterable */ + @Deprecated public final Observable mergeMapIterable(Func1> collectionSelector) { - return merge(map(OperatorMergeMapPair.convertSelector(collectionSelector))); + return merge(map(OperatorMapPair.convertSelector(collectionSelector))); } /** @@ -5335,10 +5454,12 @@ public final Observable mergeMapIterable(Func1RxJava wiki: mergeMapIterable + * @deprecated use flatMapIterable */ + @Deprecated public final Observable mergeMapIterable(Func1> collectionSelector, Func2 resultSelector) { - return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector); + return mergeMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector); } /** diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMapPair.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMapPair.java new file mode 100644 index 0000000000..af95ce1426 --- /dev/null +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMapPair.java @@ -0,0 +1,95 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.exceptions.OnErrorThrowable; +import rx.functions.Func1; +import rx.functions.Func2; + +/** + * An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items + * emitted by the {@code Observable} that is derived from each item by means of a selector, and emits the + * results of this pairing. + * + * @param + * the type of items emitted by the source {@code Observable} + * @param + * the type of items emitted by the derived {@code Observable}s + * @param + * the type of items to be emitted by this {@code Operator} + */ +public final class OperatorMapPair implements Operator, T> { + + /** + * Creates the function that generates a {@code Observable} based on an item emitted by another {@code Observable}. + * + * @param selector + * a function that accepts an item and returns an {@code Iterable} of corresponding items + * @return a function that converts an item emitted by the source {@code Observable} into an {@code Observable} that emits the items generated by {@code selector} operating on that item + */ + public static Func1> convertSelector(final Func1> selector) { + return new Func1>() { + @Override + public Observable call(T t1) { + return Observable.from(selector.call(t1)); + } + }; + } + + final Func1> collectionSelector; + final Func2 resultSelector; + + public OperatorMapPair(final Func1> collectionSelector, final Func2 resultSelector) { + this.collectionSelector = collectionSelector; + this.resultSelector = resultSelector; + } + + @Override + public Subscriber call(final Subscriber> o) { + return new Subscriber(o) { + + @Override + public void onCompleted() { + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(final T outer) { + try { + o.onNext(collectionSelector.call(outer).map(new Func1() { + + @Override + public R call(U inner) { + return resultSelector.call(outer, inner); + } + })); + } catch (Throwable e) { + o.onError(OnErrorThrowable.addValueAsLastCause(e, outer)); + } + } + + }; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeMapPair.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeMapPair.java deleted file mode 100644 index 47b39dddc6..0000000000 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeMapPair.java +++ /dev/null @@ -1,151 +0,0 @@ - /** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import rx.Observable; -import rx.Observable.Operator; -import rx.Subscriber; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.observers.SerializedSubscriber; -import rx.subscriptions.CompositeSubscription; - -/** - * An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items - * emitted by the {@code Observable} that is derived from each item by means of a selector, and emits the - * results of this pairing. - * - * @param the type of items emitted by the source {@code Observable} - * @param the type of items emitted by the derived {@code Observable}s - * @param the type of items to be emitted by this {@code Operator} - */ -public final class OperatorMergeMapPair implements Operator { - - /** - * Creates the function that generates a {@code Observable} based on an item emitted by another - * {@code Observable}. - * - * @param selector - * a function that accepts an item and returns an {@code Iterable} of corresponding items - * @return a function that converts an item emitted by the source {@code Observable} into an - * {@code Observable} that emits the items generated by {@code selector} operating on that item - */ - public static Func1> convertSelector(final Func1> selector) { - return new Func1>() { - @Override - public Observable call(T t1) { - return Observable.from(selector.call(t1)); - } - }; - } - - final Func1> collectionSelector; - final Func2 resultSelector; - - public OperatorMergeMapPair(Func1> collectionSelector, - Func2 resultSelector) { - this.collectionSelector = collectionSelector; - this.resultSelector = resultSelector; - } - - @Override - public Subscriber call(Subscriber child) { - final SerializedSubscriber s = new SerializedSubscriber(child); - final CompositeSubscription csub = new CompositeSubscription(); - child.add(csub); - - return new SourceSubscriber(s, csub, collectionSelector, resultSelector); - } - - static final class SourceSubscriber extends Subscriber { - final Subscriber s; - final CompositeSubscription csub; - final Func1> collectionSelector; - final Func2 resultSelector; - - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "wip"); - - public SourceSubscriber(Subscriber s, CompositeSubscription csub, - Func1> collectionSelector, - Func2 resultSelector) { - super(s); - this.s = s; - this.csub = csub; - this.collectionSelector = collectionSelector; - this.resultSelector = resultSelector; - this.wip = 1; - } - - @Override - public void onNext(final T t) { - Observable collection; - try { - collection = collectionSelector.call(t); - } catch (Throwable e) { - onError(e); - return; - } - - Subscriber collectionSub = new Subscriber() { - - @Override - public void onNext(U u) { - try { - s.onNext(resultSelector.call(t, u)); - } catch (Throwable e) { - onError(e); - } - } - - @Override - public void onError(Throwable e) { - SourceSubscriber.this.onError(e); - } - - @Override - public void onCompleted() { - try { - SourceSubscriber.this.onCompleted(); - } finally { - csub.remove(this); - } - } - }; - csub.add(collectionSub); - WIP_UPDATER.incrementAndGet(this); - - collection.unsafeSubscribe(collectionSub); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - unsubscribe(); - } - - @Override - public void onCompleted() { - if (WIP_UPDATER.decrementAndGet(this) == 0) { - s.onCompleted(); - } - } - - } -}