From ee91a9d1172afa3c0a68cdba50abd172a90c809d Mon Sep 17 00:00:00 2001 From: Artem Zinnatullin Date: Fri, 9 Oct 2015 01:55:10 +0300 Subject: [PATCH] Add Single.defer() --- src/main/java/rx/Single.java | 44 +++++++++++ src/test/java/rx/SingleTest.java | 129 +++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index b126fd39a3..f862d42e0c 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1953,4 +1953,48 @@ public final Single delay(long delay, TimeUnit unit, Scheduler scheduler) { public final Single delay(long delay, TimeUnit unit) { return delay(delay, unit, Schedulers.computation()); } + + /** + * Returns a {@link Single} that calls a {@link Single} factory to create a {@link Single} for each new Observer + * that subscribes. That is, for each subscriber, the actual {@link Single} that subscriber observes is + * determined by the factory function. + *

+ * + *

+ * The defer Observer allows you to defer or delay emitting value from a {@link Single} until such time as an + * Observer subscribes to the {@link Single}. This allows an {@link Observer} to easily obtain updates or a + * refreshed version of the sequence. + *

+ *
Scheduler:
+ *
{@code defer} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param singleFactory + * the {@link Single} factory function to invoke for each {@link Observer} that subscribes to the + * resulting {@link Single}. + * @param + * the type of the items emitted by the {@link Single}. + * @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given + * {@link Single} factory function. + * @see ReactiveX operators documentation: Defer + */ + @Experimental + public static Single defer(final Callable> singleFactory) { + return create(new OnSubscribe() { + @Override + public void call(SingleSubscriber singleSubscriber) { + Single single; + + try { + single = singleFactory.call(); + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + singleSubscriber.onError(t); + return; + } + + single.subscribe(singleSubscriber); + } + }); + } } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 5bc24a6368..30fe99e92f 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -20,6 +20,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -30,10 +31,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import rx.Single.OnSubscribe; import rx.exceptions.CompositeException; import rx.functions.Action0; @@ -699,4 +703,129 @@ public void call(SingleSubscriber singleSubscriber) { subscriber.assertNoValues(); subscriber.assertError(expected); } + + @Test + public void deferShouldNotCallFactoryFuncUntilSubscriberSubscribes() throws Exception { + Callable> singleFactory = mock(Callable.class); + Single.defer(singleFactory); + verifyZeroInteractions(singleFactory); + } + + @Test + public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitValue() throws Exception { + Callable> singleFactory = mock(Callable.class); + Object value = new Object(); + Single single = Single.just(value); + + when(singleFactory.call()).thenReturn(single); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .defer(singleFactory) + .subscribe(testSubscriber); + + testSubscriber.assertValue(value); + testSubscriber.assertNoErrors(); + + verify(singleFactory).call(); + } + + @Test + public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitError() throws Exception { + Callable> singleFactory = mock(Callable.class); + Throwable error = new IllegalStateException(); + Single single = Single.error(error); + + when(singleFactory.call()).thenReturn(single); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .defer(singleFactory) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(error); + + verify(singleFactory).call(); + } + + @Test + public void deferShouldPassErrorFromSingleFactoryToTheSubscriber() throws Exception { + Callable> singleFactory = mock(Callable.class); + Throwable errorFromSingleFactory = new IllegalStateException(); + when(singleFactory.call()).thenThrow(errorFromSingleFactory); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .defer(singleFactory) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(errorFromSingleFactory); + + verify(singleFactory).call(); + } + + @Test + public void deferShouldCallSingleFactoryForEachSubscriber() throws Exception { + Callable> singleFactory = mock(Callable.class); + + String[] values = {"1", "2", "3"}; + final Single[] singles = new Single[]{Single.just(values[0]), Single.just(values[1]), Single.just(values[2])}; + + final AtomicInteger singleFactoryCallsCounter = new AtomicInteger(); + + when(singleFactory.call()).thenAnswer(new Answer>() { + @Override + public Single answer(InvocationOnMock invocation) throws Throwable { + return singles[singleFactoryCallsCounter.getAndIncrement()]; + } + }); + + Single deferredSingle = Single.defer(singleFactory); + + for (int i = 0; i < singles.length; i ++) { + TestSubscriber testSubscriber = new TestSubscriber(); + + deferredSingle.subscribe(testSubscriber); + + testSubscriber.assertValue(values[i]); + testSubscriber.assertNoErrors(); + } + + verify(singleFactory, times(3)).call(); + } + + @Test + public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryIsNull() { + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .defer(null) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(NullPointerException.class); + } + + + @Test + public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryReturnsNull() throws Exception { + Callable> singleFactory = mock(Callable.class); + when(singleFactory.call()).thenReturn(null); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .defer(singleFactory) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(NullPointerException.class); + + verify(singleFactory).call(); + } }