Skip to content

Commit

Permalink
Rename createRDDWith
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 18, 2015
1 parent 84d0656 commit e685df9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,22 @@ abstract class DStream[T: ClassTag] (
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
*/
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = createRDDWith(time) {
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)

val rddOption = createRDDWithLocalProperties(time) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}

rddOption.foreach { case newRDD =>
Expand All @@ -372,7 +375,7 @@ abstract class DStream[T: ClassTag] (
* Wrap a body of code such that the call site and operation scope
* information are passed to the RDDs created in this body properly.
*/
protected def createRDDWith[U](time: Time)(body: => U): U = {
protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
// Pass this DStream's operation scope and creation site information to RDDs through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWith(time) {
val jobFunc = () => createRDDWithLocalProperties(time) {
ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
Expand Down

0 comments on commit e685df9

Please sign in to comment.