Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replay additional overloads #612

Merged
merged 2 commits into from
Dec 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 339 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallel;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
Expand Down Expand Up @@ -526,6 +527,24 @@ public <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> su
return OperationMulticast.multicast(this, subject);
}

/**
* Returns an observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence within a selector function.
*
* @param subjectFactory the subject factory
* @param selector The selector function which can use the multicasted
* source sequence subject to the policies enforced by the
* created subject.
* @return the Observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence within a selector function.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229708.aspx'>MSDN: Observable.Multicast</a>
*/
public <TIntermediate, TResult> Observable<TResult> multicast(
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
return OperationMulticast.multicast(this, subjectFactory, selector);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from
* onError.
Expand Down Expand Up @@ -4277,7 +4296,327 @@ public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<? super R>
public ConnectableObservable<T> replay() {
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription
* to the underlying Observable that will replay all of its items and
* notifications to any future {@link Observer} on the given scheduler.
*
* @param scheduler the scheduler where the Observers will receive the events
* @return a {@link ConnectableObservable} that shares a single subscription
* to the underlying Observable that will replay all of its items and
* notifications to any future {@link Observer} on the given scheduler
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211699.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(Scheduler scheduler) {
return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler));
}

/**
* Returns a connectable observable sequence that shares a single subscription
* to the underlying sequence replaying bufferSize notifications.
*
* @param bufferSize the buffer size
* @return a connectable observable sequence that shares a single subscription
* to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211976.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize) {
return OperationMulticast.multicast(this, OperationReplay.<T>replayBuffered(bufferSize));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param bufferSize the buffer size
* @param scheduler the scheduler where the Observers will receive the events
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229814.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
return OperationMulticast.multicast(this,
OperationReplay.createScheduledSubject(
OperationReplay.<T>replayBuffered(bufferSize), scheduler));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param time the window length
* @param unit the window length time unit
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229232.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(long time, TimeUnit unit) {
return replay(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211811.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, -1, scheduler));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window.
*
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @return Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229874.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window.
*
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211759.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, bufferSize, scheduler));
}

/**
* Returns an observable sequence that is the result of invoking the selector
* on a connectable observable sequence that shares a single subscription to
* the underlying sequence and starts with initial value.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @return an observable sequence that is the result of invoking the selector
* on a connectable observable sequence that shares a single subscription to
* the underlying sequence and starts with initial value
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229653.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return ReplaySubject.create();
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param scheduler the scheduler where the replay is observed
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211644.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211675.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayBuffered(bufferSize);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param scheduler the scheduler where the replay is observed
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229928.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.<T>createScheduledSubject(OperationReplay.<T>replayBuffered(bufferSize), scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking
* the selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param time the window length
* @param unit the window length time unit
* @return an observable sequence that is the result of invoking
* the selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229526.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh244327.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayWindowed(time, unit, -1, scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
*
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh228952.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
}


/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
*
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229404.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
}
}, selector);
}

/**
* Retry subscription to the source Observable when it calls
* <code>onError</code> up to a certain number of retries.
Expand Down
Loading