[SPARK-37377][SQL][3.3] Initial implementation of Storage-Partitioned Join #36068

@sunchao sunchao commented Apr 5, 2022

This is a backport of #35657 to `branch-3.3`.

What changes were proposed in this pull request?

This PR introduces the initial implementation of Storage-Partitioned Join ([SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE)).

Changes:

  • `org.apache.spark.sql.connector.read.partitioning.Partitioning` is currently very limited (as mentioned in the SPIP) and cannot be extended to handle join cases. This PR completely replaces it, following the catalyst `Partitioning` interface, and adds two concrete sub-classes: `KeyGroupedPartitioning` and `UnknownPartitioning`. This allows a V2 data source to report its partition transform expressions to Spark via the `SupportsReportPartitioning` interface.
  • With the above change, `org.apache.spark.sql.connector.read.partitioning.Distribution` and `org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution` are now replaced by classes with the same names in the `org.apache.spark.sql.connector.distributions` package. This PR therefore marks the former two as deprecated.
  • `DataSourcePartitioning` used to live in `org.apache.spark.sql.execution.datasources.v2`. This PR moves it into the package `org.apache.spark.sql.catalyst.plans.physical` and renames it to `KeyGroupedPartitioning`, so that it can be extended to more non-V2 use cases, such as Hive bucketing. It is also changed to accommodate the Storage-Partitioned Join feature.
  • A new expression type, `TransformExpression`, is introduced to bind syntactic partition transforms to their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in `EnsureRequirements` to check whether the children of a join are compatible with each other.
  • A new optimizer rule, `V2ScanPartitioning`, is added to recognize `Scan`s that implement `SupportsReportPartitioning`. If they do, the rule converts the V2 partition transform expressions into their catalyst counterparts and annotates `DataSourceV2ScanRelation` with the result. These are later propagated into `DataSourceV2ScanExecBase`.
  • `DataSourceV2ScanExecBase` is changed to create a `KeyGroupedPartitioning` for a scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) all input splits implement `HasPartitionKey`.
  • A new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the behavior on or off. It is `false` by default.
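To illustrate the connector-side contract described above, here is a minimal hedged sketch of a V2 scan reporting a key-grouped partitioning. The class names (`MySplit`, `MyScan`) and the `ts` column are hypothetical; the Spark interfaces (`SupportsReportPartitioning`, `HasPartitionKey`, the new `KeyGroupedPartitioning`) are the ones this PR introduces or relies on:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.types.{StructType, TimestampType}

// Each input split carries the partition key shared by all of its rows,
// which is requirement 2) above for Spark to build a KeyGroupedPartitioning.
case class MySplit(key: InternalRow) extends InputPartition with HasPartitionKey {
  override def partitionKey(): InternalRow = key
}

// The scan reports that its output is grouped by the transform days(ts).
class MyScan(splits: Array[MySplit]) extends Scan with SupportsReportPartitioning {
  override def readSchema(): StructType = new StructType().add("ts", TimestampType)

  override def outputPartitioning(): Partitioning =
    new KeyGroupedPartitioning(Array[Expression](Expressions.days("ts")), splits.length)
}
```

With this in place, the `V2ScanPartitioning` rule can translate `days(ts)` into a catalyst `TransformExpression` and attach it to the scan relation.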

Why are the changes needed?

Spark currently supports bucketing in DataSource V1, but not in V2. This is the first step toward supporting bucketed joins, and their general form, storage-partitioned joins, for V2 data sources. In addition, the work here can potentially be used to support Hive bucketing as well. Please check the SPIP for details.

Does this PR introduce any user-facing change?

With the changes, a user can now:

  • have V2 data sources report distribution and ordering to Spark on the read path
  • have Spark recognize the distribution property and eliminate shuffles in joins/aggregates/windows, etc., when the source distribution matches the distribution required by these operators
  • use the new config `spark.sql.sources.v2.bucketing.enabled` to turn the above behavior on or off
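A hedged usage sketch of the user-facing behavior: with the flag enabled, a join between two V2 tables that report compatible key-grouped partitionings should plan without a shuffle on either side. The catalog and table names below are made up for illustration; only the config key comes from this PR:

```scala
// Off by default in 3.3; opt in explicitly.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")

val joined = spark.table("catalog.db.orders")
  .join(spark.table("catalog.db.deliveries"), Seq("order_id"))

// Inspect the physical plan: no Exchange node should appear above the two
// scans when their reported partitionings are compatible.
joined.explain()
```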

How was this patch tested?

  • Added a new test suite, `KeyGroupedPartitioningSuite`, covering end-to-end tests of the new feature
  • Extended `EnsureRequirementsSuite` to cover `KeyGroupedPartitioning`
  • Extended some existing test classes, such as `InMemoryTable`, to cover the changes


Closes apache#35657 from sunchao/SPARK-37377-partitioning.

Lead-authored-by: Chao Sun <[email protected]>
Co-authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending CI)

@dongjoon-hyun
Merged to branch-3.3 for Apache Spark 3.3.0.

cc @MaxGekk .

dongjoon-hyun pushed a commit that referenced this pull request Apr 5, 2022
Closes #36068 from sunchao/SPARK-37377-3.3.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>