[SPARK-8964] [SQL] Use Exchange to perform shuffle in Limit #7334
Conversation
Ping @rxin, since this is addressing an old TODO that you added.
One question that I have: will the change here introduce a performance regression in cases where Limit is the final operator in the plan? I think that we may want to continue to do the `executeTake()` fast path in that case.
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition

override def executeCollect(): Array[Row] = child.executeTake(limit)
Whoops, forgot to remove this. This should probably be split into a separate operator in order to handle the cases where Limit is the final operator.
I looked at the code for `executeTake` and it looks like it's also doing some potentially-unnecessary copying and inefficient serialization of result rows (AFAIK it's not using SqlSerializer2).
Yes, this will call `submitJob`, so we probably don't have the opportunity to use `SqlSerializer2`; the serialization will be done by `SparkEnv.serializer` eventually, if I understand correctly.
At least we'd better do it in a separate PR.
I agree. I was planning to submit a separate JIRA + pull request to address this for take/collect. I originally thought that we might want that pull request to go in ahead of this one, but I now think that they're fairly distinct and don't need to block each other.
Test build #36977 has finished for PR 7334 at commit
I think that this patch should be revisited after refactoring.
Jenkins, retest this please.
(Testing a pull request builder configuration change...)
Test build #37102 has finished for PR 7334 at commit
@@ -332,8 +332,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data) =>
         LocalTableScan(output, data) :: Nil
-     case logical.Limit(IntegerLiteral(limit), child) =>
-       execution.Limit(limit, planLater(child)) :: Nil
+     case logical.Limit(IntegerLiteral(limit), child) => {
Nit: Unnecessary { }
Test build #37160 has finished for PR 7334 at commit
Alright, I've updated this to allow the planner to insert the Exchange. We still can't merge this as-is, though, since it will introduce a bad regression for the case where Limit is the final operator in a physical plan. AFAIK there's no easy way to write optimizer rules which reference the parent operator, so it's hard to recognize this pattern and use a separate Limit implementation.
Test build #37164 has finished for PR 7334 at commit
Test build #37432 has finished for PR 7334 at commit
Test build #37433 has finished for PR 7334 at commit
@JoshRosen, it seems `executeTake` already does `sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p, allowLocal = false)`. I am wondering if that is equivalent to what this patch does. Sorry if I missed something.
They are not equivalent in terms of data shuffling, which is why I introduced the changes in this patch.
There's one problem with the planner changes here: during logical + physical planning we don't know in advance whether the Limit will end up being the final operator in the physical plan.
Jenkins, retest this please.
Test build #50730 has finished for PR 7334 at commit
@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

   lazy val sparkPlan: SparkPlan = {
     SQLContext.setActive(sqlContext)
-    sqlContext.planner.plan(optimizedPlan).next()
+    sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
cc @marmbrus here to make sure it is safe
Yeah, this looks good to me.
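To make the `ReturnAnswer` trick concrete, here is a minimal, self-contained Scala sketch. The case classes and the `plan` function below are hypothetical toy stand-ins, not Spark's real planner types; they only illustrate why wrapping the optimized plan in a dummy root node lets a strategy match a `Limit` precisely when it is the final operator.

```scala
// Hypothetical toy model -- not Spark's planner classes.
sealed trait LogicalPlan
case class Scan(table: String) extends LogicalPlan
case class Project(cols: Seq[String], child: LogicalPlan) extends LogicalPlan
case class Limit(n: Int, child: LogicalPlan) extends LogicalPlan
// Dummy marker inserted at the root before physical planning,
// analogous to ReturnAnswer in this patch.
case class ReturnAnswer(child: LogicalPlan) extends LogicalPlan

sealed trait PhysicalPlan
case class PhysicalScan(table: String) extends PhysicalPlan
case class PhysicalProject(cols: Seq[String], child: PhysicalPlan) extends PhysicalPlan
case class LocalLimit(n: Int, child: PhysicalPlan) extends PhysicalPlan
case class GlobalLimit(n: Int, child: PhysicalPlan) extends PhysicalPlan   // needs a single partition
case class CollectLimit(n: Int, child: PhysicalPlan) extends PhysicalPlan  // take()-style fast path

object ToyPlanner {
  def plan(p: LogicalPlan): PhysicalPlan = p match {
    // ReturnAnswer only ever appears at the root, so this case matches
    // exactly when Limit is the final operator and the fast path is safe.
    case ReturnAnswer(Limit(n, child)) => CollectLimit(n, plan(child))
    case ReturnAnswer(child)           => plan(child)
    // A Limit anywhere else becomes per-partition limit -> (shuffle) -> global limit.
    case Limit(n, child)               => GlobalLimit(n, LocalLimit(n, plan(child)))
    case Project(cols, child)          => PhysicalProject(cols, plan(child))
    case Scan(table)                   => PhysicalScan(table)
  }
}

object ToyPlannerDemo extends App {
  // Limit at the root: planned with the collect/take fast path.
  println(ToyPlanner.plan(ReturnAnswer(Limit(10, Scan("t")))))
  // Limit in the middle of the plan: planned as GlobalLimit(LocalLimit(...)).
  println(ToyPlanner.plan(ReturnAnswer(Project(Seq("a"), Limit(10, Scan("t"))))))
}
```

In the real patch, the fast-path case plans `CollectLimit` (which calls `executeTake`), while the general case relies on `GlobalLimit`'s single-partition requirement so that the planner inserts an `Exchange`.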
Test build #2514 has finished for PR 7334 at commit
cc @davies for a more detailed review
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
This should not be called; should we throw `UnsupportedOperationException`?
This actually can get called if you call `df.limit(10).cache()`.
For this case, should we use `GlobalLimit`? Otherwise it will waste some time collecting rows to the driver and then sending them back to the executors.
This is actually a tricky problem to address. If we want to actually plan `GlobalLimit` here instead, then we need to be able to distinguish between the collect-to-driver and cache cases earlier in the planning process, which I think is going to be a large and non-trivial change.
If we want to avoid collecting back to the driver and re-broadcasting, then I suppose we can do something similar to what the old `Limit` operator's `doExecute` did. The problem with the old `doExecute()` was that it involved a lot of inefficient copying and serialization. Instead, it might be worth seeing whether the inner logic of the `Exchange` operator could be called from `CollectLimit.doExecute`.
Similar to `ReturnAnswer`, we could put a `CachePlan()` as the root (see `CacheManager.cacheQuery`) and have a special case for it.
If it's not trivial, we could just leave a TODO for this.
That's a really good idea. I think we could just match on `CachePlan(CollectLimit(...))` and replace it by `GlobalLimit`. Let me go ahead and try this.
I've started working on this and ran into one minor snag: what happens if a user has a DataFrame whose root logical plan is a limit, then calls `.rdd()` on it? In that case, `CollectLimit` will still be the root and we'll need it to have a functioning `doExecute()`.
On reflection, I agree that the current `doExecute()` here is bad because it breaks lineage and laziness. I'll see about switching back to an approach which invokes Exchange directly.
Maybe having a special case for Limit as the root is not a good idea; I think we could always have `GlobalLimit` and call `executeTake(limit)` in that.
As discussed upthread, simply always having `GlobalLimit` would lead to regressions when calling `df.limit(10).collect()`, since it would now have to compute all partitions of `df`, whereas before it might be able to compute only a subset.
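For intuition about that regression, here is a plain-Scala sketch (hypothetical code, not Spark's `executeTake`) of why a take()-style fast path may compute only a subset of partitions, whereas a shuffle-based global limit has to compute every partition of its input first.

```scala
object TakeFastPathDemo extends App {
  // Three "partitions" of a dataset, modeled as plain collections.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
  var partitionsComputed = 0

  // Stand-in for actually running a job on one partition.
  def computePartition(p: Seq[Int]): Seq[Int] = { partitionsComputed += 1; p }

  // take()-style fast path: scan partitions one at a time and stop early.
  def take(n: Int): Seq[Int] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[Int]
    val it = partitions.iterator
    while (buf.size < n && it.hasNext) {
      buf ++= computePartition(it.next()).take(n - buf.size)
    }
    buf.toSeq
  }

  println(take(2))             // two rows, all taken from the first partition
  println(partitionsComputed)  // 1 -- the other partitions were never computed
}
```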
Alright, just pushed an update which I think should fix the laziness problems with `CollectLimit`.
Test build #50837 has finished for PR 7334 at commit
Jenkins, retest this please.
LGTM, pending tests.
Test build #50844 has finished for PR 7334 at commit
Test build #50850 has finished for PR 7334 at commit
This should now be ready for a final sign-off.
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    Exchange.prepareShuffleDependency(child.execute(), child.output, SinglePartition, serializer))
In this case, the plan shown in the Spark UI (no exchange) will not match the actual execution (which has an exchange).
Since the current implementation also adds a ShuffledRowRDD without an Exchange, this is not a regression.
LGTM, merging this into master.
The patch for SPARK-8964 ("use Exchange to perform shuffle in Limit" / #7334) inadvertently broke the planning of the TakeOrderedAndProject operator: because ReturnAnswer was the new root of the query plan, the TakeOrderedAndProject rule was unable to match before BasicOperators. This patch fixes this by moving the `TakeOrderedAndCollect` and `CollectLimit` rules into the same strategy. In addition, I made changes to the TakeOrderedAndProject operator in order to make its `doExecute()` method lazy and added a new TakeOrderedAndProjectSuite which tests the new code path. /cc davies and marmbrus for review. Author: Josh Rosen <[email protected]> Closes #11145 from JoshRosen/take-ordered-and-project-fix.
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it, and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits, so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.

Author: Josh Rosen <[email protected]>

Closes #11121 from JoshRosen/limit-pushdown-2.
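As a rough sketch of the `maxRows` guard described above, here is a self-contained Scala toy model (these case classes and the rule are illustrative stand-ins, not Spark's optimizer API). Bounding each child's row count lets the rule push a per-partition limit beneath a union once and then stop matching.

```scala
// Toy model of the maxRows-guarded pushdown: a LocalLimit above a Union is
// copied beneath the union, but only onto children whose bound is unknown or
// larger than the limit, so the rule stops matching once limits are pushed.
sealed trait Plan { def maxRows: Option[Long] }
case class Relation(name: String) extends Plan {
  val maxRows: Option[Long] = None // row count unknown before execution
}
case class LocalLimit(n: Long, child: Plan) extends Plan {
  val maxRows: Option[Long] = Some(n)
}
case class Union(children: Seq[Plan]) extends Plan {
  // A union produces at most the sum of its children's bounds, if all are known.
  val maxRows: Option[Long] =
    if (children.forall(_.maxRows.isDefined)) Some(children.flatMap(_.maxRows).sum) else None
}

object LimitPushdownDemo extends App {
  // Push a per-partition limit onto a child only if it may exceed the limit.
  def maybePush(n: Long, child: Plan): Plan = child.maxRows match {
    case Some(m) if m <= n => child
    case _                 => LocalLimit(n, child)
  }

  def pushDown(plan: Plan): Plan = plan match {
    case LocalLimit(n, Union(children)) if children.exists(_.maxRows.forall(_ > n)) =>
      LocalLimit(n, Union(children.map(maybePush(n, _))))
    case other => other
  }

  val before = LocalLimit(10, Union(Seq(Relation("a"), Relation("b"))))
  val after  = pushDown(before)
  println(after)                     // limits pushed onto both union children
  println(pushDown(after) == after)  // true: the guard prevents re-applying the rule
}
```

The real rule also handles the outer-join cases listed above, but the stopping condition works the same way.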
… same in Python

## What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.

## How was this patch tested?

Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.

Author: Josh Rosen <[email protected]>

Closes #15068 from JoshRosen/pyspark-collect-limit.

(cherry picked from commit 6d06ff6) Signed-off-by: Davies Liu <[email protected]>
This patch changes the implementation of the physical `Limit` operator so that it relies on the `Exchange` operator to perform data movement rather than directly using `ShuffledRDD`. In addition to improving efficiency, this lays the necessary groundwork for further optimization of limit, such as limit pushdown or whole-stage codegen.

At a high level, this replaces the old physical `Limit` operator with two new operators, `LocalLimit` and `GlobalLimit`. `LocalLimit` performs per-partition limits, while `GlobalLimit` applies the final limit to a single partition; `GlobalLimit` declares that its `requiredInputDistribution` is `SinglePartition`, which will cause the planner to use an `Exchange` to perform the appropriate shuffles. Thus, a logical `Limit` appearing in the middle of a query plan will be expanded into `LocalLimit -> Exchange to one partition -> GlobalLimit`.

In the old code, calling `someDataFrame.limit(100).collect()` or `someDataFrame.take(100)` would actually skip the shuffle and use a fast path which used `executeTake()` in order to avoid computing all partitions in case only a small number of rows were requested. This patch preserves this optimization by treating logical `Limit` operators specially when they appear as the terminal operator in a query plan: if a `Limit` is the final operator, then we will plan a special `CollectLimit` physical operator which implements the old `take()`-based logic.

In order to be able to match on operators only at the root of the query plan, this patch introduces a special `ReturnAnswer` logical operator which functions similarly to `BroadcastHint`: this dummy operator is inserted at the root of the optimized logical plan before invoking the physical planner, allowing the planner to pattern-match on it.