Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the 'Start' operator #594

Merged
merged 3 commits into from
Dec 11, 2013
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Async;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
Expand Down Expand Up @@ -6269,4 +6270,77 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

/**
* Invokes the action asynchronously, surfacing the result through an observable sequence.
* <p>
* Note: The action is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* action's outcome.
*
* @param action
* Action to run asynchronously.
* @return An observable sequence exposing a null value upon completion of the action,
* or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229265(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static Observable<Void> start(Action0 action) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have the Func0 version, do we need this Action0 overload? The issue is that dynamic languages can't differentiate between the two.

If someone truly has a Void returning action (which to me should be a rarity) then they can put Func0<Void>.

I'd recommend we remove the Action0 overloads and only have Func0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I removed them.

return Async.toAsync(action).call();
}

/**
* Invokes the action asynchronously on the specified scheduler, surfacing the
* result through an observable sequence.
* <p>
* Note: The action is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* action's outcome.
*
* @param action
* Action to run asynchronously.
* @param scheduler
* Scheduler to run the function on.
* @return An observable sequence exposing a null value upon completion of the action,
* or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211971(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static Observable<Void> start(Action0 action, Scheduler scheduler) {
return Async.toAsync(action, scheduler).call();
}

/**
* Invokes the specified function asynchronously, surfacing the result through an observable sequence.
* <p>
* Note: The function is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* function's result.
*
* @param func
* Function to run asynchronously.
* @return An observable sequence exposing the function's result value, or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229036(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static <T> Observable<T> start(Func0<T> func) {
return Async.toAsync(func).call();
}

/**
* Invokes the specified function asynchronously on the specified scheduler, surfacing
* the result through an observable sequence.
* <p>
* Note: The function is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* function's result.
*
* @param func
* Function to run asynchronously.
* @param scheduler
* Scheduler to run the function on.
* @return An observable sequence exposing the function's result value, or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211721(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static <T> Observable<T> start(Func0<T> func, Scheduler scheduler) {
return Async.toAsync(func, scheduler).call();
}

}
230 changes: 229 additions & 1 deletion rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Observable.OnSubscribeFunc;
import rx.concurrency.TestScheduler;
Expand All @@ -41,6 +43,7 @@
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -947,4 +950,229 @@ public void testRangeWithScheduler() {
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}

@Test
public void testStartWithAction() {
Action0 action = mock(Action0.class);
assertEquals(null, Observable.start(action).toBlockingObservable().single());
}

@Test(expected = RuntimeException.class)
public void testStartWithActionError() {
Action0 action = new Action0() {
@Override
public void call() {
throw new RuntimeException("Some error");
}
};
Observable.start(action).toBlockingObservable().single();
}

@Test
public void testStartWhenSubscribeRunBeforeAction() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

@SuppressWarnings("unchecked")
Observer<Void> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verifyNoMoreInteractions();

// Run action
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

inOrder.verify(observer, times(1)).onNext(null);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWhenSubscribeRunAfterAction() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

// Run action
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<Void> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(null);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithActionAndMultipleObservers() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<Void> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Void> observer2 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Void> observer3 = mock(Observer.class);

observable.subscribe(observer1);
observable.subscribe(observer2);
observable.subscribe(observer3);

InOrder inOrder;
inOrder = inOrder(observer1);
inOrder.verify(observer1, times(1)).onNext(null);
inOrder.verify(observer1, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onNext(null);
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer3);
inOrder.verify(observer3, times(1)).onNext(null);
inOrder.verify(observer3, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(action, times(1)).call();
}

@Test
public void testStartWithFunc() {
Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};
assertEquals("one", Observable.start(func).toBlockingObservable().single());
}

@Test(expected = RuntimeException.class)
public void testStartWithFuncError() {
Func0<String> func = new Func0<String>() {
@Override
public String call() {
throw new RuntimeException("Some error");
}
};
Observable.start(func).toBlockingObservable().single();
}

@Test
public void testStartWhenSubscribeRunBeforeFunc() {
TestScheduler scheduler = new TestScheduler();

Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};

Observable<String> observable = Observable.start(func, scheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verifyNoMoreInteractions();

// Run func
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWhenSubscribeRunAfterFunc() {
TestScheduler scheduler = new TestScheduler();

Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};

Observable<String> observable = Observable.start(func, scheduler);

// Run func
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithFuncAndMultipleObservers() {
TestScheduler scheduler = new TestScheduler();

@SuppressWarnings("unchecked")
Func0<String> func = (Func0<String>) mock(Func0.class);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
return "one";
}
}).when(func).call();

Observable<String> observable = Observable.start(func, scheduler);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<String> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<String> observer2 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<String> observer3 = mock(Observer.class);

observable.subscribe(observer1);
observable.subscribe(observer2);
observable.subscribe(observer3);

InOrder inOrder;
inOrder = inOrder(observer1);
inOrder.verify(observer1, times(1)).onNext("one");
inOrder.verify(observer1, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onNext("one");
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer3);
inOrder.verify(observer3, times(1)).onNext("one");
inOrder.verify(observer3, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(func, times(1)).call();
}

}