Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10731][SQL] Delegate to Scala's DataFrame.take implementation in Python DataFrame. #8876

Closed
wants to merge 3 commits into from

Conversation

rxin
Copy link
Contributor

@rxin rxin commented Sep 23, 2015

Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).

This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #42876 has finished for PR 8876 at commit 778c2fd.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #42891 has finished for PR 8876 at commit 267ffa7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #1793 has finished for PR 8876 at commit 267ffa7.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor

davies commented Sep 23, 2015

LGTM

@SparkQA
Copy link

SparkQA commented Sep 23, 2015

Test build #42920 has finished for PR 8876 at commit c52ff7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Sep 23, 2015
… in Python DataFrame.

Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).

This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.

Author: Reynold Xin <[email protected]>

Closes #8876 from rxin/SPARK-10731.

(cherry picked from commit 9952217)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in 9952217 Sep 23, 2015
asfgit pushed a commit that referenced this pull request Sep 14, 2016
… same in Python

## What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.

## How was this patch tested?

Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.

Author: Josh Rosen <[email protected]>

Closes #15068 from JoshRosen/pyspark-collect-limit.

(cherry picked from commit 6d06ff6)
Signed-off-by: Davies Liu <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 14, 2016
… same in Python

## What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.

## How was this patch tested?

Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.

Author: Josh Rosen <[email protected]>

Closes #15068 from JoshRosen/pyspark-collect-limit.
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
… same in Python

## What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in apache#7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / apache#8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.

## How was this patch tested?

Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries.

Author: Josh Rosen <[email protected]>

Closes apache#15068 from JoshRosen/pyspark-collect-limit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants