
[SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join #35657

Closed
wants to merge 13 commits

Conversation


@sunchao sunchao commented Feb 25, 2022

What changes were proposed in this pull request?

This PR introduces the initial implementation of Storage-Partitioned Join (SPIP: https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE).

Changes:

  • org.apache.spark.sql.connector.read.partitioning.Partitioning is currently very limited (as mentioned in the SPIP) and cannot be extended to handle join cases. This PR completely replaces it, following the catalyst Partitioning interface, and adds two concrete sub-classes: KeyGroupedPartitioning and UnknownPartitioning. This allows a V2 data source to report its partition transform expressions to Spark via the SupportsReportPartitioning interface.
  • With the above change, org.apache.spark.sql.connector.read.partitioning.Distribution and org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution are now replaced by classes with the same names in the org.apache.spark.sql.connector.distributions package. Therefore, this PR marks the former two as deprecated.
  • DataSourcePartitioning used to live in org.apache.spark.sql.execution.datasources.v2. This PR moves it into the org.apache.spark.sql.catalyst.plans.physical package and renames it to KeyGroupedPartitioning, so that it can be extended to more non-V2 use cases, such as Hive bucketing. In addition, it is changed to accommodate the Storage-Partitioned Join feature.
  • A new expression type, TransformExpression, is introduced to bind syntactic partition transforms to their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in EnsureRequirements to check whether join children are compatible with each other.
  • A new optimizer rule, V2ScanPartitioning, is added to recognize Scans that implement SupportsReportPartitioning. If they do, this rule converts the V2 partition transform expressions into their catalyst counterparts and annotates DataSourceV2ScanRelation with the result. These are later propagated into DataSourceV2ScanExecBase.
  • Changes are made in DataSourceV2ScanExecBase to create a KeyGroupedPartitioning for the scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) all input splits implement HasPartitionKey (see the connector-side sketch after this list).
  • A new config, spark.sql.sources.v2.bucketing.enabled, is introduced to turn the behavior on or off. By default it is false.
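A minimal connector-side sketch of what a source needs to do to participate (not code from this PR; the column name, bucket count, and exact connector signatures are assumptions to be checked against the final API):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
    import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsReportPartitioning}
    import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
    import org.apache.spark.sql.types.StructType

    // Each input split exposes the partition key it holds, so Spark can group splits by key.
    case class KeyedSplit(key: InternalRow) extends InputPartition with HasPartitionKey {
      override def partitionKey(): InternalRow = key
    }

    // A scan that reports its data is already laid out by bucket(32, id), one split per key group.
    class BucketedScan(schema: StructType, splits: Array[KeyedSplit])
        extends Scan with SupportsReportPartitioning {
      override def readSchema(): StructType = schema
      override def outputPartitioning(): Partitioning =
        new KeyGroupedPartitioning(Array[Expression](Expressions.bucket(32, "id")), splits.length)
    }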

Why are the changes needed?

Spark currently supports bucketing in DataSource V1, but not in V2. This is the first step towards supporting bucket join, and its more general form, storage-partitioned join, for V2 data sources. In addition, the work here can potentially be used to support Hive bucketing as well. Please check the SPIP for details.

Does this PR introduce any user-facing change?

With the changes, a user can now:

  • have V2 data sources report distribution and ordering to Spark on the read path
  • Spark will recognize the distribution property and eliminate shuffles in join/aggregate/window, etc., when the source distribution matches the distribution required by these operators
  • a new config, spark.sql.sources.v2.bucketing.enabled, is introduced to turn the above behavior on or off (a usage sketch follows this list)
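For example (a hypothetical session; the catalog and table names are made up, and both tables are assumed to come from a V2 source reporting compatible key-grouped partitioning):

    // Opt in; the behavior is off by default.
    spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")

    // If both sides report the same layout (e.g. bucket(32, id)), the join can be
    // planned without a shuffle on either side.
    val orders    = spark.table("testcat.ns.orders")
    val customers = spark.table("testcat.ns.customers")
    val joined    = orders.join(customers, orders("id") === customers("id"))
    joined.explain()  // expect no Exchange under the join when the layouts match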

How was this patch tested?

  • Added a new test suite, KeyGroupedPartitioningSuite, which covers end-to-end tests of the new feature
  • Extended EnsureRequirementsSuite to cover DataSourcePartitioning
  • Some existing test classes, such as InMemoryTable, are extended to cover the changes


sunchao commented Feb 25, 2022

cc @cloud-fan @viirya @dongjoon-hyun @c21 @rdblue @aokolnychyi while I'm still trying to cover more tests, I think this PR is almost ready for review now, and I'd love to get some feedback from you.

int numPartitions();
Distribution distribution();
Member Author

This is a breaking change. To make it less disruptive, I can introduce a new interface and mark this one as deprecated, although in that case we may need to add a new method to SupportsReportPartitioning or create another interface to replace SupportsReportPartitioning.

Member

How about keeping the methods and marking them as @deprecated?

Member Author

I'm fine with that, although these two methods will no longer be called after this PR, so it could take users by surprise.

"avoid shuffle if necessary.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
Member Author

By default this is false. Previously, when V2 data sources reported DataSourcePartitioning, Spark could potentially eliminate shuffles in aggregation. However, with this config they now have to turn the flag on in order to get the same behavior.

My primary goal is to disable storage-partitioned join by default. So perhaps I can introduce another flag to control the join behavior, use this one to control the aggregate behavior, and set it to true by default.
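A rough sketch of what that split could look like (illustrative only; the second config name is hypothetical and not part of this PR):

    // Existing flag, kept for shuffle elimination in aggregates (default flipped to true in this sketch).
    val V2_BUCKETING_ENABLED = buildConf("spark.sql.sources.v2.bucketing.enabled")
      .doc("Whether to use V2 data source reported partitioning to avoid shuffle if possible.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(true)

    // Hypothetical extra flag gating only the storage-partitioned join behavior.
    val V2_BUCKETING_JOIN_ENABLED = buildConf("spark.sql.sources.v2.bucketing.join.enabled")
      .doc("Whether joins may rely on V2 data source reported partitioning to avoid shuffle.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(false)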

case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
case _: UnspecifiedDistribution => Array.empty[Expression]
val distribution = toCatalystDistribution(write.requiredDistribution(), query) match {
Member

Looks like this is close to supporting organizing data before writing, by repartitioning/sorting with functions defined in FunctionCatalog?

Member Author

Yes, I think this PR helps on the write path too, with required distribution & ordering.


sunchao commented Mar 3, 2022

@@ -117,7 +118,9 @@ case class DataSourceV2Relation(
case class DataSourceV2ScanRelation(
relation: DataSourceV2Relation,
scan: Scan,
output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
output: Seq[AttributeReference],
distribution: Distribution = UnspecifiedDistribution,
Contributor

Hmm, ideally a scan node should report output partitioning, and a parent node such as a join node should report required distribution. This looks a bit counterintuitive.

Member Author

Yeah, I agree with you. What do you think about passing the clustering expressions instead? Something like:

distribution: Option[Seq[Expression]] = None,

Contributor

Can we just use Partitioning here? It can be DataSourceHashPartitioning under the hood.

Member Author

I'm using

clustering: Option[Seq[Expression]] = None

for now. Since DataSourceHashPartitioning is the outputPartitioning we construct inside DataSourceV2ScanExecBase, it isn't available at this point.

Let me know if you have any other thoughts.
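To make the shape concrete, a simplified sketch of the idea (the real classes live in DataSourceV2ScanRelation / DataSourceV2ScanExecBase; the constructor arguments below are assumptions, not the exact final code):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}

    // The logical scan relation carries only the raw clustering expressions; the
    // physical scan node builds the actual partitioning once the per-split keys
    // (from HasPartitionKey) are known.
    def scanOutputPartitioning(
        clustering: Option[Seq[Expression]],
        partitionKeys: Seq[InternalRow]): Partitioning =
      clustering match {
        case Some(exprs) if partitionKeys.nonEmpty =>
          KeyGroupedPartitioning(exprs, partitionKeys.size)
        case _ =>
          UnknownPartitioning(partitionKeys.size)
      }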

@sunchao force-pushed the SPARK-37377-partitioning branch 5 times, most recently from 6660a64 to 2c8e83d on March 15, 2022 at 01:46

sunchao commented Mar 15, 2022

@cloud-fan updated according to the discussion we had offline. Please take another look when you get a chance. Thanks!

* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions (one {@link PartitionReader} outputs data for one
* partition).
* Represents a partitioning where rows are split across partitions based on the expressions
Contributor

... based on the hash value of the "clustering" expressions.

public class HashPartitioning implements Partitioning {
private final Expression[] clustering;
private final int numPartitions;

Contributor

Shall we also include the hash function (a V2 function) in this class? It could be optional, to represent an unknown hash algorithm.

val resolvedRefs = refs.map(r => resolveRef[NamedExpression](r, query))
funCatalogOpt.flatMap { catalog =>
loadV2Function(catalog, "bucket", resolvedRefs).map { bound =>
DataSourceBucketTransformExpression(numBuckets, bound, resolvedRefs)
Contributor

How is this different from DataSourceTransformExpression? I think Hive bucketed tables can also use DataSourceTransformExpression in the future.

Member Author

The only difference is that it carries numBuckets, which is used in the equality check. Yes, Hive bucketing can also extend DataSourceTransformExpression.

Contributor
@cloud-fan commented Mar 16, 2022

numBuckets can be part of the transform function inputs as an int literal. It seems we only need a single class, DataSourceTransform.

Member Author

Do you mean we require the V2 bucket function to accept two inputs? I think it'll work too.

Contributor

AFAIK the bucket transform function supports var-length args: bucket(num_buckets, c1, c2, ...)

Member Author

If we don't want DataSourceBucketTransformExpression, then we'd have to create a wrapper BoundFunction for bucket, such as:

  private case class BucketBoundFunction(
      numBuckets: Int, internal: BoundFunction) extends BoundFunction {
    override def inputTypes(): Array[DataType] = internal.inputTypes()
    override def resultType(): DataType = internal.resultType()
    override def name(): String = internal.name()
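    // canonicalName would need to encode numBuckets as well; there is no clean way to derive that here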
    override def canonicalName(): String = ???
  }

and pass this to DataSourceTransformExpression. The problem is that we can't easily generate a canonicalName here.

Contributor

Sorry, I'm confused. What does the internal BoundFunction mean for a bucket function?

Member Author

It's a function for the bucket transform: bucket(num_buckets, c) (it could be bucket(num_buckets, c1, c2, ...) in the future).

The issue here is that canonicalName for the bucket BoundFunction, for obvious reasons, doesn't consider the value of numBuckets. However, to check whether two bucket transforms are compatible, we need to take that into account. That's why we need the extra DataSourceBucketTransformExpression.

Contributor

Don't we always check if the inputs are the same first, before checking the function identity? numBuckets is an input as well.

Member Author

Yes, this is exactly what we do in DataSourceBucketTransformExpression. The check is done in ShuffleSpec.isCompatibleWith when we have bucket expressions on both sides.
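A simplified sketch of that requirement (illustrative only; parameters are spelled out here instead of reading them off the PR's classes):

    import org.apache.spark.sql.connector.catalog.functions.BoundFunction

    // Two bucket transforms are only compatible if they use a semantically equal
    // function AND the same number of buckets; their child expressions are compared
    // separately in ShuffleSpec.isCompatibleWith.
    def bucketTransformsCompatible(
        leftFunc: BoundFunction, leftNumBuckets: Int,
        rightFunc: BoundFunction, rightNumBuckets: Int): Boolean =
      leftFunc.canonicalName() == rightFunc.canonicalName() && leftNumBuckets == rightNumBuckets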


override def equalsTo(other: TransformExpression): Boolean = other match {
case DataSourceTransformExpression(otherFunction, _) =>
function.canonicalName() == otherFunction.canonicalName()
Contributor

Shall we check the children as well? I don't think f(a, b) and f(x, y) can be considered the same transform.

Member Author

I think we only check whether the two V2 functions are semantically equal here, i.e., given the same input, whether they map to the same output. The comparison of children is done in ShuffleSpec.isCompatibleWith.

Contributor

Ah, then isSameFunction is probably a better name here.


sunchao commented Apr 3, 2022

Thanks @dongjoon-hyun, updated.

@sunchao force-pushed the SPARK-37377-partitioning branch from 74575d4 to de1972b on April 3, 2022 at 22:36
Member
@dongjoon-hyun left a comment

+1, LGTM. (Pending CIs).

To reviewers: this improvement is included in Max's in-progress list, under a different title, on the dev@spark mailing list.

SPARK-37377: Refactor V2 Partitioning interface and remove deprecated usage of Distribution

@dongjoon-hyun
Member

Thank you so much, @sunchao, @cloud-fan, @pan3793. Merged to master.

@sunchao, there is a conflict on branch-3.3. Could you make a backport PR in order to pass CI again on branch-3.3?

I hope we can start more active testing in branch-3.3.


sunchao commented Apr 5, 2022

Thank you so much @cloud-fan, @dongjoon-hyun, @pan3793 and @somani for the review! Especially @cloud-fan for your valuable comments!

@dongjoon-hyun I updated the JIRA title & description to reflect the changes here, and yes, I will create a separate PR for the backport to 3.3.

sunchao added a commit to sunchao/spark that referenced this pull request Apr 5, 2022

Closes apache#35657 from sunchao/SPARK-37377-partitioning.

Lead-authored-by: Chao Sun <[email protected]>
Co-authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Apr 5, 2022
This is a backport of #35657 to `branch-3.3`.

Closes #36068 from sunchao/SPARK-37377-3.3.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
sunchao pushed a commit that referenced this pull request Jun 1, 2022
…an not be translated

### What changes were proposed in this pull request?

After reading the code changes in #35657, I guess the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` is that, for reading, Spark can ignore unrecognized distribution and ordering, but for writing, it should always be strict.

Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if a V2Expression can not be translated, instead of returning an empty Seq.
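A rough sketch of that strict translation (illustrative only; the Option-returning translator is passed in as a parameter here, and the exact signatures in `V2ExpressionUtils` may differ):

    import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, SortOrder => V2SortOrder}

    // Translate every V2 sort order, failing loudly instead of silently dropping
    // orders that cannot be translated (previously an empty Seq could be returned).
    def toCatalystOrdering(
        ordering: Array[V2SortOrder],
        query: LogicalPlan,
        toCatalyst: (V2Expression, LogicalPlan) => Option[Expression]): Seq[SortOrder] =
      ordering.toSeq.map { v2Order =>
        toCatalyst(v2Order, query) match {
          case Some(order: SortOrder) => order
          case _ =>
            throw new IllegalArgumentException(s"Cannot translate sort order to catalyst: $v2Order")
        }
      }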

### Why are the changes needed?

`V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`; the current behavior will break the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see the UT).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

Closes #36697 from pan3793/SPARK-39313.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
sunchao pushed a commit that referenced this pull request Jun 1, 2022

Closes #36697 from pan3793/SPARK-39313.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
case IdentityTransform(ref) =>
Some(resolveRef[NamedExpression](ref, query))
case BucketTransform(numBuckets, refs, sorted)
if sorted.isEmpty && refs.length == 1 && refs.forall(_.isInstanceOf[NamedReference]) =>

What is the sorted.isEmpty requirement for?
