[SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join #35657
Conversation
cc @cloud-fan @viirya @dongjoon-hyun @c21 @rdblue @aokolnychyi. While I'm still trying to cover more tests, I think this PR is almost ready for review now, and I'd love to get some feedback from you.
*/
int numPartitions();
Distribution distribution();
This is a breaking change. To make it less disruptive, I can introduce a new interface and mark this as deprecated, although in that way we may need to add a new method in `SupportsReportPartitioning`, or create another interface to replace `SupportsReportPartitioning`.
How about keeping the methods and marking them as `@deprecated`?
I'm fine with that, although these two methods will no longer be called after this PR, so it could catch users by surprise.
"avoid shuffle if necessary.") | ||
.version("3.3.0") | ||
.booleanConf | ||
.createWithDefault(false) |
By default this is false. Previously, when V2 data sources reported `DataSourcePartitioning`, Spark could potentially eliminate the shuffle in aggregation. However, with this config they now have to turn this flag on in order to get the same behavior.
My primary goal is to disable storage-partitioned join by default. So perhaps I can introduce another flag to control the join behavior and use this to control the aggregate behavior, and set it to true by default.
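For illustration, a minimal usage sketch of opting in (assuming an existing `SparkSession` named `spark`):

```scala
// Opt in explicitly; per the snippet above, the flag defaults to false.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
```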
case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
case _: UnspecifiedDistribution => Array.empty[Expression]
val distribution = toCatalystDistribution(write.requiredDistribution(), query) match {
Looks like it's close to supporting organizing data before writing, via repartition/sort by functions defined in `FunctionCatalog`?
Yes, I think this PR also helps on the write path with required distribution & ordering.
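For reference, a minimal sketch of that write-path contract using the existing `RequiresDistributionAndOrdering` API (the class name `MyWrite` and the column `id` are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

// A V2 write that asks Spark to cluster incoming rows by a bucket transform before writing.
class MyWrite extends Write with RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.bucket(8, "id")))
  override def requiredOrdering(): Array[SortOrder] = Array.empty
}
```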
Gently ping @cloud-fan @viirya @dongjoon-hyun @c21 @rdblue @aokolnychyi
Resolved review threads (outdated):
...st/src/main/java/org/apache/spark/sql/connector/read/partitioning/ClusteredDistribution.java
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Distribution.java
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala (two threads)
@@ -117,7 +118,9 @@ case class DataSourceV2Relation(
case class DataSourceV2ScanRelation(
    relation: DataSourceV2Relation,
    scan: Scan,
    output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
    output: Seq[AttributeReference],
    distribution: Distribution = UnspecifiedDistribution,
Hmm, ideally a scan node should report its output partitioning, while a parent node such as a join node reports the required distribution. This looks a bit counterintuitive.
Yea, I agree with you. What do you think if we pass the clustering expressions instead? Something like:
distribution: Option[Seq[Expression]] = None,
Can we just use `Partitioning` here? It can be `DataSourceHashPartitioning` under the hood.
I'm using `clustering: Option[Seq[Expression]] = None` for now, since `DataSourceHashPartitioning` is the `outputPartitioning` we construct inside `DataSourceV2ScanExecBase`, so it isn't available at this point. Let me know if you have any other thoughts.
@cloud-fan updated according to the discussion we had offline. Please take another look when you get a chance. Thanks!
Resolved review thread (outdated): core/src/main/scala/org/apache/spark/util/collection/Utils.scala
* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions (one {@link PartitionReader} outputs data for one
* partition).
* Represents a partitioning where rows are split across partitions based on the expressions
... based on the hash value of the "clustering" expressions.
public class HashPartitioning implements Partitioning {
  private final Expression[] clustering;
  private final int numPartitions;
Shall we also include the hash function (a V2 function) in this class? It can be optional, to represent an unknown hash algorithm.
Resolved review threads (outdated):
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/partitioning/Partitioning.java
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/transformExpressions.scala
val resolvedRefs = refs.map(r => resolveRef[NamedExpression](r, query))
funCatalogOpt.flatMap { catalog =>
  loadV2Function(catalog, "bucket", resolvedRefs).map { bound =>
    DataSourceBucketTransformExpression(numBuckets, bound, resolvedRefs)
How is it different from `DataSourceTransformExpression`? I think Hive bucketed tables can also use `DataSourceTransformExpression` in the future.
The only difference is that it carries `numBuckets` itself, which is used in the equality check. Yes, Hive bucketing can also extend `DataSourceTransformExpression`.
`numBuckets` can be part of the transform function inputs as an int literal. It seems we only need a single class, `DataSourceTransform`.
Do you mean we require the V2 `bucket` function to accept two inputs? I think that would work too.
AFAIK the `bucket` transform function supports variable-length args: `bucket(num_buckets, c1, c2, ...)`.
If we don't want `DataSourceBucketTransformExpression`, then we'd have to create a wrapper `BoundFunction` for bucket, such as:
private case class BucketBoundFunction(
    numBuckets: Int, internal: BoundFunction) extends BoundFunction {
  override def inputTypes(): Array[DataType] = internal.inputTypes()
  override def resultType(): DataType = internal.resultType()
  override def name(): String = internal.name()
  override def canonicalName(): String = ???
}
and pass this to `DataSourceTransformExpression`. The problem is we can't easily generate a `canonicalName` here.
Sorry, I'm confused. What does the internal `BoundFunction` mean for a bucket function?
It's a function for the bucket transform: `bucket(num_buckets, c)` (it could be `bucket(num_buckets, c1, c2, ...)` in the future). The issue here is that `canonicalName` for the bucket `BoundFunction`, for obvious reasons, doesn't consider the value of `numBuckets`. However, to check whether two bucket transforms are compatible, we need to take that into account. That's why we need the extra `DataSourceBucketTransformExpression`.
Don't we always check if the inputs are the same first, before checking the function identity? The number of buckets is an input as well.
Yes, this is exactly what we do in `DataSourceBucketTransformExpression`. The check is done in `ShuffleSpec.isCompatibleWith` when we have bucket expressions from both sides.
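To make the discussion concrete, here is a minimal, illustrative sketch (not code from this PR) of a connector-provided V2 `bucket` function, where `num_buckets` arrives as the first int-literal input and `canonicalName` does not encode it:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
import org.apache.spark.sql.types.{DataType, DataTypes}

object BucketFunction extends ScalarFunction[Int] {
  override def inputTypes(): Array[DataType] = Array(DataTypes.IntegerType, DataTypes.LongType)
  override def resultType(): DataType = DataTypes.IntegerType
  override def name(): String = "bucket"
  // Identical for bucket(4, c) and bucket(8, c), hence the inputs must also be compared.
  override def canonicalName(): String = "test.bucket"
  override def produceResult(input: InternalRow): Int = {
    val numBuckets = input.getInt(0)       // num_buckets passed as an int literal argument
    (input.getLong(1) % numBuckets).toInt  // simplified bucketing of the column value
  }
}
```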
override def equalsTo(other: TransformExpression): Boolean = other match {
  case DataSourceTransformExpression(otherFunction, _) =>
    function.canonicalName() == otherFunction.canonicalName()
Shall we check the `children` as well? I don't think `f(a, b)` and `f(x, y)` can be considered the same transform.
I think we only check whether the two V2 functions are semantically equal here, i.e., given the same input, whether they map to the same output. The comparison on children is done in `ShuffleSpec.isCompatibleWith`.
Ah, then `isSameFunction` is probably a better name here.
Resolved review threads:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala (outdated)
Thanks @dongjoon-hyun, updated.
+1, LGTM (pending CIs).
To reviewers: this improvement is included in Max's in-progress list, under a different title, on the dev@spark mailing list:
SPARK-37377: Refactor V2 Partitioning interface and remove deprecated usage of Distribution
Thank you so much, @sunchao, @cloud-fan, @pan3793. Merged to master. @sunchao, there is a conflict on branch-3.3. Could you make a backporting PR in order to pass CI again on branch-3.3? I hope we can start more active testing in
Thank you so much @cloud-fan @dongjoon-hyun and @pan3793 @somani for the review! Especially @cloud-fan for your valuable comments! @dongjoon-hyun I updated the JIRA title & description to reflect the changes here, and yes, I will create a separate PR for the backport to 3.3.
Closes apache#35657 from sunchao/SPARK-37377-partitioning. Lead-authored-by: Chao Sun <[email protected]> Co-authored-by: Chao Sun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
… Join. This is a backport of #35657 to `branch-3.3`. Closes #36068 from sunchao/SPARK-37377-3.3. Authored-by: Chao Sun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…an not be translated
### What changes were proposed in this pull request?
After reading the code changes in #35657, I guess the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` is that, for reading, Spark can ignore unrecognized distributions and orderings, but for writing, it should always be strict. Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if a V2Expression cannot be translated, instead of returning an empty Seq.
### Why are the changes needed?
`V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`; the current behavior will break the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see UT).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT.
Closes #36697 from pan3793/SPARK-39313. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Chao Sun <[email protected]>
case IdentityTransform(ref) =>
  Some(resolveRef[NamedExpression](ref, query))
case BucketTransform(numBuckets, refs, sorted)
    if sorted.isEmpty && refs.length == 1 && refs.forall(_.isInstanceOf[NamedReference]) =>
What is the `sorted.isEmpty` requirement for?
What changes were proposed in this pull request?
This PR introduces the initial implementation of Storage-Partitioned Join ([SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE)).
Changes:
- `org.apache.spark.sql.connector.read.partitioning.Partitioning` currently is very limited (as mentioned in the SPIP) and cannot be extended to handle join cases. This PR completely replaces it, following the catalyst `Partitioning` interface, and adds two concrete sub-classes: `KeyGroupedPartitioning` and `UnknownPartitioning`. This allows a V2 data source to report its partition transform expressions to Spark via the `SupportsReportPartitioning` interface (see the sketch after this list).
- With the above change, `org.apache.spark.sql.connector.read.partitioning.Distribution` and `org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution` are now replaced by classes with the same names in the `org.apache.spark.sql.connector.distributions` package. Therefore, this PR marks the former two as deprecated.
- `DataSourcePartitioning` used to be in `org.apache.spark.sql.execution.datasources.v2`. This PR moves it into the package `org.apache.spark.sql.catalyst.plans.physical` and renames it to `KeyGroupedPartitioning`, so that it can be extended to more non-V2 use cases, such as Hive bucketing. In addition, it is also changed to accommodate the Storage-Partitioned Join feature.
- A new expression type, `TransformExpression`, is introduced to bind syntactic partition transforms to their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in `EnsureRequirements` to check whether join children are compatible with each other.
- A new optimizer rule, `V2ScanPartitioning`, is added to recognize `Scan`s that implement `SupportsReportPartitioning`. If they do, this rule converts V2 partition transform expressions into their counterparts in catalyst and annotates `DataSourceV2ScanRelation` with the result. These are later propagated into `DataSourceV2ScanExecBase`.
- Changes are made in `DataSourceV2ScanExecBase` to create `KeyGroupedPartitioning` for a scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) all input splits implement `HasPartitionKey`.
- A new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the behavior on or off. By default it is false.
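As referenced in the first bullet above, a minimal, hypothetical connector-side sketch (the names `MyScan`, `MyInputPartition`, and the column `key` are illustrative; it assumes the `KeyGroupedPartitioning`/`HasPartitionKey` API shape described in this summary):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.types.StructType

// Each split exposes its partition key so Spark can group splits that share a key.
case class MyInputPartition(key: InternalRow) extends InputPartition with HasPartitionKey {
  override def partitionKey(): InternalRow = key
}

// The scan reports that rows are grouped by the "key" column, assuming each split
// holds exactly one key group.
class MyScan(schema: StructType, splits: Array[MyInputPartition])
    extends Scan with SupportsReportPartitioning {
  override def readSchema(): StructType = schema
  override def outputPartitioning(): Partitioning =
    new KeyGroupedPartitioning(Array[Expression](Expressions.identity("key")), splits.length)
  // toBatch() and the rest of the Scan implementation are omitted for brevity.
}
```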
Why are the changes needed?
Spark currently supports bucketing in DataSource V1, but not in V2. This is the first step to support bucketed join, and its general form, storage-partitioned join, for V2 data sources. In addition, the work here can potentially be used to support Hive bucketing as well. Please check the SPIP for details.
Does this PR introduce any user-facing change?
With the changes, a user can now:
- have V2 data sources report distribution and ordering to Spark on the read path
- Spark will recognize the distribution property and eliminate shuffles in join/aggregate/window, etc., when the source distribution matches the required distribution of those operators
- a new config, `spark.sql.sources.v2.bucketing.enabled`, is introduced to turn the above behavior on/off
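A hedged end-to-end sketch of the user-facing effect (the catalog/table names `testcat.ns.orders` and `testcat.ns.items` are hypothetical):

```scala
// With the flag on and both sides reporting compatible key-grouped partitioning on the
// join keys, the join below can be planned without an Exchange (shuffle) under the join.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")

val joined = spark.sql(
  """
    |SELECT o.id, o.amount, i.name
    |FROM testcat.ns.orders o
    |JOIN testcat.ns.items i ON o.id = i.id
    |""".stripMargin)

joined.explain()  // inspect the physical plan: no shuffle expected when partitionings match
```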
How was this patch tested?
- Added a new test suite, `KeyGroupedPartitioningSuite`, which covers end-to-end tests of the new feature
- Extended `EnsureRequirementsSuite` to cover `DataSourcePartitioning`
- Some existing test classes, such as `InMemoryTable`, are extended to cover the changes