Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ConnectableObservable #717

Merged
merged 1 commit into from
Jan 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def olympicsExample() {
val (go, medals) = Olympics.mountainBikeMedals.publish
val medals = Olympics.mountainBikeMedals.publish
medals.subscribe(println(_))
go()
medals.connect
//waitFor(medals)
}

Expand All @@ -257,10 +257,10 @@ class RxScalaDemo extends JUnitSuite {

@Test def exampleWithPublish() {
val unshared = List(1 to 4).toObservable
val (startFunc, shared) = unshared.publish
val shared = unshared.publish
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
startFunc()
shared.connect
}

def doLater(waitTime: Duration, action: () => Unit): Unit = {
Expand All @@ -269,9 +269,9 @@ class RxScalaDemo extends JUnitSuite {

@Test def exampleWithoutReplay() {
val numbers = Observable.interval(1000 millis).take(6)
val (startFunc, sharedNumbers) = numbers.publish
val sharedNumbers = numbers.publish
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
startFunc()
sharedNumbers.connect
// subscriber 2 misses 0, 1, 2!
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
waitFor(sharedNumbers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package rx.lang.scala

import rx.util.functions.FuncN
import rx.Observable.OnSubscribeFunc

import rx.lang.scala.observables.ConnectableObservable


/**
Expand Down Expand Up @@ -1052,12 +1052,10 @@ trait Observable[+T]
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
*
* @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function
* is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s
* @return an [[rx.lang.scala.observables.ConnectableObservable]].
*/
def publish: (() => Subscription, Observable[T]) = {
val javaCO = asJavaObservable.publish()
(() => javaCO.connect(), toScalaObservable[T](javaCO))
def publish: ConnectableObservable[T] = {
new ConnectableObservable[T](asJavaObservable.publish())
}

// TODO add Scala-like aggregate function
Expand Down Expand Up @@ -1136,7 +1134,8 @@ trait Observable[+T]
* the initial (seed) accumulator value
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* @return an Observable that emits the results of each call to the accumulator function
*/
def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
Expand All @@ -1145,6 +1144,30 @@ trait Observable[+T]
}))
}

/**
* Returns an Observable that applies a function of your choosing to the
* first item emitted by a source Observable, then feeds the result of that
* function along with the second item emitted by an Observable into the
* same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/scan.png">
* <p>
*
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* @return
* an Observable that emits the results of each call to the
* accumulator function
*/
def scan[U >: T](accumulator: (U, U) => U): Observable[U] = {
val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator
val func2 = func.asInstanceOf[Func2[T, T, T]]
toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].scan(func2))
}

/**
* Returns an Observable that emits a Boolean that indicates whether all of the items emitted by
* the source Observable satisfy a condition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rx.lang.scala.observables
import scala.collection.JavaConverters._
import rx.lang.scala.ImplicitFunctionConversions._


/**
* An Observable that provides blocking operators.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.lang.scala.observables

import rx.lang.scala.{Observable, Subscription}
import rx.lang.scala.JavaConversions._

class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T])
extends Observable[T] {

/**
* Call a ConnectableObservable's connect method to instruct it to begin emitting the
* items from its underlying [[rx.lang.scala.Observable]] to its [[rx.lang.scala.Observer]]s.
*/
def connect: Subscription = toScalaSubscription(asJavaObservable.connect())

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
*
* @return a [[rx.lang.scala.Observable]]
*/
def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount())
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class ObservableTests extends JUnitSuite {
assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3))
}

@Test def TestScan() {
val xs = Observable.items(0,1,2,3)
val ys = xs.scan(0)(_+_)
assertEquals(List(0,0,1,3,6), ys.toBlockingObservable.toList)
val zs = xs.scan((x: Int, y:Int) => x*y)
assertEquals(List(0, 0, 0, 0), zs.toBlockingObservable.toList)
}

// Test that Java's firstOrDefault propagates errors.
// If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse
// should be changed accordingly.
Expand Down