diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 56b917f3cde58..0229bda2e68e0 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -41,7 +41,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createStream(ssc, hostname, port, storageLevel, false)
   }
 
@@ -73,7 +73,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -88,7 +88,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel, false)
   }
 
@@ -105,7 +105,7 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 
@@ -122,7 +122,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -137,7 +137,7 @@ object FlumeUtils {
       ssc: StreamingContext,
       addresses: Seq[InetSocketAddress],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
 
@@ -175,7 +175,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
@@ -192,7 +192,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -207,7 +207,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       addresses: Array[InetSocketAddress],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
 
@@ -229,7 +229,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b465c823472a..f9128e98054e1 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -52,7 +52,7 @@ object MQTTUtils {
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topic: String
-    ): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic)
   }
@@ -69,7 +69,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 8ed9076a9123f..ed1d77809231e 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -53,9 +53,7 @@
    * @param jssc JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
-    jssc.ssc.withScope {
-      createStream(jssc.ssc, None)
-    }
+    createStream(jssc.ssc, None)
   }
 
   /**
@@ -68,7 +66,7 @@
    * @param filters Set of filter strings to get only those tweets that match them
    */
   def createStream(jssc: JavaStreamingContext, filters: Array[String]
-    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, None, filters)
   }
 
@@ -85,7 +83,7 @@
       jssc: JavaStreamingContext,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, None, filters, storageLevel)
   }
 
@@ -96,7 +94,7 @@
    * @param twitterAuth Twitter4J Authorization
    */
   def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
-    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth))
   }
 
@@ -111,7 +109,7 @@
       jssc: JavaStreamingContext,
       twitterAuth: Authorization,
       filters: Array[String]
-    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters)
   }
 
@@ -127,7 +125,7 @@
       twitterAuth: Authorization,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 28c58487f726d..fbad55becbaa6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
-import org.apache.spark.{SparkContext, Logging, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -127,9 +127,7 @@ abstract class DStream[T: ClassTag] (
 
   /**
    * Make a scope name based on the given one.
-   *
-   * By default, this just returns the base name. Subclasses
-   * may optionally override this to provide custom scope names.
+   * Subclasses may optionally override this to provide custom scope names.
    */
   protected[streaming] def makeScopeName(baseName: String): String = baseName
 
@@ -351,8 +349,8 @@ abstract class DStream[T: ClassTag] (
     if (isTimeValid(time)) {
       val rddOption = doCompute(time)
 
-      // Register the generated RDD for caching and checkpointing
       rddOption.foreach { case newRDD =>
+        // Register the generated RDD for caching and checkpointing
         if (storageLevel != StorageLevel.NONE) {
           newRDD.persist(storageLevel)
           logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
@@ -363,7 +361,6 @@ abstract class DStream[T: ClassTag] (
         }
         generatedRDDs.put(time, newRDD)
       }
-
       rddOption
     } else {
       None
@@ -739,9 +736,7 @@ abstract class DStream[T: ClassTag] (
    * the same interval as this DStream.
    * @param windowDuration width of the window; must be a multiple of this DStream's interval.
    */
-  def window(windowDuration: Duration): DStream[T] = ssc.withScope {
-    window(windowDuration, this.slideDuration)
-  }
+  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
 
   /**
    * Return a new DStream in which each RDD contains all the elements in seen in a
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 93c6d9d76f33a..27a8a820cc74e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
    */
-  def groupByKey(): DStream[(K, Iterable[V])] = self.ssc.withScope {
+  def groupByKey(): DStream[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner())
   }
 
@@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = self.ssc.withScope {
+  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(numPartitions))
   }
 
@@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
    * with Spark's default number of partitions.
    */
-  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = self.ssc.withScope {
+  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
     reduceByKey(reduceFunc, defaultPartitioner())
   }
 
@@ -86,7 +86,7 @@
    */
   def reduceByKey(
       reduceFunc: (V, V) => V,
-      numPartitions: Int): DStream[(K, V)] = self.ssc.withScope {
+      numPartitions: Int): DStream[(K, V)] = {
     reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
   }
 
@@ -126,9 +126,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * batching interval
    */
   def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
-    self.ssc.withScope {
-      groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
-    }
+    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
   }
 
   /**
@@ -142,7 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * DStream's batching interval
   */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
-    : DStream[(K, Iterable[V])] = self.ssc.withScope {
+    : DStream[(K, Iterable[V])] = {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -162,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
-    ): DStream[(K, Iterable[V])] = self.ssc.withScope {
+    ): DStream[(K, Iterable[V])] = {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
@@ -203,7 +201,7 @@
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       windowDuration: Duration
-    ): DStream[(K, V)] = self.ssc.withScope {
+    ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
   }
 
@@ -222,7 +220,7 @@
       reduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration
-    ): DStream[(K, V)] = self.ssc.withScope {
+    ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -243,7 +241,7 @@
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
-    ): DStream[(K, V)] = self.ssc.withScope {
+    ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
@@ -299,8 +297,7 @@
       slideDuration: Duration = self.slideDuration,
       numPartitions: Int = ssc.sc.defaultParallelism,
       filterFunc: ((K, V)) => Boolean = null
-    ): DStream[(K, V)] = self.ssc.withScope {
-
+    ): DStream[(K, V)] = {
     reduceByKeyAndWindow(
       reduceFunc, invReduceFunc, windowDuration, slideDuration,
       defaultPartitioner(numPartitions), filterFunc
@@ -354,7 +351,7 @@
    */
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S]
-    ): DStream[(K, S)] = self.ssc.withScope {
+    ): DStream[(K, S)] = {
     updateStateByKey(updateFunc, defaultPartitioner())
   }
 
@@ -370,7 +367,7 @@
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       numPartitions: Int
-    ): DStream[(K, S)] = self.ssc.withScope {
+    ): DStream[(K, S)] = {
     updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
   }
 
@@ -387,7 +384,7 @@
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner
-    ): DStream[(K, S)] = self.ssc.withScope {
+    ): DStream[(K, S)] = {
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
       iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
     }
@@ -430,7 +427,7 @@
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner,
       initialRDD: RDD[(K, S)]
-    ): DStream[(K, S)] = self.ssc.withScope {
+    ): DStream[(K, S)] = {
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
       iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
     }
@@ -485,7 +482,7 @@
    * of partitions.
    */
   def cogroup[W: ClassTag](
-      other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope {
+      other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner())
   }
 
@@ -495,7 +492,7 @@
    */
   def cogroup[W: ClassTag](
       other: DStream[(K, W)],
-      numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope {
+      numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(numPartitions))
   }
 
@@ -517,7 +514,7 @@
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   */
-  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = self.ssc.withScope {
+  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
     join[W](other, defaultPartitioner())
   }
 
@@ -527,7 +524,7 @@
    */
   def join[W: ClassTag](
       other: DStream[(K, W)],
-      numPartitions: Int): DStream[(K, (V, W))] = self.ssc.withScope {
+      numPartitions: Int): DStream[(K, (V, W))] = {
     join[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -551,7 +548,7 @@
    * number of partitions.
   */
   def leftOuterJoin[W: ClassTag](
-      other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = self.ssc.withScope {
+      other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
     leftOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -563,7 +560,7 @@
   def leftOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (V, Option[W]))] = self.ssc.withScope {
+    ): DStream[(K, (V, Option[W]))] = {
     leftOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -588,7 +585,7 @@
    * number of partitions.
   */
   def rightOuterJoin[W: ClassTag](
-      other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = self.ssc.withScope {
+      other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
     rightOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -600,7 +597,7 @@
   def rightOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (Option[V], W))] = self.ssc.withScope {
+    ): DStream[(K, (Option[V], W))] = {
     rightOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -625,7 +622,7 @@
    * number of partitions.
   */
   def fullOuterJoin[W: ClassTag](
-      other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope {
+      other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
     fullOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -637,7 +634,7 @@
   def fullOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope {
+    ): DStream[(K, (Option[V], Option[W]))] = {
     fullOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -663,7 +660,7 @@
   def saveAsHadoopFiles[F <: OutputFormat[K, V]](
       prefix: String,
       suffix: String
-    )(implicit fm: ClassTag[F]): Unit = self.ssc.withScope {
+    )(implicit fm: ClassTag[F]): Unit = {
     saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
@@ -696,7 +693,7 @@
   def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
       prefix: String,
       suffix: String
-    )(implicit fm: ClassTag[F]): Unit = self.ssc.withScope {
+    )(implicit fm: ClassTag[F]): Unit = {
     saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
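The pattern is the same in every hunk above: `withScope` is removed from overloads that merely delegate to another overload, presumably because the overload that ultimately does the work (not shown in these hunks) establishes the scope itself, so wrapping every delegating level would open a redundant nested scope for a single user call. Below is a minimal, self-contained Scala sketch of that reasoning; the object, the toy `withScope`, and the `createStream` signatures are hypothetical stand-ins, not Spark code.

// Sketch only: a counter-based stand-in for scope tracking, so we can see
// how many scopes one user-facing call would open.
object ScopingSketch {
  private var depth = 0

  def withScope[T](body: => T): T = {
    depth += 1
    println(s"entering scope, depth = $depth")
    try body finally depth -= 1
  }

  // Terminal overload: does the real work, so it owns the scope.
  def createStream(host: String, port: Int, enableDecompression: Boolean): String =
    withScope {
      s"stream($host:$port, decompress=$enableDecompression)"
    }

  // Convenience overload: only delegates. Wrapping this one in withScope as
  // well would open a second, nested scope for the same logical operation.
  def createStream(host: String, port: Int): String =
    createStream(host, port, enableDecompression = false)

  def main(args: Array[String]): Unit = {
    // Prints "entering scope, depth = 1" exactly once, then the stream description.
    println(createStream("localhost", 9999))
  }
}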