From e685df9a7ab5cf64a68bb443eaa9ea8641127750 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 18 May 2015 11:02:48 -0700
Subject: [PATCH] Rename createRDDWith

---
 .../spark/streaming/dstream/DStream.scala        | 19 +++++++++++--------
 .../streaming/dstream/ForEachDStream.scala       |  2 +-
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 922671be8732d..5977481e1f081 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -334,19 +334,22 @@ abstract class DStream[T: ClassTag] (
    * Get the RDD corresponding to the given time; either retrieve it from cache
    * or compute-and-cache it.
    */
-  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = createRDDWith(time) {
+  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
     // If RDD was already generated, then retrieve it from HashMap,
     // or else compute the RDD
     generatedRDDs.get(time).orElse {
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        // Disable checks for existing output directories in jobs launched by the streaming
-        // scheduler, since we may need to write output to an existing directory during checkpoint
-        // recovery; see SPARK-4835 for more details. We need to have this call here because
-        // compute() might cause Spark jobs to be launched.
-        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-          compute(time)
+
+        val rddOption = createRDDWithLocalProperties(time) {
+          // Disable checks for existing output directories in jobs launched by the streaming
+          // scheduler, since we may need to write output to an existing directory during checkpoint
+          // recovery; see SPARK-4835 for more details. We need to have this call here because
+          // compute() might cause Spark jobs to be launched.
+          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+            compute(time)
+          }
         }

         rddOption.foreach { case newRDD =>
@@ -372,7 +375,7 @@ abstract class DStream[T: ClassTag] (
    * Wrap a body of code such that the call site and operation scope
    * information are passed to the RDDs created in this body properly.
    */
-  protected def createRDDWith[U](time: Time)(body: => U): U = {
+  protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index a64a77dd661ae..c109ceccc6989 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => createRDDWith(time) {
+        val jobFunc = () => createRDDWithLocalProperties(time) {
          ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
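
Note for reviewers: the block this patch moves inside createRDDWithLocalProperties relies on Scala's
DynamicVariable pattern. PairRDDFunctions.disableOutputSpecValidation is a DynamicVariable[Boolean],
so withValue(true) { compute(time) } sets the flag only for the dynamic extent of compute(time) and
restores the previous value afterwards, even if compute throws. Below is a minimal, self-contained
sketch of that pattern; the names DynamicFlagSketch, disableValidation, and saveOutput are
hypothetical stand-ins for illustration, not Spark APIs.

    import scala.util.DynamicVariable

    object DynamicFlagSketch {
      // Stand-in for PairRDDFunctions.disableOutputSpecValidation:
      // a flag with default value false.
      val disableValidation = new DynamicVariable[Boolean](false)

      // Code deep in the call stack can read the flag without it being
      // threaded through every method signature.
      def saveOutput(): Unit = {
        if (disableValidation.value) println("skipping output-spec validation")
        else println("validating output spec")
      }

      def main(args: Array[String]): Unit = {
        saveOutput()                                        // validating output spec
        disableValidation.withValue(true) { saveOutput() }  // skipping output-spec validation
        saveOutput()                                        // flag restored: validating again
      }
    }

Nesting the withValue call inside createRDDWithLocalProperties (rather than wrapping all of
getOrCompute, as the old createRDDWith call did) keeps both the validation flag and the
call-site/operation-scope local properties in effect exactly while compute(time) may launch jobs.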