diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 6e7dfd7d1092a..0a2b6b28b57a4 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -171,7 +171,7 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} (${node.id})"]"""
+    s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
   /** Return the dot representation of a subgraph in an RDDOperationGraph. */
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 60e2994431b38..8b86bb4e7505d 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -50,6 +50,8 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 92fa5b41be89e..59db0cdc9bb82 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -53,6 +53,8 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
+  protected override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
+
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 6715aede7928a..cbafb6ad78c97 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -62,9 +62,12 @@ class DirectKafkaInputDStream[
     val fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends InputDStream[R](ssc_) with Logging {
-  val maxRetries = context.sparkContext.getConf.getInt(
+
+  private val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
+  protected override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index cca0fac0234e1..14f5a5f06d024 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -55,6 +55,8 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
+  protected override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
+
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 3c0ef94cb0fab..e26a16503859b 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -57,6 +57,8 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7cf02d85d73d3..2c2d5296b32e2 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -45,6 +45,8 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
+
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index f396c347581ce..9a65efa8b425c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -27,6 +27,8 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"constant stream [$id]")
+
   override def start() {}
 
   override def stop() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 859e1e2def7d5..3497486641d35 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -121,10 +121,16 @@ abstract class DStream[T: ClassTag] (
    *
    * This is not defined if the DStream is created outside of one of the public DStream operations.
    */
-  private val baseScope: Option[String] = {
+  protected val baseScope: Option[String] = {
     Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
   }
 
+  /**
+   * An optional custom name for all scopes generated by this DStream.
+   * If None, the name of the operation that created this DStream will be used.
+   */
+  protected val customScopeName: Option[String] = None
+
   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
    *
@@ -132,11 +138,11 @@ abstract class DStream[T: ClassTag] (
    * in the same operation. Separate calls to the same DStream operation create separate scopes.
    * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
    */
-  protected def makeScope(time: Time): Option[RDDOperationScope] = {
+  private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
       val formattedBatchTime = UIUtils.formatBatchTime(time.milliseconds)
       val bscope = RDDOperationScope.fromJson(bsJson)
-      val baseName = bscope.name // e.g. countByWindow
+      val baseName = customScopeName.getOrElse(bscope.name) // e.g. countByWindow
       val scopeName =
         if (baseName.length > 10) {
           // If the operation name is too long, wrap the line
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index eca69f00188e4..2123585ffd9d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -126,6 +126,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
 
+  protected override val customScopeName: Option[String] = Some(s"file stream [$id]")
+
   override def start() { }
 
   override def stop() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 9716adb62817c..0fb0cc20b14f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
-
 import scala.reflect.ClassTag
 
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+
 /**
  * This is the abstract base class for all input streams. This class provides methods
  * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
@@ -49,6 +49,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
    */
   private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
 
+  /** Human-friendly scope name to use in place of generic operation names (e.g. createStream). */
+  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
    * Additionally it also ensures valid times are in strictly increasing order.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index ed7da6dc1315e..dccaacf889cdd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -32,6 +32,8 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {
 
+  protected override val customScopeName: Option[String] = Some(s"queue stream [$id]")
+
   override def start() { }
 
   override def stop() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index e2925b9e03ec3..b386144b0833a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -45,6 +45,8 @@ class RawInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
 
+  protected override val customScopeName: Option[String] = Some(s"raw stream [$id]")
+
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 5cfe43a1ce726..e3c1afeffee84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -40,6 +40,8 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
+
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf20653d..5f789f9f027a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -37,6 +37,8 @@ class SocketInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_) {
 
+  protected override val customScopeName: Option[String] = Some(s"socket stream [$id]")
+
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)
   }
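
Note on the mechanism (editor's sketch, not part of the patch): `baseScope` captures the RDD scope recorded by the public DStream operation that created the stream, and `makeScope` now substitutes `customScopeName` for that operation name when a subclass supplies one. Because it is a val, the label is fixed at construction time, while `makeScope` regroups RDDs under it once per batch. The sketch below shows how a hypothetical input stream would use the hook; the class name, its trivial `compute`, and its placement in the `org.apache.spark.streaming.dstream` package (needed because members such as DStream's `ssc` are `private[streaming]`) are all illustrative, mirroring the built-in streams in the diff above.

```scala
package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}

// Hypothetical input stream (illustration only): overriding customScopeName
// replaces the generic creating-operation name in the streaming DAG
// visualization with a readable per-stream label.
class SensorInputDStream(ssc_ : StreamingContext)
  extends InputDStream[String](ssc_) {

  // Every RDD this stream generates is grouped under "sensor stream [<id>]",
  // one scope per batch, by DStream.makeScope.
  protected override val customScopeName: Option[String] = Some(s"sensor stream [$id]")

  override def start() { }

  override def stop() { }

  // Illustrative compute: an empty batch for every interval.
  override def compute(validTime: Time): Option[RDD[String]] = {
    Some(ssc.sparkContext.parallelize(Seq.empty[String]))
  }
}
```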