Rename operations that create InputDStreams
Instead of displaying a generic operation name like "createStream", as of
this commit we display something more descriptive like "kafka direct stream".
Andrew Or committed May 14, 2015
1 parent 7c4513d commit 5703939
Showing 15 changed files with 41 additions and 7 deletions.
RDDOperationGraph.scala
@@ -171,7 +171,7 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} (${node.id})"]"""
+    s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
   /** Return the dot representation of a subgraph in an RDDOperationGraph. */
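The DOT label switches from parentheses to square brackets around the node id, presumably to match the bracketed stream ids the rest of this commit introduces (e.g. "kafka direct stream [0]"). A toy illustration of the before/after output, with invented values:

    // Hypothetical values for illustration only.
    val nodeId = 3
    val nodeName = "map"

    // Before this commit the label rendered as:  3 [label="map (3)"]
    // After this commit it renders as:           3 [label="map [3]"]
    println(s"""$nodeId [label="$nodeName [$nodeId]"]""")
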
FlumeInputDStream.scala
@@ -50,6 +50,8 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
FlumePollingInputDStream.scala
@@ -53,6 +53,8 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
+  protected override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
+
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
DirectKafkaInputDStream.scala
@@ -62,9 +62,12 @@ class DirectKafkaInputDStream[
     val fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends InputDStream[R](ssc_) with Logging {
-  val maxRetries = context.sparkContext.getConf.getInt(
+
+  private val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
+  protected override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
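Besides the new scope name, this hunk narrows maxRetries to private; its value is still read from the spark.streaming.kafka.maxRetries configuration (the key and default of 1 come straight from the diff above). A minimal sketch of how a user could raise the retry count:

    import org.apache.spark.SparkConf

    // Sketch only: DirectKafkaInputDStream reads this setting via
    // context.sparkContext.getConf.getInt("spark.streaming.kafka.maxRetries", 1).
    val conf = new SparkConf()
      .setAppName("KafkaDirectApp")  // hypothetical app name
      .set("spark.streaming.kafka.maxRetries", "3")
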
KafkaInputDStream.scala
@@ -55,6 +55,8 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
+  protected override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
+
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
MQTTInputDStream.scala
@@ -57,6 +57,8 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
TwitterInputDStream.scala
@@ -45,6 +45,8 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
+
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
   }
ConstantInputDStream.scala
@@ -27,6 +27,8 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"constant stream [$id]")
+
   override def start() {}
 
   override def stop() {}
DStream.scala
@@ -121,22 +121,28 @@ abstract class DStream[T: ClassTag] (
    *
    * This is not defined if the DStream is created outside of one of the public DStream operations.
    */
-  private val baseScope: Option[String] = {
+  protected val baseScope: Option[String] = {
     Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
   }
 
   /**
+   * An optional custom name for all scopes generated by this DStream.
+   * If None, the name of the operation that created this DStream will be used.
+   */
+  protected val customScopeName: Option[String] = None
+
+  /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
    *
    * Each DStream produces many scopes and each scope may be shared by other DStreams created
    * in the same operation. Separate calls to the same DStream operation create separate scopes.
    * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
    */
-  protected def makeScope(time: Time): Option[RDDOperationScope] = {
+  private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
       val formattedBatchTime = UIUtils.formatBatchTime(time.milliseconds)
       val bscope = RDDOperationScope.fromJson(bsJson)
-      val baseName = bscope.name  // e.g. countByWindow
+      val baseName = customScopeName.getOrElse(bscope.name)  // e.g. countByWindow
       val scopeName =
         if (baseName.length > 10) {
           // If the operation name is too long, wrap the line
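The getOrElse above is the heart of the change: a DStream that defines customScopeName overrides the operation name recorded in the base scope, while every other DStream keeps its old behavior. A self-contained sketch of the resolution rule, using hypothetical values:

    // Hypothetical values; in makeScope the fallback comes from
    // RDDOperationScope.fromJson(bsJson).name.
    val customScopeName: Option[String] = Some("kafka direct stream [0]")
    val operationName = "createStream"

    // The custom name wins when present; otherwise fall back to the operation name.
    val baseName = customScopeName.getOrElse(operationName)
    assert(baseName == "kafka direct stream [0]")
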
FileInputDStream.scala
@@ -126,6 +126,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
 
+  protected override val customScopeName: Option[String] = Some(s"file stream [$id]")
+
   override def start() { }
 
   override def stop() { }
InputDStream.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
-
 import scala.reflect.ClassTag
 
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+
 /**
  * This is the abstract base class for all input streams. This class provides methods
  * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
@@ -49,6 +49,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
    */
   private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
 
+  /** Human-friendly scope name to use in place of generic operation names (e.g. createStream). */
+  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
    * Additionally it also ensures valid times are in strictly increasing order.
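With the hook now defined on InputDStream, a custom source can opt in by overriding a single field. A hypothetical source written against this commit's API (the class name, element type, and "sensor stream" label are invented for illustration):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    class SensorInputDStream(ssc_ : StreamingContext) extends InputDStream[Double](ssc_) {

      // Shown in the UI in place of the generic operation name.
      protected override val customScopeName: Option[String] = Some(s"sensor stream [$id]")

      override def start(): Unit = { }
      override def stop(): Unit = { }
      // Trivial stub: a real source would return an RDD of new data for this batch.
      override def compute(validTime: Time): Option[RDD[Double]] = None
    }
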
QueueInputDStream.scala
@@ -32,6 +32,8 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {
 
+  protected override val customScopeName: Option[String] = Some(s"queue stream [$id]")
+
   override def start() { }
 
   override def stop() { }
RawInputDStream.scala
@@ -45,6 +45,8 @@ class RawInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
 
+  protected override val customScopeName: Option[String] = Some(s"raw stream [$id]")
+
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
   }
ReceiverInputDStream.scala
@@ -40,6 +40,8 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
+
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
SocketInputDStream.scala
@@ -37,6 +37,8 @@ class SocketInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"socket stream [$id]")
+
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)
   }
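End to end, the user-visible effect: RDDs produced by each input stream are grouped in the UI's DAG visualization under the descriptive scope name rather than the name of the method that created the stream. A minimal driver sketch, assuming a text source listening on localhost:9999:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ScopeNameDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("ScopeNameDemo")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Backed by a SocketInputDStream, so its scope in the DAG visualization
        // now reads "socket stream [0]" (ids are assigned in creation order)
        // instead of a generic operation name like "socketTextStream".
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }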
