
[SPARK-8964] [SQL] Use Exchange to perform shuffle in Limit #7334

Closed
JoshRosen wants to merge 20 commits into apache:master from JoshRosen:remove-copy-in-limit

Conversation

JoshRosen
Contributor

This patch changes the implementation of the physical Limit operator so that it relies on the Exchange operator to perform data movement rather than directly using ShuffledRDD. In addition to improving efficiency, this lays the necessary groundwork for further optimization of limit, such as limit pushdown or whole-stage codegen.

At a high level, this replaces the old physical Limit operator with two new operators, LocalLimit and GlobalLimit. LocalLimit performs per-partition limits, while GlobalLimit applies the final limit to a single partition; GlobalLimit declares that its requiredInputDistribution is SinglePartition, which causes the planner to use an Exchange to perform the appropriate shuffle. Thus, a logical Limit appearing in the middle of a query plan will be expanded into LocalLimit -> Exchange to one partition -> GlobalLimit.
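
Roughly sketched (excerpt-style against Spark's internal SparkPlan API; signatures are simplified and details such as the exact distribution type may differ from the merged code), the two operators look like this:

// Per-partition limit: no data movement, just truncate each partition's iterator.
case class LocalLimit(limit: Int, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  protected override def doExecute(): RDD[InternalRow] =
    child.execute().mapPartitions(_.take(limit))
}

// Final limit: requiring all rows in a single partition makes the planner
// insert an Exchange below this operator, and that Exchange performs the shuffle.
case class GlobalLimit(limit: Int, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def requiredChildDistribution: Seq[Distribution] = AllTuples :: Nil
  protected override def doExecute(): RDD[InternalRow] =
    child.execute().mapPartitions(_.take(limit))
}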

In the old code, calling someDataFrame.limit(100).collect() or someDataFrame.take(100) would actually skip the shuffle and use a fast path based on executeTake(), in order to avoid computing all partitions when only a small number of rows were requested. This patch preserves this optimization by treating logical Limit operators specially when they appear as the terminal operator in a query plan: if a Limit is the final operator, then we will plan a special CollectLimit physical operator which implements the old take()-based logic.

In order to be able to match on operators only at the root of the query plan, this patch introduces a special ReturnAnswer logical operator which functions similarly to BroadcastHint: this dummy operator is inserted at the root of the optimized logical plan before invoking the physical planner, allowing the planner to pattern-match on it.
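
Sketched as a planner strategy (again excerpt-style, as if written inside SparkStrategies so that planLater and the execution-side operators are in scope; the exact match arms in the merged patch may differ):

object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // ReturnAnswer only ever wraps the root, so this arm fires only when the
    // logical Limit is the final operator: use the take()-based fast path.
    case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), child)) =>
      execution.CollectLimit(limit, planLater(child)) :: Nil
    // A Limit anywhere else in the plan becomes LocalLimit -> Exchange -> GlobalLimit.
    case logical.Limit(IntegerLiteral(limit), child) =>
      execution.GlobalLimit(limit, execution.LocalLimit(limit, planLater(child))) :: Nil
    case _ => Nil
  }
}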

@JoshRosen JoshRosen changed the title [SPARK-8964] Use Exchange to perform shuffle in Limit [SPARK-8964] [SQL] Use Exchange to perform shuffle in Limit Jul 9, 2015
@JoshRosen
Contributor Author

Ping @rxin, since this is addressing an old TODO that you added.

@JoshRosen
Contributor Author

One question that I have: will the change here introduce a performance regression in cases where Limit is the final operator in the plan? I think that we may want to continue to do the executeTake() for those cases, which may require us to use a different physical plan for Limit in cases where it's the terminal operator.

override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition

override def executeCollect(): Array[Row] = child.executeTake(limit)
Contributor Author

Whoops, forgot to remove this. This should probably be split to a separate operator in order to handle the cases where Limit is the final operator.

Contributor Author

I looked at the code for executeTake and it looks like it's also doing some potentially-unnecessary copying and inefficient serialization of result rows (AFAIK it's not using SqlSerializer2).

Contributor

Yes, this will call submitJob, so we probably don't have an opportunity to use SqlSerializer2; the serialization will eventually be done by SparkEnv.serializer, if I understand correctly.
At the very least, we'd better do it in a separate PR.

Contributor Author

I agree. I was planning to submit separate JIRA + pull request to address this for take/collect. I originally thought that we might want that pull request to go in ahead of this one but I now think that they're fairly distinct and don't need to block each other.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #36977 has finished for PR 7334 at commit dfe6ff1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaRDD(RDD):
    • class KafkaDStream(DStream):
    • class KafkaTransformedDStream(TransformedDStream):
    • class GenericInternalRowWithSchema(values: Array[Any], override val schema: StructType)
    • case class PartitionLocalLimit(limit: Int, child: SparkPlan)
    • case class StreamInputInfo(

@JoshRosen JoshRosen changed the title [SPARK-8964] [SQL] Use Exchange to perform shuffle in Limit [SPARK-8964] [SQL] [WIP] Use Exchange to perform shuffle in Limit Jul 10, 2015
@JoshRosen
Contributor Author

I think that this patch should be revisited after refactoring take() and collect() to use the efficient SQL serializers.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

(Testing a pull request builder configuration change...)

@SparkQA

SparkQA commented Jul 13, 2015

Test build #37102 has finished for PR 7334 at commit dfe6ff1.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitionLocalLimit(limit: Int, child: SparkPlan)

@@ -332,8 +332,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
LocalTableScan(output, data) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.Limit(limit, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) => {
Contributor

Nit: Unnecessary { }

@SparkQA

SparkQA commented Jul 13, 2015

Test build #37160 has finished for PR 7334 at commit c02324c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Limit(global: Boolean, limit: Int, child: SparkPlan)

@JoshRosen
Contributor Author

Alright, I've updated this to allow the planner to insert the Exchange. We still can't merge this as-is, though, since it will introduce a bad regression for the case where Limit is the final operator in a physical plan. AFAIK there's no easy way to write optimizer rules which reference the parent operator, so it's hard to write an optimizer rule to recognize this pattern and use a separate Limit implementation.

@SparkQA

SparkQA commented Jul 14, 2015

Test build #37164 has finished for PR 7334 at commit 70f69b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Limit(global: Boolean, limit: Int, child: SparkPlan)

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37432 has finished for PR 7334 at commit cc63456.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReturnAnswer(child: LogicalPlan) extends UnaryNode
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode
    • case class Limit(global: Boolean, limit: Int, child: SparkPlan) extends UnaryNode

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37433 has finished for PR 7334 at commit 7dbb28e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReturnAnswer(child: LogicalPlan) extends UnaryNode
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode
    • case class Limit(global: Boolean, limit: Int, child: SparkPlan) extends UnaryNode

@chenghao-intel
Contributor

@JoshRosen, it seems execution.CollectLimit will eventually invoke code like the following (in SparkPlan.executeTake):

sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, 
  p,  allowLocal = false)

I am wondering whether
execution.CollectLimit(limit, planLater(child))
vs.
execution.Limit(global = true, limit, execution.Limit(global = false, limit, child))
are actually equivalent in terms of data shuffling / copying; if so, we could probably simplify the code by removing CollectLimit and ReturnAnswer.

Sorry if I missed something.

@JoshRosen
Contributor Author

@chenghao-intel:

They are not equivalent in terms of data shuffling, which is why I introduced CollectLimit. In the old code, calling executeCollect on a Limit operator would end up running a code path which skips the shuffle after the per-partition limits and uses executeTake to try to only compute as many partitions as are necessary to answer the query. execution.Limit(global = true, limit, execution.Limit(global=false, limit, child)), on the other hand, will end up involving a shuffle, so we'll end up calling executeTake on an RDD with a single partition, preventing it from being able to do less work.
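
For reference, the take()-style fast path being preserved here can be sketched roughly as follows (simplified from SparkPlan.executeTake; the batch-growth policy shown is illustrative):

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Run jobs over progressively larger batches of partitions, stopping as soon
// as `n` rows have been collected, so unneeded partitions are never computed.
def takeIncrementally(rdd: RDD[InternalRow], n: Int): Array[InternalRow] = {
  val buf = new ArrayBuffer[InternalRow]
  val totalParts = rdd.partitions.length
  var partsScanned = 0
  while (buf.size < n && partsScanned < totalParts) {
    // Try one partition first, then grow the batch if we still need more rows.
    val numPartsToTry = if (partsScanned == 0) 1 else partsScanned * 4
    val partitions = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    val left = n - buf.size
    val results = rdd.sparkContext.runJob(
      rdd, (it: Iterator[InternalRow]) => it.take(left).toArray, partitions)
    results.foreach(buf ++= _)
    partsScanned += partitions.size
  }
  buf.take(n).toArray
}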

@JoshRosen
Contributor Author

There's one problem with the planner changes here: during logical + physical planning we don't know in advance whether executeTake() is going to be called. If I were to call .rdd on a query plan with a limit as the final operator, then I would get an error.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 4, 2016

Test build #50730 has finished for PR 7334 at commit 55e27af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

lazy val sparkPlan: SparkPlan = {
SQLContext.setActive(sqlContext)
sqlContext.planner.plan(optimizedPlan).next()
sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
Contributor

cc @marmbrus here to make sure it is safe

Contributor

Yeah, this looks good to me.

@SparkQA

SparkQA commented Feb 4, 2016

Test build #2514 has finished for PR 7334 at commit 55e27af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Feb 4, 2016

cc @davies for a more detailed review

override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
Contributor

This should not be called; should we throw UnsupportedOperationException?

Contributor Author

This actually can get called if you call df.limit(10).cache().

Contributor

For this case, should we use GlobalLimit instead?

Otherwise it will waste some time collecting rows to the driver and then sending them back to the executors.

Contributor Author

This is actually a tricky problem to address. If we want to actually plan GlobalLimit here instead then we need to be able to distinguish between the collect-to-driver and cache cases earlier in the planning process, which I think is going to be a large and non-trivial change.

If we want to avoid collecting back to the driver and re-broadcasting then I suppose we can do something similar to what the old Limit operator's doExecute did. The problem with the old doExecute() was that it involved a lot of inefficient copying and serialization. Instead, it might be worth seeing whether the inner logic of the Exchange operator could be called from CollectLimit.doExecute.

Contributor

Similar to ReturnAnswer, we could put a CachePlan() as the root (See CacheManager.cacheQuery), and have a special case for it.

If it's not trivial, we could just leave a TODO for this.

Contributor Author

That's a really good idea. I think we could just match on CachePlan(CollectLimit(...)) and replace it by GlobalLimit. Let me go ahead and try this.

Contributor Author

I've started working on this and ran into one minor snag: what happens if a user has a DataFrame whose root logical plan is an RDD, then calls .rdd() on it? In that case, CollectLimit will still be the root and we'll need it to have a functioning doExecute().

On reflection, I agree that the current doExecute() here is bad because it breaks lineage and laziness. I'll see about switching back to an approach which invokes Exchange directly.

Contributor

Maybe having a special case for Limit as root is not a good idea; I think we could always have GlobalLimit and call executeTake(limit) in it.

Contributor Author

As discussed upthread, simply always having GlobalLimit would lead to regressions when calling df.limit(10).collect(), since it would now have to compute all partitions of df whereas before it might be able to compute only a subset.

@JoshRosen
Copy link
Contributor Author

Alright, just pushed an update which I think should fix the laziness problems with .rdd and .cache. The key idea is to move the ReturnAnswer injection into DataFrame.collect() rather than always putting it at the root of the logical plan before invoking the planner.
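
Roughly, the idea is the following (the helper below is illustrative, not DataFrame's actual internals):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}

// Only collect()-style entry points wrap the optimized plan in ReturnAnswer;
// .rdd and .cache() plan the bare logical plan, so they never match the
// CollectLimit fast path and keep a fully lazy, shuffle-based plan.
def planRootFor(optimized: LogicalPlan, forCollect: Boolean): LogicalPlan =
  if (forCollect) ReturnAnswer(optimized) else optimized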

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50837 has finished for PR 7334 at commit c4b0a53.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@davies
Contributor

davies commented Feb 5, 2016

LGTM, pending tests.

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50844 has finished for PR 7334 at commit c4b0a53.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 6, 2016

Test build #50850 has finished for PR 7334 at commit b8c9e47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

This should now be ready for a final sign-off.

private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    Exchange.prepareShuffleDependency(child.execute(), child.output, SinglePartition, serializer))
Contributor

In this case, the plan shown in the Spark UI (no exchange) will not match the actual execution (which has an exchange).

Contributor

Since the current implementation also adds a ShuffledRowRDD without an Exchange, this is not a regression.

@davies
Contributor

davies commented Feb 8, 2016

LGTM, merging this into master.

@asfgit asfgit closed this in 06f0df6 Feb 8, 2016
@JoshRosen JoshRosen deleted the remove-copy-in-limit branch February 8, 2016 19:56
asfgit pushed a commit that referenced this pull request Feb 10, 2016
The patch for SPARK-8964 ("use Exchange to perform shuffle in Limit" / #7334) inadvertently broke the planning of the TakeOrderedAndProject operator: because ReturnAnswer was the new root of the query plan, the TakeOrderedAndProject rule was unable to match before BasicOperators.

This patch fixes this by moving the `TakeOrderedAndCollect` and `CollectLimit` rules into the same strategy.

In addition, I made changes to the TakeOrderedAndProject operator in order to make its `doExecute()` method lazy and added a new TakeOrderedAndProjectSuite which tests the new code path.

/cc davies and marmbrus for review.

Author: Josh Rosen <[email protected]>

Closes #11145 from JoshRosen/take-ordered-and-project-fix.
asfgit pushed a commit that referenced this pull request Feb 15, 2016
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.

Author: Josh Rosen <[email protected]>

Closes #11121 from JoshRosen/limit-pushdown-2.
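
A hedged sketch of the maxRows guard described in the commit above, showing only the UNION ALL case and assuming maxRows is available on the logical plan (the actual LimitPushDown rule covers more cases, strips any existing limits before re-wrapping, and may differ in details):

import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

object LimitPushDownSketch extends Rule[LogicalPlan] {

  // Push a LocalLimit onto `child` only if we cannot prove it already produces
  // at most `limit` rows; this keeps the rule from firing repeatedly.
  private def maybePushLocalLimit(
      limitExpr: Expression, limit: Long, child: LogicalPlan): LogicalPlan =
    child.maxRows match {
      case Some(rows) if rows <= limit => child
      case _ => LocalLimit(limitExpr, child)
    }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Limit on top of UNION ALL: push a per-partition limit onto each child.
    case LocalLimit(limitExpr @ IntegerLiteral(limit), union: Union) =>
      LocalLimit(limitExpr,
        union.withNewChildren(union.children.map(maybePushLocalLimit(limitExpr, limit, _))))
  }
}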
asfgit pushed a commit that referenced this pull request Sep 14, 2016
… same in Python

## What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.

## How was this patch tested?

Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.

Author: Josh Rosen <[email protected]>

Closes #15068 from JoshRosen/pyspark-collect-limit.

(cherry picked from commit 6d06ff6)
Signed-off-by: Davies Liu <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 14, 2016
… same in Python

wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
… same in Python
