-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-10731][SQL] Delegate to Scala's DataFrame.take implementation in Python DataFrame. #8876
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…in Python DataFrame.
Test build #42876 has finished for PR 8876 at commit
|
Test build #42891 has finished for PR 8876 at commit
|
Test build #1793 has finished for PR 8876 at commit
|
LGTM |
Test build #42920 has finished for PR 8876 at commit
|
asfgit
pushed a commit
that referenced
this pull request
Sep 23, 2015
… in Python DataFrame. Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take). This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion. Author: Reynold Xin <[email protected]> Closes #8876 from rxin/SPARK-10731. (cherry picked from commit 9952217) Signed-off-by: Reynold Xin <[email protected]>
asfgit
pushed a commit
that referenced
this pull request
Sep 14, 2016
… same in Python ## What changes were proposed in this pull request? In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing. The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.). In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations. ## How was this patch tested? Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries. Author: Josh Rosen <[email protected]> Closes #15068 from JoshRosen/pyspark-collect-limit. (cherry picked from commit 6d06ff6) Signed-off-by: Davies Liu <[email protected]>
asfgit
pushed a commit
that referenced
this pull request
Sep 14, 2016
… same in Python ## What changes were proposed in this pull request? In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing. The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.). In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations. ## How was this patch tested? Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries. Author: Josh Rosen <[email protected]> Closes #15068 from JoshRosen/pyspark-collect-limit.
wgtmac
pushed a commit
to wgtmac/spark
that referenced
this pull request
Sep 19, 2016
… same in Python ## What changes were proposed in this pull request? In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing. The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in apache#7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.). In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / apache#8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations. ## How was this patch tested? Added a regression test in `sql/tests.py` which asserts that the expected number of jobs, stages, and tasks are run for both queries. Author: Josh Rosen <[email protected]> Closes apache#15068 from JoshRosen/pyspark-collect-limit.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).
This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.