diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 3fa4cc39ac..f8b0268bd1 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1396,6 +1396,29 @@ public final Observable flatMapObservable(Func1 + * + *
+ *
Scheduler:
+ *
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to the item emitted by the source Single, returns a + * Completable + * @return the Completable returned from {@code func} when applied to the item emitted by the source Single + * @see ReactiveX operators documentation: FlatMap + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Completable flatMapCompletable(final Func1 func) { + return Completable.create(new CompletableFlatMapSingleToCompletable(this, func)); + } + /** * Returns a Single that applies a specified function to the item emitted by the source Single and * emits the result of this function application. diff --git a/src/main/java/rx/internal/operators/CompletableFlatMapSingleToCompletable.java b/src/main/java/rx/internal/operators/CompletableFlatMapSingleToCompletable.java new file mode 100644 index 0000000000..f5d0a44123 --- /dev/null +++ b/src/main/java/rx/internal/operators/CompletableFlatMapSingleToCompletable.java @@ -0,0 +1,92 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.Completable; +import rx.Completable.CompletableOnSubscribe; +import rx.Completable.CompletableSubscriber; +import rx.Single; +import rx.SingleSubscriber; +import rx.Subscription; +import rx.exceptions.Exceptions; +import rx.functions.Func1; + +public final class CompletableFlatMapSingleToCompletable implements CompletableOnSubscribe { + + final Single source; + + final Func1 mapper; + + public CompletableFlatMapSingleToCompletable(Single source, Func1 mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + public void call(CompletableSubscriber t) { + SourceSubscriber parent = new SourceSubscriber(t, mapper); + t.onSubscribe(parent); + source.subscribe(parent); + } + + static final class SourceSubscriber extends SingleSubscriber implements CompletableSubscriber { + final CompletableSubscriber actual; + + final Func1 mapper; + + public SourceSubscriber(CompletableSubscriber actual, Func1 mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onSuccess(T value) { + Completable c; + + try { + c = mapper.call(value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + if (c == null) { + onError(new NullPointerException("The mapper returned a null Completable")); + return; + } + + c.subscribe(this); + } + + @Override + public void onError(Throwable error) { + actual.onError(error); + } + + @Override + public void onCompleted() { + actual.onCompleted(); + } + + @Override + public void onSubscribe(Subscription d) { + add(d); + } + } + +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 7f45192044..375feafc7e 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -2005,4 +2005,68 @@ public Single call(Integer v) { assertFalse("Observers present?!", ps.hasObservers()); } + @Test + public void flatMapCompletableComplete() { + final AtomicInteger atomicInteger = new AtomicInteger(); + TestSubscriber testSubscriber = TestSubscriber.create(); + + Single.just(1).flatMapCompletable(new Func1() { + @Override + public Completable call(final Integer integer) { + return Completable.fromAction(new Action0() { + @Override + public void call() { + atomicInteger.set(5); + } + }); + } + }).subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + + assertEquals(5, atomicInteger.get()); + } + + @Test + public void flatMapCompletableError() { + final RuntimeException error = new RuntimeException("some error"); + TestSubscriber testSubscriber = TestSubscriber.create(); + + Single.just(1).flatMapCompletable(new Func1() { + @Override + public Completable call(final Integer integer) { + return Completable.error(error); + } + }).subscribe(testSubscriber); + + testSubscriber.assertError(error); + } + + @Test + public void flatMapCompletableNullCompletable() { + TestSubscriber testSubscriber = TestSubscriber.create(); + + Single.just(1).flatMapCompletable(new Func1() { + @Override + public Completable call(final Integer integer) { + return null; + } + }).subscribe(testSubscriber); + + testSubscriber.assertError(NullPointerException.class); + } + + @Test + public void flatMapCompletableException() { + TestSubscriber testSubscriber = TestSubscriber.create(); + + Single.just(1).flatMapCompletable(new Func1() { + @Override + public Completable call(final Integer integer) { + throw new UnsupportedOperationException(); + } + }).subscribe(testSubscriber); + + testSubscriber.assertError(UnsupportedOperationException.class); + } }