From 5d94e52679110fde7fe5bf04f8514c371e26b538 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 9 Dec 2013 15:40:30 +0800 Subject: [PATCH 1/3] Implement the 'Start' operator --- rxjava-core/src/main/java/rx/Observable.java | 74 +++++++++++++++++++ .../src/test/java/rx/ObservableTests.java | 65 ++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index cf92fb12cd..1eaa75611d 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -118,6 +118,7 @@ import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Async; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -6269,4 +6270,77 @@ public Observable> groupByUntil(Fun public Observable> groupByUntil(Func1 keySelector, Func1 valueSelector, Func1, ? extends Observable> durationSelector) { return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } + + /** + * Invokes the action asynchronously, surfacing the result through an observable sequence. + *

+ * Note: The action is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * action's outcome. + * + * @param action + * Action to run asynchronously. + * @return An observable sequence exposing a null value upon completion of the action, + * or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Action0 action) { + return Async.toAsync(action).call(); + } + + /** + * Invokes the action asynchronously on the specified scheduler, surfacing the + * result through an observable sequence. + *

+ * Note: The action is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * action's outcome. + * + * @param action + * Action to run asynchronously. + * @param scheduler + * Scheduler to run the function on. + * @return An observable sequence exposing a null value upon completion of the action, + * or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Action0 action, Scheduler scheduler) { + return Async.toAsync(action, scheduler).call(); + } + + /** + * Invokes the specified function asynchronously, surfacing the result through an observable sequence. + *

+ * Note: The function is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * function's result. + * + * @param func + * Function to run asynchronously. + * @return An observable sequence exposing the function's result value, or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Func0 func) { + return Async.toAsync(func).call(); + } + + /** + * Invokes the specified function asynchronously on the specified scheduler, surfacing + * the result through an observable sequence. + *

+ * Note: The function is called immediately, not during the subscription of the resulting + * sequence. Multiple subscriptions to the resulting sequence can observe the + * function's result. + * + * @param func + * Function to run asynchronously. + * @param scheduler + * Scheduler to run the function on. + * @return An observable sequence exposing the function's result value, or an exception. + * @see MSDN: Observable.Start + */ + public static Observable start(Func0 func, Scheduler scheduler) { + return Async.toAsync(func, scheduler).call(); + } + } diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index e4023f44ab..50661238b6 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -33,6 +33,8 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import rx.Observable.OnSubscribeFunc; import rx.concurrency.TestScheduler; @@ -41,6 +43,7 @@ import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -947,4 +950,66 @@ public void testRangeWithScheduler() { inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testStartWithAction() { + TestScheduler scheduler = new TestScheduler(); + + Action0 action = mock(Action0.class); + Observable observable = Observable.start(action, scheduler); + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + assertEquals(null, observable.toBlockingObservable().single()); + assertEquals(null, observable.toBlockingObservable().single()); + verify(action, times(1)).call(); + } + + @Test(expected = RuntimeException.class) + public void testStartWithActionError() { + Action0 action = new Action0() { + @Override + public void call() { + throw new RuntimeException("Some error"); + } + }; + + Observable observable = Observable.start(action); + observable.toBlockingObservable().single(); + } + + @Test + public void testStartWithFunc() { + TestScheduler scheduler = new TestScheduler(); + + @SuppressWarnings("unchecked") + Func0 func = (Func0) mock(Func0.class); + doAnswer(new Answer() { + + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return "one"; + } + + }).when(func).call(); + + Observable observable = Observable.start(func, scheduler); + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + assertEquals("one", observable.toBlockingObservable().single()); + assertEquals("one", observable.toBlockingObservable().single()); + verify(func, times(1)).call(); + } + + @Test(expected = RuntimeException.class) + public void testStartWithFuncError() { + Func0 func = new Func0() { + @Override + public String call() { + throw new RuntimeException("Some error"); + } + }; + + Observable observable = Observable.start(func); + observable.toBlockingObservable().single(); + } } \ No newline at end of file From 46544c38c74e9df451a880ceaf460ad786d2c26e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 10 Dec 2013 22:45:02 +0800 Subject: [PATCH 2/3] Add more unit tests --- .../src/test/java/rx/ObservableTests.java | 213 ++++++++++++++++-- 1 file changed, 188 insertions(+), 25 deletions(-) diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 50661238b6..6e703a57ce 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -953,15 +953,8 @@ public void testRangeWithScheduler() { @Test public void testStartWithAction() { - TestScheduler scheduler = new TestScheduler(); - Action0 action = mock(Action0.class); - Observable observable = Observable.start(action, scheduler); - scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); - - assertEquals(null, observable.toBlockingObservable().single()); - assertEquals(null, observable.toBlockingObservable().single()); - verify(action, times(1)).call(); + assertEquals(null, Observable.start(action).toBlockingObservable().single()); } @Test(expected = RuntimeException.class) @@ -972,44 +965,214 @@ public void call() { throw new RuntimeException("Some error"); } }; + Observable.start(action).toBlockingObservable().single(); + } + + @Test + public void testStartWhenSubscribeRunBeforeAction() { + TestScheduler scheduler = new TestScheduler(); + + Action0 action = mock(Action0.class); + + Observable observable = Observable.start(action, scheduler); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verifyNoMoreInteractions(); + + // Run action + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - Observable observable = Observable.start(action); - observable.toBlockingObservable().single(); + inOrder.verify(observer, times(1)).onNext(null); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } @Test - public void testStartWithFunc() { + public void testStartWhenSubscribeRunAfterAction() { TestScheduler scheduler = new TestScheduler(); + Action0 action = mock(Action0.class); + + Observable observable = Observable.start(action, scheduler); + + // Run action + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + @SuppressWarnings("unchecked") - Func0 func = (Func0) mock(Func0.class); - doAnswer(new Answer() { + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(null); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWithActionAndMultipleObservers() { + TestScheduler scheduler = new TestScheduler(); + Action0 action = mock(Action0.class); + + Observable observable = Observable.start(action, scheduler); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer3 = mock(Observer.class); + + observable.subscribe(observer1); + observable.subscribe(observer2); + observable.subscribe(observer3); + + InOrder inOrder; + inOrder = inOrder(observer1); + inOrder.verify(observer1, times(1)).onNext(null); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext(null); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer3); + inOrder.verify(observer3, times(1)).onNext(null); + inOrder.verify(observer3, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(action, times(1)).call(); + } + + @Test + public void testStartWithFunc() { + Func0 func = new Func0() { @Override - public String answer(InvocationOnMock invocation) throws Throwable { + public String call() { return "one"; } + }; + assertEquals("one", Observable.start(func).toBlockingObservable().single()); + } - }).when(func).call(); + @Test(expected = RuntimeException.class) + public void testStartWithFuncError() { + Func0 func = new Func0() { + @Override + public String call() { + throw new RuntimeException("Some error"); + } + }; + Observable.start(func).toBlockingObservable().single(); + } + + @Test + public void testStartWhenSubscribeRunBeforeFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; Observable observable = Observable.start(func, scheduler); - scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); - assertEquals("one", observable.toBlockingObservable().single()); - assertEquals("one", observable.toBlockingObservable().single()); - verify(func, times(1)).call(); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verifyNoMoreInteractions(); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } - @Test(expected = RuntimeException.class) - public void testStartWithFuncError() { + @Test + public void testStartWhenSubscribeRunAfterFunc() { + TestScheduler scheduler = new TestScheduler(); + Func0 func = new Func0() { @Override public String call() { - throw new RuntimeException("Some error"); + return "one"; } }; - Observable observable = Observable.start(func); - observable.toBlockingObservable().single(); + Observable observable = Observable.start(func, scheduler); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void testStartWithFuncAndMultipleObservers() { + TestScheduler scheduler = new TestScheduler(); + + @SuppressWarnings("unchecked") + Func0 func = (Func0) mock(Func0.class); + doAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return "one"; + } + }).when(func).call(); + + Observable observable = Observable.start(func, scheduler); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer3 = mock(Observer.class); + + observable.subscribe(observer1); + observable.subscribe(observer2); + observable.subscribe(observer3); + + InOrder inOrder; + inOrder = inOrder(observer1); + inOrder.verify(observer1, times(1)).onNext("one"); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext("one"); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer3); + inOrder.verify(observer3, times(1)).onNext("one"); + inOrder.verify(observer3, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(func, times(1)).call(); + } + +} From c799e525680e95744a129b427df16791bfad02a1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 11 Dec 2013 11:05:44 +0800 Subject: [PATCH 3/3] Remove the Action0 overloads --- rxjava-core/src/main/java/rx/Observable.java | 37 ------- .../src/test/java/rx/ObservableTests.java | 101 ------------------ 2 files changed, 138 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 1eaa75611d..999d5b413a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -6271,43 +6271,6 @@ public Observable> gro return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } - /** - * Invokes the action asynchronously, surfacing the result through an observable sequence. - *

- * Note: The action is called immediately, not during the subscription of the resulting - * sequence. Multiple subscriptions to the resulting sequence can observe the - * action's outcome. - * - * @param action - * Action to run asynchronously. - * @return An observable sequence exposing a null value upon completion of the action, - * or an exception. - * @see MSDN: Observable.Start - */ - public static Observable start(Action0 action) { - return Async.toAsync(action).call(); - } - - /** - * Invokes the action asynchronously on the specified scheduler, surfacing the - * result through an observable sequence. - *

- * Note: The action is called immediately, not during the subscription of the resulting - * sequence. Multiple subscriptions to the resulting sequence can observe the - * action's outcome. - * - * @param action - * Action to run asynchronously. - * @param scheduler - * Scheduler to run the function on. - * @return An observable sequence exposing a null value upon completion of the action, - * or an exception. - * @see MSDN: Observable.Start - */ - public static Observable start(Action0 action, Scheduler scheduler) { - return Async.toAsync(action, scheduler).call(); - } - /** * Invokes the specified function asynchronously, surfacing the result through an observable sequence. *

diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 6e703a57ce..9c5c7e1e49 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -951,107 +951,6 @@ public void testRangeWithScheduler() { inOrder.verifyNoMoreInteractions(); } - @Test - public void testStartWithAction() { - Action0 action = mock(Action0.class); - assertEquals(null, Observable.start(action).toBlockingObservable().single()); - } - - @Test(expected = RuntimeException.class) - public void testStartWithActionError() { - Action0 action = new Action0() { - @Override - public void call() { - throw new RuntimeException("Some error"); - } - }; - Observable.start(action).toBlockingObservable().single(); - } - - @Test - public void testStartWhenSubscribeRunBeforeAction() { - TestScheduler scheduler = new TestScheduler(); - - Action0 action = mock(Action0.class); - - Observable observable = Observable.start(action, scheduler); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verifyNoMoreInteractions(); - - // Run action - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - inOrder.verify(observer, times(1)).onNext(null); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWhenSubscribeRunAfterAction() { - TestScheduler scheduler = new TestScheduler(); - - Action0 action = mock(Action0.class); - - Observable observable = Observable.start(action, scheduler); - - // Run action - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext(null); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWithActionAndMultipleObservers() { - TestScheduler scheduler = new TestScheduler(); - - Action0 action = mock(Action0.class); - - Observable observable = Observable.start(action, scheduler); - - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer1 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer2 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer3 = mock(Observer.class); - - observable.subscribe(observer1); - observable.subscribe(observer2); - observable.subscribe(observer3); - - InOrder inOrder; - inOrder = inOrder(observer1); - inOrder.verify(observer1, times(1)).onNext(null); - inOrder.verify(observer1, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer2); - inOrder.verify(observer2, times(1)).onNext(null); - inOrder.verify(observer2, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer3); - inOrder.verify(observer3, times(1)).onNext(null); - inOrder.verify(observer3, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - verify(action, times(1)).call(); - } - @Test public void testStartWithFunc() { Func0 func = new Func0() {