[SPARK-31721][SQL] Assert optimized is initialized before tracking the planning time

### What changes were proposed in this pull request?
The QueryPlanningTracker in QueryExecution reports a planning time that also includes the optimization time. This happens because optimizedPlan in QueryExecution is a lazy val and is only initialized when first accessed. When df.queryExecution.executedPlan is called, the tracker starts recording the planning time and only then touches the optimized plan, so optimization runs inside the planning phase and its duration is counted as planning time.
This PR fixes this behavior by introducing a method assertOptimized, similar to assertAnalyzed, that explicitly initializes the optimized plan. This method is called before measuring the time for both sparkPlan and executedPlan. We call it before sparkPlan as well, because that phase is also tracked as planning time. The sketch below illustrates the effect.
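
To see the effect outside of Spark, here is a minimal, self-contained sketch (illustrative only, not Spark code; PlanningTimeDemo, timePhase, and the lazy vals are hypothetical stand-ins for executePhase and optimizedPlan). A lazy val first forced inside a timed block charges its initialization to that block; forcing it beforehand keeps the measurements separate:

```scala
object PlanningTimeDemo extends App {
  def timePhase[T](name: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    println(f"$name: ${(System.nanoTime() - start) / 1e6}%.0f ms")
    result
  }

  // "Optimization" costs roughly 100 ms.
  lazy val optimizedPlan: String = { Thread.sleep(100); "optimized" }

  // Buggy pattern: the lazy val is first forced inside the timed planning
  // block, so the ~100 ms of optimization is reported as planning time.
  timePhase("planning, optimization counted")(s"physical($optimizedPlan)")

  lazy val optimizedPlan2: String = { Thread.sleep(100); "optimized" }
  def assertOptimized(): Unit = optimizedPlan2 // mirrors the PR's fix

  // Fixed pattern: force initialization first, then time only the planning work.
  assertOptimized()
  timePhase("planning, optimization excluded")(s"physical($optimizedPlan2)")
}
```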

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes apache#28543 from dbaliafroozeh/AddAssertOptimized.

Authored-by: Ali Afroozeh <[email protected]>
Signed-off-by: herman <[email protected]>
dbaliafroozeh authored and hvanhovell committed May 19, 2020
1 parent 653ca19 · commit b9cc31c
Showing 3 changed files with 31 additions and 8 deletions.
QueryExecution.scala

```diff
@@ -82,17 +82,30 @@ class QueryExecution(
     sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
   }
 
-  lazy val sparkPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
-    // Clone the logical plan here, in case the planner rules change the states of the logical plan.
-    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+  private def assertOptimized(): Unit = optimizedPlan
+
+  lazy val sparkPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
+    // the planning phase
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // Clone the logical plan here, in case the planner rules change the states of the logical
+      // plan.
+      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+    }
   }
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+  lazy val executedPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
+    // that the optimization time is not counted as part of the planning phase.
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
+      // optimizing and planning.
+      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+    }
   }
 
   /**
```
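
The one-line assertOptimized works because of Scala's lazy val semantics: merely referencing a lazy val forces its one-time initialization, and later references are cheap. A standalone sketch of the idiom (illustrative only, not Spark code):

```scala
object LazyForceDemo extends App {
  lazy val expensive: Int = {
    println("initializing...") // runs exactly once, on first access
    42
  }

  // Referencing the lazy val forces initialization; the result is discarded,
  // just as assertOptimized discards optimizedPlan.
  def assertInitialized(): Unit = expensive

  assertInitialized() // prints "initializing..."
  assertInitialized() // already initialized; nothing printed
  println(expensive)  // 42, no re-initialization
}
```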
QueryPlanningTrackerEndToEndSuite.scala

```diff
@@ -58,4 +58,13 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
       StopStream)
   }
 
+  test("The start times should be in order: parsing <= analysis <= optimization <= planning") {
+    val df = spark.sql("select count(*) from range(1)")
+    df.queryExecution.executedPlan
+    val phases = df.queryExecution.tracker.phases
+    assert(phases("parsing").startTimeMs <= phases("analysis").startTimeMs)
+    assert(phases("analysis").startTimeMs <= phases("optimization").startTimeMs)
+    assert(phases("optimization").startTimeMs <= phases("planning").startTimeMs)
+  }
+
 }
```
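
The new test drives the full pipeline by touching executedPlan and then compares phase start times. The same tracker can be inspected for durations; a sketch (run in spark-shell with an active SparkSession, and assuming PhaseSummary exposes durationMs as it does in Spark 3.x):

```scala
val df = spark.sql("select count(*) from range(1)")
df.queryExecution.executedPlan // triggers analysis, optimization, and planning
df.queryExecution.tracker.phases.foreach { case (phase, summary) =>
  println(s"$phase: start=${summary.startTimeMs}, duration=${summary.durationMs} ms")
}
```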
SparkPlanTest.scala

```diff
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -237,7 +238,7 @@ object SparkPlanTest {
    * @param spark SqlContext used for execution of the plan
    */
   def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
-    val execution = new QueryExecution(spark.sparkSession, null) {
+    val execution = new QueryExecution(spark.sparkSession, LocalRelation(Nil)) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
           val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap
```
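
Why SparkPlanTest needed a change (an inference; the PR text does not spell it out): executedPlan now always calls assertOptimized(), which forces analysis and optimization of the logical plan, so the null placeholder previously passed to QueryExecution would fail. LocalRelation(Nil) is a minimal valid stand-in:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// An in-memory relation with no columns and no rows: it analyzes and
// optimizes trivially, which is all this test harness needs.
val dummyPlan = LocalRelation(Nil)
```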
