Skip to content

Commit

Permalink
Merge pull request ReactiveX#537 from landonf/scala-do-operator
Browse files Browse the repository at this point in the history
Add scala adapters for doOnEach operator.
  • Loading branch information
benjchristensen committed Dec 4, 2013
2 parents 00c565d + 87bb561 commit 9398623
Showing 1 changed file with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,65 @@ trait Observable[+T]
new WithFilter[T](p, asJavaObservable)
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
*
* @param observer the observer
*
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(observer: Observer[T]): Observable[T] = {
Observable[T](asJavaObservable.doOnEach(observer.asJavaObserver))
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
*
* @param onNext this function will be called whenever the Observable emits an item
*
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit): Observable[T] = {
Observable[T](asJavaObservable.doOnEach(
onNext
))
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
*
* @param onNext this function will be called whenever the Observable emits an item
* @param onError this function will be called if an error occurs
*
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
Observable[T](asJavaObservable.doOnEach(
onNext,
onError
))
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
*
* @param onNext this function will be called whenever the Observable emits an item
* @param onError this function will be called if an error occurs
* @param onCompleted the action to invoke when the source Observable calls
*
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
Observable[T](asJavaObservable.doOnEach(
onNext,
onError,
onCompleted
))
}
}

/**
Expand Down

0 comments on commit 9398623

Please sign in to comment.