Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Single.defer() #3433

Merged
merged 1 commit into from
Nov 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1953,4 +1953,48 @@ public final Single<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
public final Single<T> delay(long delay, TimeUnit unit) {
return delay(delay, unit, Schedulers.computation());
}

/**
* Returns a {@link Single} that calls a {@link Single} factory to create a {@link Single} for each new Observer
* that subscribes. That is, for each subscriber, the actual {@link Single} that subscriber observes is
* determined by the factory function.
* <p>
* <img width="640" height="340" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defer.png" alt="">
* <p>
* The defer Observer allows you to defer or delay emitting value from a {@link Single} until such time as an
* Observer subscribes to the {@link Single}. This allows an {@link Observer} to easily obtain updates or a
* refreshed version of the sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param singleFactory
* the {@link Single} factory function to invoke for each {@link Observer} that subscribes to the
* resulting {@link Single}.
* @param <T>
* the type of the items emitted by the {@link Single}.
* @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given
* {@link Single} factory function.
* @see <a href="http://reactivex.io/documentation/operators/defer.html">ReactiveX operators documentation: Defer</a>
*/
@Experimental
public static <T> Single<T> defer(final Callable<Single<T>> singleFactory) {
return create(new OnSubscribe<T>() {
@Override
public void call(SingleSubscriber<? super T> singleSubscriber) {
Single<? extends T> single;

try {
single = singleFactory.call();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
singleSubscriber.onError(t);
return;
}

single.subscribe(singleSubscriber);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please notice that I'm not wrapping subscriber via Subscribers.wrap() because SingleSubscriber differs from Subscriber, that's why I used subscribe instead of unsafeSubscribe() here.

If this should be changed, please comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SingleSubscriber doesn't have onStart like Subscriber so this is not a problem.

}
});
}
}
129 changes: 129 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -30,10 +31,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Single.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.functions.Action0;
Expand Down Expand Up @@ -699,4 +703,129 @@ public void call(SingleSubscriber<? super Integer> singleSubscriber) {
subscriber.assertNoValues();
subscriber.assertError(expected);
}

@Test
public void deferShouldNotCallFactoryFuncUntilSubscriberSubscribes() throws Exception {
Callable<Single<Object>> singleFactory = mock(Callable.class);
Single.defer(singleFactory);
verifyZeroInteractions(singleFactory);
}

@Test
public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitValue() throws Exception {
Callable<Single<Object>> singleFactory = mock(Callable.class);
Object value = new Object();
Single<Object> single = Single.just(value);

when(singleFactory.call()).thenReturn(single);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.defer(singleFactory)
.subscribe(testSubscriber);

testSubscriber.assertValue(value);
testSubscriber.assertNoErrors();

verify(singleFactory).call();
}

@Test
public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitError() throws Exception {
Callable<Single<Object>> singleFactory = mock(Callable.class);
Throwable error = new IllegalStateException();
Single<Object> single = Single.error(error);

when(singleFactory.call()).thenReturn(single);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.defer(singleFactory)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(singleFactory).call();
}

@Test
public void deferShouldPassErrorFromSingleFactoryToTheSubscriber() throws Exception {
Callable<Single<Object>> singleFactory = mock(Callable.class);
Throwable errorFromSingleFactory = new IllegalStateException();
when(singleFactory.call()).thenThrow(errorFromSingleFactory);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.defer(singleFactory)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(errorFromSingleFactory);

verify(singleFactory).call();
}

@Test
public void deferShouldCallSingleFactoryForEachSubscriber() throws Exception {
Callable<Single<String>> singleFactory = mock(Callable.class);

String[] values = {"1", "2", "3"};
final Single[] singles = new Single[]{Single.just(values[0]), Single.just(values[1]), Single.just(values[2])};

final AtomicInteger singleFactoryCallsCounter = new AtomicInteger();

when(singleFactory.call()).thenAnswer(new Answer<Single<String>>() {
@Override
public Single<String> answer(InvocationOnMock invocation) throws Throwable {
return singles[singleFactoryCallsCounter.getAndIncrement()];
}
});

Single<String> deferredSingle = Single.defer(singleFactory);

for (int i = 0; i < singles.length; i ++) {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

deferredSingle.subscribe(testSubscriber);

testSubscriber.assertValue(values[i]);
testSubscriber.assertNoErrors();
}

verify(singleFactory, times(3)).call();
}

@Test
public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryIsNull() {
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.defer(null)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(NullPointerException.class);
}


@Test
public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryReturnsNull() throws Exception {
Callable<Single<Object>> singleFactory = mock(Callable.class);
when(singleFactory.call()).thenReturn(null);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.defer(singleFactory)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(NullPointerException.class);

verify(singleFactory).call();
}
}