-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect #40834
[SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect #40834
Changes from 9 commits
dd388c6
252dfb9
e7ae746
68bec83
9721aca
5305509
9a5d471
bfbafa5
a007ac2
7e56c75
55fdc47
efd6abb
be96624
5989ad6
a26d344
465e6c2
2f63f7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._ | |
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils} | ||
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin} | ||
import org.apache.spark.sql.catalyst.plans.logical | ||
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, CommandResult, Deduplicate, DeserializeToObject, Except, Intersect, LocalRelation, LogicalPlan, MapPartitions, Project, Sample, SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot, UnresolvedHint} | ||
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, CommandResult, Deduplicate, DeduplicateWithinWatermark, DeserializeToObject, Except, Intersect, LocalRelation, LogicalPlan, MapPartitions, Project, Sample, SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot, UnresolvedHint} | ||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} | ||
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager | ||
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, UdfPacket} | ||
|
@@ -90,6 +90,8 @@ class SparkConnectPlanner(val session: SparkSession) { | |
case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail) | ||
case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin) | ||
case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate) | ||
case proto.Relation.RelTypeCase.DEDUPLICATE_WITHIN_WATERMARK => | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Connected to the proto comment. We don't need this field. |
||
transformDeduplicate(rel.getDeduplicateWithinWatermark, isWithinWatermark = true) | ||
case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp) | ||
case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort) | ||
case proto.Relation.RelTypeCase.DROP => transformDrop(rel.getDrop) | ||
|
@@ -723,7 +725,8 @@ class SparkConnectPlanner(val session: SparkSession) { | |
CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput)) | ||
} | ||
|
||
private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = { | ||
private def transformDeduplicate(rel: proto.Deduplicate, | ||
isWithinWatermark: Boolean = false): LogicalPlan = { | ||
if (!rel.hasInput) { | ||
throw InvalidPlanInput("Deduplicate needs a plan input") | ||
} | ||
|
@@ -738,7 +741,8 @@ class SparkConnectPlanner(val session: SparkSession) { | |
val resolver = session.sessionState.analyzer.resolver | ||
val allColumns = queryExecution.analyzed.output | ||
if (rel.getAllColumnsAsKeys) { | ||
Deduplicate(allColumns, queryExecution.analyzed) | ||
if (isWithinWatermark) DeduplicateWithinWatermark(allColumns, queryExecution.analyzed) | ||
else Deduplicate(allColumns, queryExecution.analyzed) | ||
} else { | ||
val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq | ||
val groupCols = toGroupColumnNames.flatMap { (colName: String) => | ||
|
@@ -750,7 +754,8 @@ class SparkConnectPlanner(val session: SparkSession) { | |
} | ||
cols | ||
} | ||
Deduplicate(groupCols, queryExecution.analyzed) | ||
if (isWithinWatermark) DeduplicateWithinWatermark(groupCols, queryExecution.analyzed) | ||
else Deduplicate(groupCols, queryExecution.analyzed) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -623,6 +623,27 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: | |
return plan | ||
|
||
|
||
class DeduplicateWithinWatermark(LogicalPlan): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't need this anymore, right? We can add the flag to Deduplicate. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I noticed that as well; doing the change now. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated. |
||
def __init__( | ||
self, | ||
child: Optional["LogicalPlan"], | ||
all_columns_as_keys: bool = False, | ||
column_names: Optional[List[str]] = None, | ||
) -> None: | ||
super().__init__(child) | ||
self.all_columns_as_keys = all_columns_as_keys | ||
self.column_names = column_names | ||
|
||
def plan(self, session: "SparkConnectClient") -> proto.Relation: | ||
assert self._child is not None | ||
plan = self._create_proto_relation() | ||
plan.deduplicate_within_watermark.input.CopyFrom(self._child.plan(session)) | ||
plan.deduplicate_within_watermark.all_columns_as_keys = self.all_columns_as_keys | ||
if self.column_names is not None: | ||
plan.deduplicate_within_watermark.column_names.extend(self.column_names) | ||
return plan | ||
|
||
|
||
class Sort(LogicalPlan): | ||
def __init__( | ||
self, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This is not required, right? The flag in Deduplicate will indicate this.