Skip to content

Commit

Permalink
Review comments and added a reset call in ReceiverTrackerTest.
Browse files Browse the repository at this point in the history
  • Loading branch information
dragos committed Jul 24, 2015
1 parent e57c66b commit 715437a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
val id = ssc.getNewInputStreamId()

// Keep track of the freshest rate for this stream using the rateEstimator
protected[streaming] val rateController: Option[RateController] =
RateEstimator.makeEstimator(ssc.conf).map { estimator =>
new RateController(id, estimator) {
override def publish(rate: Long): Unit = ()
}
}
protected[streaming] val rateController: Option[RateController] = None

/** A human-readable name of this InputDStream */
private[streaming] def name: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] =
RateEstimator.makeEstimator(ssc.conf).map { new ReceiverRateController(id, _) }
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }

/**
* Gets the receiver object that will be sent to the worker nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
eventLoop.start()

// Estimators receive updates from batch completion
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.ThreadUtils

Expand All @@ -29,8 +31,8 @@ import org.apache.spark.util.ThreadUtils
* an estimate of the speed at which this stream should ingest messages,
* given an estimate computation from a `RateEstimator`
*/
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {

protected def publish(rate: Long): Unit

Expand All @@ -46,8 +48,8 @@ private [streaming] abstract class RateController(val streamUID: Int, rateEstima
*/
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay)
newSpeed foreach { s =>
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ object RateEstimator {
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
* known estimators.
*/
def makeEstimator(conf: SparkConf): Option[RateEstimator] =
conf.getOption("spark.streaming.RateEstimator") map { estimator =>
def create(conf: SparkConf): Option[RateEstimator] =
conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {

ssc.addStreamingListener(ReceiverStartedWaiter)
ssc.scheduler.listenerBus.start(ssc.sc)
SingletonDummyReceiver.reset()

val newRateLimit = 100L
val inputDStream = new RateLimitInputDStream(ssc)
Expand All @@ -109,7 +110,14 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}

/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
/**
* An input DStream with a hard-coded receiver that gives access to internals for testing.
*
* @note Make sure to call {{{SingletonDummyReceiver.reset()}}} before using this in a test,
* or otherwise you may get {{{NotSerializableException}}} when trying to serialize
* the receiver.
* @see [[[SingletonDummyReceiver]]].
*/
private class RateLimitInputDStream(@transient ssc_ : StreamingContext)
extends ReceiverInputDStream[Int](ssc_) {

Expand Down

0 comments on commit 715437a

Please sign in to comment.