Skip to content

Commit

Permalink
Propagate all local properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Uang committed Jan 14, 2019
1 parent 563706e commit f477a24
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.adaptive

import java.util.Properties

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.broadcast
import org.apache.spark.{broadcast, MapOutputStatistics, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
Expand All @@ -48,20 +49,30 @@ abstract class QueryStage extends UnaryExecNode {

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

def withLocalProperties[T](sc: SparkContext, properties: Properties)(body: => T): T = {
val oldProperties = sc.getLocalProperties
try {
sc.setLocalProperties(properties)
body
} finally {
sc.setLocalProperties(oldProperties)
}
}

/**
* Execute childStages and wait until all stages are completed. Use a thread pool to avoid
* blocking on one child stage.
*/
def executeChildStages(): Unit = {
val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val localProperties = sqlContext.sparkContext.getLocalProperties

// Handle broadcast stages
val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
case bqs: BroadcastQueryStageInput => bqs.childStage
}
val broadcastFutures = broadcastQueryStages.map { queryStage =>
Future {
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
withLocalProperties(sqlContext.sparkContext, localProperties) {
queryStage.prepareBroadcast()
}
}(QueryStage.executionContext)
Expand All @@ -73,7 +84,7 @@ abstract class QueryStage extends UnaryExecNode {
}
val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
Future {
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
withLocalProperties(sqlContext.sparkContext, localProperties) {
queryStage.execute()
}
}(QueryStage.executionContext)
Expand Down

0 comments on commit f477a24

Please sign in to comment.