[SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect #40834
Conversation
@@ -744,6 +746,39 @@ class SparkConnectPlanner(val session: SparkSession) {
    }
  }

  private def transformDeduplicateWithinWatermark(
Can we reuse transformDeduplicate()? We can pass an 'isWithinWatermark' flag to it. This code looks exactly the same as that.
Sure, will update
@@ -363,6 +364,23 @@ message Deduplicate {
  optional bool all_columns_as_keys = 3;
}

// Relation of type [[DeduplicateWithinWatermark]] which have duplicate rows removed within the time
// range of watermark, could consider either only the subset of columns or all the columns.
message DeduplicateWithinWatermark {
Optional: We can just reuse Deduplicate message inside here :).
What I can think of is to update to something like below:
message DeduplicateWithinWatermark {
// (Required) Reuse the Deduplicate message for a DeduplicateWithinWatermark.
Deduplicate deduplicate = 1;
}
But if we do that, DeduplicateWithinWatermark.input would change to DeduplicateWithinWatermark.deduplicate.input. Is there a way to avoid that? If not, I think it's probably better to keep what it is today.
Yeah, this option does not look good. We could add a flag 'within_watermark' to Deduplicate message. That way we can reuse the code both on the client and server side.
Makes sense, updated.
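The agreed-upon design (reuse the existing Deduplicate message with a within_watermark flag, so both client and server share one code path) can be sketched in pure Python. The class and function names below are illustrative stand-ins, not the actual Spark Connect implementation:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Deduplicate:
    # Stand-in for the Deduplicate proto message; within_watermark is the
    # new flag that selects watermark-bounded deduplication.
    column_names: List[str] = field(default_factory=list)
    all_columns_as_keys: bool = False
    within_watermark: bool = False


def transform_deduplicate(rel: Deduplicate) -> str:
    # Stand-in for the server-side planner dispatch: a single handler
    # covers both relations, branching only on the flag (mirroring the
    # suggested Scala change to transformDeduplicate).
    if rel.within_watermark:
        return f"DeduplicateWithinWatermark(cols={rel.column_names})"
    return f"Deduplicate(cols={rel.column_names})"
```

One message plus one flag avoids the field-renaming problem discussed above (no `deduplicate.input` nesting), at the cost of the flag being meaningless for batch plans.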
@rangadi a general question: when I use
No need. That is fine. It is a known problem.
Looks great. One more tweak suggested.
LGTM after that.
@@ -68,6 +68,7 @@ message Relation {
  WithWatermark with_watermark = 33;
  ApplyInPandasWithState apply_in_pandas_with_state = 34;
  HtmlString html_string = 35;
  Deduplicate deduplicate_within_watermark = 36;
This is not required, right? The flag in Deduplicate will indicate this.
@@ -90,6 +90,8 @@ class SparkConnectPlanner(val session: SparkSession) {
  case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail)
  case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
  case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
  case proto.Relation.RelTypeCase.DEDUPLICATE_WITHIN_WATERMARK =>
Connected to the proto comment. We don't need this field.
python/pyspark/sql/connect/plan.py
Outdated
@@ -623,6 +623,30 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
    return plan


class DeduplicateWithinWatermark(LogicalPlan):
We don't need this anymore, right? We can add the flag to Deduplicate above to match the rest of the PR.
yeah I noticed that as well, doing the change now
updated
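On the client side, folding the separate DeduplicateWithinWatermark plan into the existing Deduplicate plan might look roughly like the sketch below. This is a hypothetical simplification: the real class in pyspark/sql/connect/plan.py builds a protobuf Relation, which is replaced here by a plain dict so the shape is easy to see:

```python
from typing import List, Optional


class DeduplicatePlan:
    """Simplified stand-in for the connect client's Deduplicate logical plan.

    One plan class serializes both the plain and the within-watermark
    variants; only the flag differs in the emitted relation.
    """

    def __init__(
        self,
        child: Optional[dict],
        column_names: List[str],
        within_watermark: bool = False,
    ) -> None:
        self.child = child
        self.column_names = column_names
        self.within_watermark = within_watermark

    def plan(self) -> dict:
        # Stand-in for populating the proto.Deduplicate relation.
        return {
            "deduplicate": {
                "input": self.child,
                "column_names": self.column_names,
                "within_watermark": self.within_watermark,
            }
        }
```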
—— SHIP IT ——
,:',:`,:'
__||_||_||_||__
____["""""""""""""""]____
\ " '''''''''''''''''''' |
~^~~^~^~^^~^~^~^~^~^~^~^~~^~^~^^~~^~^
@@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) {
    }
    cols
  }
  Deduplicate(groupCols, queryExecution.analyzed)
  if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols, queryExecution.analyzed)
Actually, should we have a dedicated protobuf message for DeduplicateWithinWatermark? They don't seem to share the same type of logical plan either. Do you have any preference on this, @grundprinzip and @amaliujia?
Okay, just noticed that it was discussed at #40834 (comment). I will go ahead and merge if you guys don't have preference. I don't feel strongly about this either.
also @hvanhovell
@HyukjinKwon thanks. This is much simpler code-wise. A 1:1 mapping to logical plans is not strictly required, I hope.
If there is anything wrong, I think deprecating a field is easier than deprecating a new relation type. Probably starting from this by adding a new flag is a good beginning.
Merged to master.
@bogao007 what's your JIRA id? I need to assign you in the JIRA ticket.
I think this might be my JIRA id
What changes were proposed in this pull request?
Implemented the dropDuplicatesWithinWatermark Python API for Spark Connect. This change is based on a previous commit that introduced the dropDuplicatesWithinWatermark API in Spark.
Why are the changes needed?
We recently introduced the dropDuplicatesWithinWatermark API in Spark (commit link). We want to bring parity to Spark Connect.
Does this PR introduce any user-facing change?
Yes, this introduces a new public API, dropDuplicatesWithinWatermark, in Spark Connect.
How was this patch tested?
Added new test cases in test suites.
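Independent of this PR's plumbing, the semantics the new API provides can be illustrated with a simplified pure-Python model (not Spark code): a duplicate key is dropped only while the key's state is still within the watermark window; once the watermark passes, the key's state is evicted and the key can be emitted again. The eviction rule below is a deliberately simplified assumption for illustration:

```python
from typing import Iterable, List, Tuple


def dedup_within_watermark(
    events: Iterable[Tuple[str, int]], delay: int
) -> List[Tuple[str, int]]:
    """events: (key, event_time) pairs in arrival order.

    delay: watermark delay. The watermark is the max event time seen
    so far minus the delay; state for a key is evicted once its stored
    event time falls behind the watermark.
    """
    seen: dict = {}            # key -> event time when first seen
    max_time = float("-inf")   # highest event time observed so far
    out: List[Tuple[str, int]] = []
    for key, t in events:
        max_time = max(max_time, t)
        watermark = max_time - delay
        # Evict per-key state that has fallen behind the watermark.
        seen = {k: ts for k, ts in seen.items() if ts >= watermark}
        if key not in seen:
            seen[key] = t
            out.append((key, t))   # first sighting within the window
    return out
```

For example, with a delay of 5, a duplicate of "a" arriving shortly after the original is dropped, but another "a" arriving long after the watermark has passed is kept as a fresh row.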