[SPARK-35093] [SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use #32195

Closed
@@ -435,7 +435,8 @@ case class AdaptiveSparkPlanExec(
// Check the `stageCache` again for reuse. If a match is found, ditch the new stage
// and reuse the existing stage found in the `stageCache`, otherwise update the
// `stageCache` with the new stage.
-val queryStage = context.stageCache.getOrElseUpdate(e.canonicalized, newStage)
+val queryStage = context.stageCache.getOrElseUpdate(
+  newStage.plan.canonicalized, newStage)
Contributor

This breaks DPP (dynamic partition pruning). I'm not sure exactly why, but the key of the stage cache used to be the plan without queryStageOptimizerRules applied, and now it's different. That is probably what breaks DPP.
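
To make the key change concrete, here is a minimal sketch, assuming toy stand-ins rather than Spark classes: a plan is a plain string, and `optimize` is a hypothetical stage-level rewrite. It only shows that keying the cache by the original exchange and keying it by the new stage's plan can group stages differently. It does not explain the DPP failure.

```scala
import scala.collection.mutable

object StageCacheKeySketch {
  // Toy stand-in, not a Spark class: a stage wraps a plan that is a plain string.
  final case class Stage(id: Int, plan: String)

  // Hypothetical stage-level rewrite applied when a stage is created.
  def optimize(plan: String): String =
    plan.replace("RowExchange", "ColumnarExchange")

  // Counts the distinct cached stages when the cache key is derived by `keyOf`.
  def distinctStages(exchanges: Seq[String], keyOf: (String, Stage) => String): Int = {
    val cache = mutable.Map.empty[String, Stage]
    exchanges.zipWithIndex.foreach { case (e, i) =>
      val newStage = Stage(i, optimize(e))
      // Same shape as the line under review: reuse on a hit, insert on a miss.
      cache.getOrElseUpdate(keyOf(e, newStage), newStage)
    }
    cache.size
  }

  def main(args: Array[String]): Unit = {
    val exchanges = Seq("RowExchange(a)", "ColumnarExchange(a)")
    val keyedByExchange = distinctStages(exchanges, (e, _) => e)
    val keyedByStagePlan = distinctStages(exchanges, (_, s) => s.plan)
    println(s"keyed by exchange: $keyedByExchange, keyed by new stage plan: $keyedByStagePlan")
  }
}
```

In this toy setup the two exchanges differ before the rewrite and coincide after it, so the old key yields two cached stages and the new key yields one.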

Contributor

One idea is to introduce a new phase: preStageCreationRules. We can put ApplyColumnarRulesAndInsertTransitions there and run it before reusing exchanges.
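
A rough sketch of that ordering, assuming toy plans (strings) and toy rules. The rule list names mirror this discussion, but the rule bodies, `applyRules`, and `createOrReuseStage` are hypothetical and are not the actual AdaptiveSparkPlanExec code:

```scala
import scala.collection.mutable

object PreStageRulesSketch {
  type Plan = String
  type Rule = Plan => Plan

  // Hypothetical rule lists: the names follow the discussion, the bodies are toys.
  val preStageCreationRules: Seq[Rule] =
    Seq((p: Plan) => p.replace("Row", "Columnar"))
  val queryStageOptimizerRules: Seq[Rule] =
    Seq((p: Plan) => s"Optimized($p)")

  // Applies the rules left to right.
  def applyRules(plan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  val stageCache = mutable.Map.empty[Plan, Plan]

  // Runs the pre-stage-creation rules first, uses the result as the cache key,
  // and runs the stage optimizer rules only on a cache miss.
  def createOrReuseStage(exchange: Plan): Plan = {
    val prepared = applyRules(exchange, preStageCreationRules)
    stageCache.getOrElseUpdate(prepared, applyRules(prepared, queryStageOptimizerRules))
  }
}
```

With this ordering the cache key already reflects the columnar transitions but is still computed before the stage optimizer rules run, which is the property the old key had.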

Member Author

Thanks for the suggestions. I am working on this now and will try this out.

Contributor

Thanks for pointing it out. It feels odd to me that this breaks DPP. I also noticed that more changes to the way that works seem to be in flight: https://github.com/apache/spark/pull/31756/files

Member Author

If we merge #31756 and then rebase this PR, the DPP tests pass, so I think this is the best path forward.

Contributor

#31756 was merged. @andygrove, can you upmerge and retest to make sure this works?

if (queryStage.ne(newStage)) {
  newStage = reuseQueryStage(queryStage, e)
}