diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index cf92fb12cd..999d5b413a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -118,6 +118,7 @@ import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Async; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -6269,4 +6270,40 @@ public Observable> groupByUntil(Fun public Observable> groupByUntil(Func1 keySelector, Func1 valueSelector, Func1, ? extends Observable> durationSelector) { return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } + + /** + * Invokes the specified function asynchronously, surfacing the result through an observable sequence. + *

+ * Note: The function is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * function's result. + * + * @param func + * Function to run asynchronously. + * @return An observable sequence exposing the function's result value, or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Func0 func) { + return Async.toAsync(func).call(); + } + + /** + * Invokes the specified function asynchronously on the specified scheduler, surfacing + * the result through an observable sequence. + *

+ * Note: The function is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * function's result. + * + * @param func + * Function to run asynchronously. + * @param scheduler + * Scheduler to run the function on. + * @return An observable sequence exposing the function's result value, or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Func0 func, Scheduler scheduler) { + return Async.toAsync(func, scheduler).call(); + } + } diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index e4023f44ab..9c5c7e1e49 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -33,6 +33,8 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import rx.Observable.OnSubscribeFunc; import rx.concurrency.TestScheduler; @@ -41,6 +43,7 @@ import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -947,4 +950,128 @@ public void testRangeWithScheduler() { inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void testStartWithFunc() { + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + assertEquals("one", Observable.start(func).toBlockingObservable().single()); + } + + @Test(expected = RuntimeException.class) + public void testStartWithFuncError() { + Func0 func = new Func0() { + @Override + public String call() { + throw new RuntimeException("Some error"); + } + }; + Observable.start(func).toBlockingObservable().single(); + } + + @Test + public void testStartWhenSubscribeRunBeforeFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Observable.start(func, scheduler); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verifyNoMoreInteractions(); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWhenSubscribeRunAfterFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Observable.start(func, scheduler); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWithFuncAndMultipleObservers() { + TestScheduler scheduler = new TestScheduler(); + + @SuppressWarnings("unchecked") + Func0 func = (Func0) mock(Func0.class); + doAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return "one"; + } + }).when(func).call(); + + Observable observable = Observable.start(func, scheduler); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer3 = mock(Observer.class); + + observable.subscribe(observer1); + observable.subscribe(observer2); + observable.subscribe(observer3); + + InOrder inOrder; + inOrder = inOrder(observer1); + inOrder.verify(observer1, times(1)).onNext("one"); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext("one"); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer3); + inOrder.verify(observer3, times(1)).onNext("one"); + inOrder.verify(observer3, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(func, times(1)).call(); + } + +}