[SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization #33807
Conversation
cc @ueshin and @cloud-fan, can you take a look when you find some time?
```scala
override def outputPartitioning: Partitioning = child.outputPartitioning

override protected def doExecute(): RDD[InternalRow] = {
  child.execute().map(_.copy())
```
Why do we need to copy the unsafe rows before calling `localCheckpoint`?
Oh, I forgot to describe it. `localCheckpoint` caches (persists) the data, and since it stores the rows, they need to be copied first. This is actually what `Dataset.checkpoint` does as well: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L679
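A minimal sketch of the row-reuse hazard being discussed here, assuming a generic `RDD[InternalRow]`; the helper name `cacheSafely` is hypothetical and not from this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Many Spark iterators reuse a single mutable row object, so persisting
// the references without copy() would cache N pointers to the same,
// last-written row.
def cacheSafely(rows: RDD[InternalRow]): RDD[InternalRow] =
  rows
    .map(_.copy())      // give each row its own backing object first
    .localCheckpoint()  // then it is safe to persist and reuse the RDD
```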
```scala
override protected def doExecute(): RDD[InternalRow] = {
  child.execute().map(_.copy())
    .localCheckpoint() // to avoid executing multiple jobs. zipWithIndex launches a Spark job.
```
I am still not sure we need to `localCheckpoint` in the middle here ... but let me keep it as is for now.
E.g., if the child RDD has a shuffle, the shuffle would be triggered twice; this checkpoint is to avoid that.
The shuffle will be reused. I think `localCheckpoint` is useful to save computation. E.g., with `df.sort(...).withSequenceColumn`, if we don't do `localCheckpoint`, the shuffle is still done only once, but the local sort after the shuffle will be done twice.
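A hedged sketch of the recomputation concern, assuming a running `SparkSession` named `spark`:

```scala
// zipWithIndex itself launches one Spark job (to count rows per
// partition), and each later action launches another; without
// localCheckpoint, the local sort above would re-run in every job.
val sorted  = spark.range(100).sort("id").rdd
val indexed = sorted.localCheckpoint().zipWithIndex()
indexed.count()   // first job materializes the checkpointed partitions
indexed.collect() // subsequent jobs reuse them instead of re-sorting
```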
```scala
 * increases one by one. This is for 'distributed-sequence' default index
 * in pandas API on Spark.
 */
case class AttachDistributedSequenceExec(
```
We could think about implementing this with an expression (like Python UDF or Window) ... but I decided to do it with plans to avoid making it too complicated.
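A rough sketch of the idea behind the new plan node, not the actual implementation: attach a 0-based, consecutive index by zipping the child's rows with `RDD.zipWithIndex`. Assumes a `SparkSession` named `spark`; the column name mirrors the `__index_level_0__` seen in the plans below.

```scala
import spark.implicits._

val df = Seq("a", "b", "c").toDF("value")
val withSeq = df.rdd
  .zipWithIndex()                                   // runs one job to count rows per partition
  .map { case (row, idx) => (idx, row.getString(0)) }
  .toDF("__index_level_0__", "value")
withSeq.show() // rows numbered 0, 1, 2 in partition order
```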
Test build #142723 has finished for PR 33807.
LGTM. One thing I'm worried about is that we can't push down filters through `AttachDistributedSequence`, but that won't happen, right?
Yeah, I think it won't happen. Just did a quick double-check.
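A small illustration (assuming a `SparkContext` named `sc`) of why a filter must not be pushed below the sequence-attaching node: the assigned numbers depend on which rows are present when the index is computed.

```scala
val data = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

// Filter evaluated after attaching the sequence keeps the original numbers:
data.zipWithIndex().filter { case (v, _) => v != "b" }.collect()
// -> Array((a,0), (c,2), (d,3))

// Filter pushed below zipWithIndex renumbers the surviving rows:
data.filter(_ != "b").zipWithIndex().collect()
// -> Array((a,0), (c,1), (d,2))
```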
Let me merge this one into 3.2 together.
Merged to master and branch-3.2.
[SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization

### What changes were proposed in this pull request?

This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(10).id.value_counts().to_frame().spark.explain()
```

**Before:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#51L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#70]
      +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L])
         +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=#67]
            +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L])
               +- Project [id#37L]
                  +- Filter atleastnnonnulls(1, id#37L)
                     +- Scan ExistingRDD[__index_level_0__#36L,id#37L]
                        # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed)
```

**After:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#275L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#174]
      +- HashAggregate(keys=[id#258L], functions=[count(1)])
         +- HashAggregate(keys=[id#258L], functions=[partial_count(1)])
            +- Filter atleastnnonnulls(1, id#258L)
               +- Range (0, 10, step=1, splits=16)
                  # ^^^ Removed the Spark job execution for `zipWithIndex`
```

### Why are the changes needed?

To leverage optimization of the SQL engine and avoid an unnecessary shuffle to create the default index.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests were added. Also, this PR will run all unit tests in pandas API on Spark after switching the default index implementation to `distributed-sequence`.

Closes #33807 from HyukjinKwon/SPARK-36559.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 93cec49)
Signed-off-by: Hyukjin Kwon <[email protected]>