
Commit b425d32

Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.

dragos committed Jul 23, 2015
1 parent 238cfc6 commit b425d32
Showing 6 changed files with 42 additions and 48 deletions.
InputDStream.scala
@@ -21,9 +21,9 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
 import org.apache.spark.streaming.scheduler.RateController
-import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.Utils
 
 /**
@@ -49,26 +49,13 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   /** This is a unique identifier for the input stream. */
   val id = ssc.getNewInputStreamId()
 
-  /**
-   * A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
-   * @see `RateEstimator`
-   */
-  protected [streaming] val rateEstimator = newEstimator()
-
-  /**
-   * Return the configured estimator, or `noop` if none was specified.
-   */
-  private def newEstimator() =
-    ssc.conf.get("spark.streaming.RateEstimator", "noop") match {
-      case "noop" => new NoopRateEstimator()
-      case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
-    }
-
-
   // Keep track of the freshest rate for this stream using the rateEstimator
-  protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) {
-    override def publish(rate: Long): Unit = ()
-  }
+  protected[streaming] val rateController: Option[RateController] =
+    RateEstimator.makeEstimator(ssc.conf).map { estimator =>
+      new RateController(id, estimator) {
+        override def publish(rate: Long): Unit = ()
+      }
+    }
 
   /** A human-readable name of this InputDStream */
   private[streaming] def name: String = {
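
The InputDStream hunk above is the heart of the commit: instead of always holding an estimator (falling back to a do-nothing NoopRateEstimator), the stream now holds an Option[RateController] that exists only when configuration asks for one. A minimal sketch of that pattern, using stand-in names (SimpleConf, Estimator, Controller) rather than Spark's classes:

// Minimal sketch of the Option-instead-of-Noop pattern; every name here is
// an illustrative stand-in, not a Spark class.
object OptionInsteadOfNoop extends App {
  // Stand-in for SparkConf: a map-backed config with getOption semantics.
  final case class SimpleConf(settings: Map[String, String]) {
    def getOption(key: String): Option[String] = settings.get(key)
  }

  trait Estimator
  abstract class Controller(streamId: Int, estimator: Estimator) {
    def publish(rate: Long): Unit
  }

  // None when nothing is configured; like the commit's makeEstimator, every
  // configured value is rejected because no concrete estimator exists yet.
  def makeEstimator(conf: SimpleConf): Option[Estimator] =
    conf.getOption("rate.estimator").map { name =>
      throw new IllegalArgumentException(s"Unknown rate estimator: $name")
    }

  // No estimator configured => no controller at all, rather than a Noop one.
  val controller: Option[Controller] =
    makeEstimator(SimpleConf(Map.empty)).map { estimator =>
      new Controller(0, estimator) { def publish(rate: Long): Unit = () }
    }
  println(controller) // prints: None
}
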
ReceiverInputDStream.scala
@@ -21,11 +21,11 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
-import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -44,10 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
-  override val rateController: RateController = new RateController(id, rateEstimator) {
-    override def publish(rate: Long): Unit =
-      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
-  }
+  override protected[streaming] val rateController: Option[RateController] =
+    RateEstimator.makeEstimator(ssc.conf).map { estimator =>
+      new RateController(id, estimator) {
+        override def publish(rate: Long): Unit =
+          ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
+      }
+    }
 
   /**
    * Gets the receiver object that will be sent to the worker nodes
JobScheduler.scala
@@ -67,7 +67,11 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     eventLoop.start()
 
     // Estimators receive updates from batch completion
-    ssc.graph.getInputStreams.foreach(is => ssc.addStreamingListener(is.rateController))
+    for {
+      inputDStream <- ssc.graph.getInputStreams
+      rateController <- inputDStream.rateController
+    } ssc.addStreamingListener(rateController)
+
     listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
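
The for-comprehension above quietly absorbs the new optionality: mixing a Seq generator with an Option generator skips every stream whose rateController is None, with no explicit filtering. A tiny stand-alone illustration (stand-in names, not Spark code):

// A for-comprehension over Seq and Option desugars to
// streams.foreach(s => s.rateController.foreach(register)),
// so None entries are silently skipped.
object ForComprehensionSketch extends App {
  final case class Stream(id: Int, rateController: Option[String])

  val streams = Seq(
    Stream(0, Some("controller-0")), // has a controller: registered
    Stream(1, None),                 // no controller: silently skipped
    Stream(2, Some("controller-2")))

  for {
    stream <- streams
    controller <- stream.rateController
  } println(s"registering $controller for stream ${stream.id}")
  // prints controller-0 and controller-2 only
}
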
RateController.scala
@@ -21,17 +21,14 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.ThreadUtils
 
 /**
- * :: DeveloperApi ::
  * A StreamingListener that receives batch completion updates, and maintains
  * an estimate of the speed at which this stream should ingest messages,
  * given an estimate computation from a `RateEstimator`
  */
-@DeveloperApi
 private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
   extends StreamingListener with Serializable {
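
The doc comment describes the loop RateController implements: each batch-completion event feeds the estimator, and any estimate it produces is published as the new bound. A rough sketch of that loop under assumed names (judging by the imports visible in the hunk, the real class keeps the rate in an AtomicLong and publishes via a dedicated ExecutionContext rather than inline):

// Rough sketch of the control loop described above; names are illustrative.
trait Estimator {
  def compute(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Option[Double]
}

abstract class ControllerSketch(streamId: Int, estimator: Estimator) {
  @volatile private var latestRate = -1L

  /** Transport-specific: e.g. send the new rate to the receiver tracker. */
  protected def publish(rate: Long): Unit

  def getLatestRate: Long = latestRate

  // Called once per completed batch (mirroring onBatchCompleted): compute a
  // fresh estimate and push it out only when the estimator produces one.
  def onBatchCompleted(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Unit =
    estimator.compute(time, elements, processingDelay, schedulingDelay)
      .foreach { rate =>
        latestRate = rate.toLong
        publish(latestRate)
      }
}
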
RateEstimator.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.streaming.scheduler.rate
 
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkException
 
 /**
- * :: DeveloperApi ::
  * A component that estimates the rate at which an InputDStream should ingest
  * elements, based on updates at every batch completion.
  */
-@DeveloperApi
 private[streaming] trait RateEstimator extends Serializable {
 
   /**
@@ -44,14 +43,17 @@ private[streaming] trait RateEstimator {
       schedulingDelay: Long): Option[Double]
 }
 
-/**
- * The trivial rate estimator never sends an update
- */
-private[streaming] class NoopRateEstimator extends RateEstimator {
+object RateEstimator {
 
-  def compute(
-      time: Long,
-      elements: Long,
-      processingDelay: Long,
-      schedulingDelay: Long): Option[Double] = None
+  /**
+   * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+   *
+   * @return None if there is no configured estimator, otherwise an instance of RateEstimator
+   * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
+   *         known estimators.
+   */
+  def makeEstimator(conf: SparkConf): Option[RateEstimator] =
+    conf.getOption("spark.streaming.RateEstimator") map { estimator =>
+      throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
+    }
 }
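
To see the new factory's contract in action, here is a hypothetical check, not part of the commit; it sits inside the scheduler.rate package because the RateEstimator trait is private[streaming]:

// Hypothetical check of makeEstimator's two behaviours; not part of the
// commit. Placed inside the streaming package tree so the private[streaming]
// trait is visible.
package org.apache.spark.streaming.scheduler.rate

import org.apache.spark.SparkConf

object MakeEstimatorCheck {
  def main(args: Array[String]): Unit = {
    // Key unset: no estimator, and therefore no rate controller downstream.
    val unset = new SparkConf(loadDefaults = false)
    assert(RateEstimator.makeEstimator(unset).isEmpty)

    // Key set to anything: fail fast, since no concrete estimator is wired
    // in at this point of the patch series.
    val set = new SparkConf(loadDefaults = false)
      .set("spark.streaming.RateEstimator", "pid")
    try {
      RateEstimator.makeEstimator(set)
      assert(false, "expected IllegalArgumentException")
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
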
RateControllerSuite.scala
@@ -60,11 +60,12 @@ private class MockRateLimitDStream(@transient ssc: StreamingContext)
     }
   }
 
-  override val rateController: RateController = new RateController(id, ConstantEstimator) {
-    override def publish(rate: Long): Unit = {
-      publishCalls += 1
-    }
-  }
+  override val rateController: Option[RateController] =
+    Some(new RateController(id, ConstantEstimator) {
+      override def publish(rate: Long): Unit = {
+        publishCalls += 1
+      }
+    })
 
   def compute(validTime: Time): Option[RDD[Int]] = {
     val data = Seq(1)
