[SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect #40834
    @@ -623,6 +623,30 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
            return plan


    class DeduplicateWithinWatermark(LogicalPlan):
We don't need this anymore, right? We can add the flag to

Yeah, I noticed that as well, doing the change now.

Updated.
        def __init__(
            self,
            child: Optional["LogicalPlan"],
            all_columns_as_keys: bool = False,
            column_names: Optional[List[str]] = None,
            within_watermark: bool = True,
        ) -> None:
            super().__init__(child)
            self.all_columns_as_keys = all_columns_as_keys
            self.column_names = column_names
            self.within_watermark = within_watermark

        def plan(self, session: "SparkConnectClient") -> proto.Relation:
            assert self._child is not None
            plan = self._create_proto_relation()
            plan.deduplicate.input.CopyFrom(self._child.plan(session))
            plan.deduplicate.all_columns_as_keys = self.all_columns_as_keys
            plan.deduplicate.within_watermark = self.within_watermark
            if self.column_names is not None:
                plan.deduplicate.column_names.extend(self.column_names)
            return plan


    class Sort(LogicalPlan):
        def __init__(
            self,

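The `plan()` method in the hunk above fills in the `deduplicate` relation field by field. A minimal, self-contained sketch of that pattern follows, assuming stand-in dataclasses in place of the generated protobuf classes. `FakeDeduplicate`, `FakeRelation`, and `build_deduplicate` are hypothetical names used only for illustration and are not part of PySpark.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeDeduplicate:
    """Hypothetical stand-in for the generated Deduplicate message."""

    all_columns_as_keys: bool = False
    within_watermark: bool = False
    column_names: List[str] = field(default_factory=list)


@dataclass
class FakeRelation:
    """Hypothetical stand-in for proto.Relation (only the field used here)."""

    deduplicate: FakeDeduplicate = field(default_factory=FakeDeduplicate)


def build_deduplicate(
    all_columns_as_keys: bool = False,
    column_names: Optional[List[str]] = None,
    within_watermark: bool = False,
) -> FakeRelation:
    # Mirrors the field assignments in plan(): scalar flags are always set,
    # the repeated column_names field is extended only when names were given.
    rel = FakeRelation()
    rel.deduplicate.all_columns_as_keys = all_columns_as_keys
    rel.deduplicate.within_watermark = within_watermark
    if column_names is not None:
        rel.deduplicate.column_names.extend(column_names)
    return rel


# The same builder covers both variants; only the flag differs.
plain = build_deduplicate(column_names=["id"])
streaming = build_deduplicate(column_names=["id"], within_watermark=True)
```

In this sketch the two deduplication variants differ by a single boolean, so one builder can serve both call sites.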
Actually, should we have a dedicated protobuf message for `DeduplicateWithinWatermark`? It seems we don't share the same type of logical plan either. Do you have any preference on this, @grundprinzip and @amaliujia?

Okay, I just noticed that this was discussed at #40834 (comment). I will go ahead and merge if you don't have a preference. I don't feel strongly about this either.

also @hvanhovell

@HyukjinKwon thanks. This is much simpler code-wise. A 1:1 mapping for logical plans is not strictly required, I hope.

If anything goes wrong, I think deprecating a field is easier than deprecating a new relation type. Starting by adding a new flag is probably a good beginning.
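
To make the trade-off in this thread concrete, here is a rough sketch of how a consumer of the relation could branch on the flag instead of on a separate relation type. It is not the Spark Connect planner: `DedupRelation`, `to_logical_plan`, and the returned plan names are hypothetical placeholders chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class DedupRelation:
    """Hypothetical single relation carrying an optional flag."""

    column_names: List[str] = field(default_factory=list)
    all_columns_as_keys: bool = False
    within_watermark: bool = False  # new flag; the default keeps the old behavior


def to_logical_plan(rel: DedupRelation) -> Tuple[str, List[str]]:
    # One relation type, two possible logical plans: the flag picks which.
    # This illustrates that the wire message and the logical plan need not
    # map 1:1.
    if rel.within_watermark:
        name = "DeduplicateWithinWatermark"
    else:
        name = "Deduplicate"
    return (name, list(rel.column_names))


old_client = to_logical_plan(DedupRelation(column_names=["id"]))
new_client = to_logical_plan(
    DedupRelation(column_names=["id"], within_watermark=True)
)
```

Under these assumptions, a client that never sets the flag still gets the original plan, which is one reason a flag may be cheaper to retire later than a dedicated relation type.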