Skip to content

Commit

Permalink
Merge pull request ReactiveX#594 from zsxwing/start
Browse files Browse the repository at this point in the history
Implement the 'Start' operator
  • Loading branch information
benjchristensen committed Dec 11, 2013
2 parents b435f56 + bfee699 commit f80b09d
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 1 deletion.
37 changes: 37 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Async;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
Expand Down Expand Up @@ -6329,4 +6330,40 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

/**
* Invokes the specified function asynchronously, surfacing the result through an observable sequence.
* <p>
* Note: The function is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* function's result.
*
* @param func
* Function to run asynchronously.
* @return An observable sequence exposing the function's result value, or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229036(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static <T> Observable<T> start(Func0<T> func) {
return Async.toAsync(func).call();
}

/**
* Invokes the specified function asynchronously on the specified scheduler, surfacing
* the result through an observable sequence.
* <p>
* Note: The function is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* function's result.
*
* @param func
* Function to run asynchronously.
* @param scheduler
* Scheduler to run the function on.
* @return An observable sequence exposing the function's result value, or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211721(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static <T> Observable<T> start(Func0<T> func, Scheduler scheduler) {
return Async.toAsync(func, scheduler).call();
}

}
129 changes: 128 additions & 1 deletion rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Observable.OnSubscribeFunc;
import rx.schedulers.TestScheduler;
Expand All @@ -41,6 +43,7 @@
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -947,4 +950,128 @@ public void testRangeWithScheduler() {
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}

@Test
public void testStartWithFunc() {
Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};
assertEquals("one", Observable.start(func).toBlockingObservable().single());
}

@Test(expected = RuntimeException.class)
public void testStartWithFuncError() {
Func0<String> func = new Func0<String>() {
@Override
public String call() {
throw new RuntimeException("Some error");
}
};
Observable.start(func).toBlockingObservable().single();
}

@Test
public void testStartWhenSubscribeRunBeforeFunc() {
TestScheduler scheduler = new TestScheduler();

Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};

Observable<String> observable = Observable.start(func, scheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verifyNoMoreInteractions();

// Run func
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWhenSubscribeRunAfterFunc() {
TestScheduler scheduler = new TestScheduler();

Func0<String> func = new Func0<String>() {
@Override
public String call() {
return "one";
}
};

Observable<String> observable = Observable.start(func, scheduler);

// Run func
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithFuncAndMultipleObservers() {
TestScheduler scheduler = new TestScheduler();

@SuppressWarnings("unchecked")
Func0<String> func = (Func0<String>) mock(Func0.class);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
return "one";
}
}).when(func).call();

Observable<String> observable = Observable.start(func, scheduler);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<String> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<String> observer2 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<String> observer3 = mock(Observer.class);

observable.subscribe(observer1);
observable.subscribe(observer2);
observable.subscribe(observer3);

InOrder inOrder;
inOrder = inOrder(observer1);
inOrder.verify(observer1, times(1)).onNext("one");
inOrder.verify(observer1, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onNext("one");
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer3);
inOrder.verify(observer3, times(1)).onNext("one");
inOrder.verify(observer3, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(func, times(1)).call();
}

}

0 comments on commit f80b09d

Please sign in to comment.