Skip to content

Commit

Permalink
resolve the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Feb 5, 2021
1 parent 8a77832 commit a02e307
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ case class AdaptiveSparkPlanExec(

override def output: Seq[Attribute] = inputPlan.output

override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
override def doCanonicalize(): SparkPlan = inputPlan.canonicalized

override def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class PlanAdaptiveDynamicPruningFilters(
HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
val mode = HashedRelationBroadcastMode(packedKeys)
// plan a broadcast exchange of the build side of the join
val exchange = BroadcastExchangeExec(mode, adaptivePlan)
val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
val existingStage = stageCache.get(exchange.canonicalized)
if (existingStage.nonEmpty && conf.exchangeReuseEnabled) {
val name = s"dynamicpruning#${exprId.id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ abstract class QueryStageExec extends LeafExecNode {
*/
val plan: SparkPlan

/**
* The canonicalized plan before applying query stage optimizer rules.
*/
val _canonicalized: SparkPlan

/**
* Materialize this query stage, to prepare for the execution, like submitting map stages,
* broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this
Expand Down Expand Up @@ -116,7 +121,7 @@ abstract class QueryStageExec extends LeafExecNode {
override def supportsColumnar: Boolean = plan.supportsColumnar
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = plan.executeColumnar()
override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
override def doCanonicalize(): SparkPlan = plan.canonicalized
override def doCanonicalize(): SparkPlan = _canonicalized

protected override def stringArgs: Iterator[Any] = Iterator.single(id)

Expand Down Expand Up @@ -154,7 +159,7 @@ abstract class QueryStageExec extends LeafExecNode {
case class ShuffleQueryStageExec(
override val id: Int,
override val plan: SparkPlan,
_canonicalized: SparkPlan) extends QueryStageExec {
override val _canonicalized: SparkPlan) extends QueryStageExec {

@transient val shuffle = plan match {
case s: ShuffleExchangeLike => s
Expand Down Expand Up @@ -208,7 +213,7 @@ case class ShuffleQueryStageExec(
case class BroadcastQueryStageExec(
override val id: Int,
override val plan: SparkPlan,
_canonicalized: SparkPlan) extends QueryStageExec {
override val _canonicalized: SparkPlan) extends QueryStageExec {

@transient val broadcast = plan match {
case b: BroadcastExchangeLike => b
Expand Down

0 comments on commit a02e307

Please sign in to comment.