[SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator

### What changes were proposed in this pull request?

This PR proposes documenting the current challenge in fixing the required distribution of stateful operators, even though that distribution is arguably "broken" today.

This PR addresses the review comment #35419 (comment)

### Why are the changes needed?

In SPARK-38124 we identified a long-standing problem in the stateful operators, but it is not easy to fix: a fix that is not carefully designed may break existing queries. Anyone touching the required distribution should also be very careful. We want to document this explicitly so that anyone working in this part of the codebase takes that care.
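
To make the concern concrete, here is a minimal sketch of the two requirements. It does not use Spark's classes: `ClusteredDist`, `StatefulOpClusteredDist`, and `HashPart` are hypothetical stand-ins, and the partition id is computed with a plain `hashCode` rather than Spark's actual hash function. The assumption being illustrated is that the relaxed requirement accepts hash partitioning on any subset of the grouping keys, while the strict one accepts only hash partitioning on exactly the state keys, in order.

```scala
object DistributionSketch {
  // Hypothetical, simplified stand-ins; these are NOT Spark's classes.
  sealed trait Distribution
  final case class ClusteredDist(keys: Seq[String]) extends Distribution
  final case class StatefulOpClusteredDist(keys: Seq[String]) extends Distribution

  final case class HashPart(keys: Seq[String], numPartitions: Int) {
    def satisfies(required: Distribution): Boolean = required match {
      // Relaxed: hashing on any non-empty subset of the required keys is accepted.
      case ClusteredDist(req) => keys.nonEmpty && keys.forall(k => req.contains(k))
      // Strict: only hashing on exactly the required keys, in the same order.
      case StatefulOpClusteredDist(req) => keys == req
    }

    // Stand-in for the partition id of a row (assumption: plain hashCode, not Spark's hash).
    def partitionId(row: Map[String, Any]): Int =
      Math.floorMod(keys.map(row).hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val stateKeys = Seq("user", "device")
    val byUser = HashPart(Seq("user"), 4)
    val byBoth = HashPart(stateKeys, 4)

    println(byUser.satisfies(ClusteredDist(stateKeys)))           // true
    println(byBoth.satisfies(ClusteredDist(stateKeys)))           // true
    println(byUser.satisfies(StatefulOpClusteredDist(stateKeys))) // false
    println(byBoth.satisfies(StatefulOpClusteredDist(stateKeys))) // true

    // The same row can land in a different partition under each partitioning,
    // so state written under one layout may not be found under the other.
    val row = Map[String, Any]("user" -> "alice", "device" -> "phone")
    println(s"byUser -> ${byUser.partitionId(row)}, byBoth -> ${byBoth.partitionId(row)}")
  }
}
```

Under these assumptions, a checkpoint whose state was laid out by `byUser` and a query restarted with `byBoth` would look up a key's state in a partition chosen by a different rule. That is why tightening the requirement on existing operators risks breaking existing checkpoints.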

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Code comment only changes.

Closes #35512 from HeartSaVioR/SPARK-38124-followup.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Feb 15, 2022
1 parent 88696eb commit 0be132c
Showing 3 changed files with 28 additions and 1 deletion.
@@ -101,6 +101,14 @@ case class ClusteredDistribution(
* Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
*
* NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
* have been using ClusteredDistribution, which could construct the physical partitioning of the
* state in a different way (ClusteredDistribution imposes a more relaxed condition, and multiple
* partitionings can satisfy the requirement). We need to find a way to fix this while
* minimizing the possibility of breaking existing checkpoints.
*
* TODO(SPARK-38204): address the issue explained in the above note.
*/
case class StatefulOpClusteredDistribution(
expressions: Seq[Expression],
@@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-located on the same task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) ::
ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) ::
Nil
@@ -334,6 +334,9 @@ case class StateStoreRestoreExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
@@ -493,6 +496,9 @@ case class StateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
@@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec(
}

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

@@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

@@ -741,8 +753,12 @@ case class StreamingDeduplicateExec(
extends UnaryExecNode with StateStoreWriter with WatermarkSupport {

/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver