[SPARK-37377][SQL][3.3] Initial implementation of Storage-Partitioned Join #36068
Closed
This is a backport of #35657 to `branch-3.3`.

### What changes were proposed in this pull request?

This PR introduces the initial implementation of Storage-Partitioned Join ([SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE)). Changes:

- `org.apache.spark.sql.connector.read.partitioning.Partitioning` is currently very limited (as mentioned in the SPIP) and cannot be extended to handle join cases. This PR completely replaces it, following the catalyst `Partitioning` interface, and adds two concrete sub-classes: `KeyGroupedPartitioning` and `UnknownPartitioning`. This allows a V2 data source to report its partition transform expressions to Spark via the `SupportsReportPartitioning` interface.
- With the above change, `org.apache.spark.sql.connector.read.partitioning.Distribution` and `org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution` are now replaced by classes with the same names in the `org.apache.spark.sql.connector.distributions` package. Therefore, this PR marks the former two as deprecated.
- `DataSourcePartitioning` used to live in `org.apache.spark.sql.execution.datasources.v2`. This PR moves it into the package `org.apache.spark.sql.catalyst.plans.physical` and renames it to `KeyGroupedPartitioning`, so that it can be extended to more non-V2 use cases, such as Hive bucketing. In addition, it is changed to accommodate the Storage-Partitioned Join feature.
- A new expression type, `TransformExpression`, is introduced to bind syntactic partition transforms to their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in `EnsureRequirements` to check whether join children are compatible with each other.
- A new optimizer rule, `V2ScanPartitioning`, is added to recognize `Scan`s that implement `SupportsReportPartitioning`. If they do, the rule converts V2 partition transform expressions into their catalyst counterparts and annotates `DataSourceV2ScanRelation` with the result. These are later propagated into `DataSourceV2ScanExecBase`.
- Changes are made in `DataSourceV2ScanExecBase` to create a `KeyGroupedPartitioning` for a scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) all input splits implement `HasPartitionKey`.
- A new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the behavior on or off. By default it is false.

### Why are the changes needed?

Spark currently supports bucketing in DataSource V1, but not in V2. This is the first step toward supporting bucketed joins, and their general form, storage-partitioned joins, for V2 data sources. In addition, the work here can potentially be used to support Hive bucketing as well. Please check the SPIP for details.

### Does this PR introduce _any_ user-facing change?

With the changes, a user can now:

- have V2 data sources report distribution and ordering to Spark on the read path
- have Spark recognize the distribution property and eliminate shuffles in joins, aggregates, windows, etc., when the source distribution matches the distribution these operators require
- use the new config `spark.sql.sources.v2.bucketing.enabled` to turn the above behavior on or off

### How was this patch tested?

- Added a new test suite, `KeyGroupedPartitioningSuite`, covering end-to-end tests of the new feature
- Extended `EnsureRequirementsSuite` to cover `KeyGroupedPartitioning`
- Some existing test classes, such as `InMemoryTable`, are extended to cover the changes

Closes apache#35657 from sunchao/SPARK-37377-partitioning. Lead-authored-by: Chao Sun <[email protected]> Co-authored-by: Chao Sun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
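The idea behind eliminating the shuffle can be sketched in plain Python — a toy model, not Spark's API; the helper names (`key_grouped`, `storage_partitioned_join`) are hypothetical and chosen only to mirror the concepts above:

```python
def key_grouped(rows, key):
    """Group rows into partitions by a key column, analogous in spirit to
    a source reporting a key-grouped partitioning: one partition per
    distinct partition-key value."""
    parts = {}
    for row in rows:
        parts.setdefault(row[key], []).append(row)
    return parts

def storage_partitioned_join(left_parts, right_parts):
    """When both sides are grouped by the same key, the join can pair up
    matching partitions and run locally -- no repartition (shuffle) step."""
    out = []
    for k in left_parts.keys() & right_parts.keys():
        for l in left_parts[k]:
            for r in right_parts[k]:
                out.append({**l, **r})
    return out

orders = [{"cust": 1, "amt": 10}, {"cust": 2, "amt": 20}]
custs = [{"cust": 1, "name": "a"}, {"cust": 3, "name": "c"}]

joined = storage_partitioned_join(key_grouped(orders, "cust"),
                                  key_grouped(custs, "cust"))
print(joined)  # [{'cust': 1, 'amt': 10, 'name': 'a'}]
```

A hash join without this property would first have to redistribute both inputs by the join key; when the sources already report compatible key-grouped partitionings, that step is provably redundant, which is what `EnsureRequirements` exploits.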
dongjoon-hyun approved these changes on Apr 5, 2022:
+1, LGTM (Pending CI)
Merged to branch-3.3 for Apache Spark 3.3.0. cc @MaxGekk.
dongjoon-hyun pushed a commit that referenced this pull request on Apr 5, 2022.