Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect #40834

Closed
wants to merge 17 commits into from

Conversation

bogao007
Copy link
Contributor

What changes were proposed in this pull request?

Implemented dropDuplicatesWithinWatermark Python API for Spark Connect. This change is based on a previous commit that introduced dropDuplicatesWithinWatermark API in Spark.

Why are the changes needed?

We recently introduced dropDuplicatesWithinWatermark API in Spark (commit link). We want to bring parity to the Spark Connect.

Does this PR introduce any user-facing change?

Yes, this introduces a new public API, dropDuplicatesWithinWatermark in Spark Connect.

How was this patch tested?

Added new test cases in test suites.

@@ -744,6 +746,39 @@ class SparkConnectPlanner(val session: SparkSession) {
}
}

private def transformDeduplicateWithinWatermark(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse transformDeduplicate(). We can pass in 'isWithinWatermark' flag to it. This code looks exactly same of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will update

@@ -363,6 +364,23 @@ message Deduplicate {
optional bool all_columns_as_keys = 3;
}

// Relation of type [[DeduplicateWithinWatermark]] which have duplicate rows removed within the time
// range of watermark, could consider either only the subset of columns or all the columns.
message DeduplicateWithinWatermark {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: We can just reuse Deduplicate message inside here :).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I can think of is to update to something like below:

message DeduplicateWithinWatermark {

  // (Required) Reuse the Deduplicate message for a DeduplicateWithinWatermark.
  Deduplicate deduplicate = 1;
}

But if we do that, DeduplicateWithinWatermark.input would change to DeduplicateWithinWatermark.deduplicate.input. Is there a way to avoid that? If not, I think it's probably better to keep what it is today.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this option does not look good. We could add a flag 'within_watermark' to Deduplicate message. That way we can reuse the code both on the client and server side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, updated.

@bogao007
Copy link
Contributor Author

@rangadi a general question: when I use dev/connect-gen-protos.sh to generate protobuf related changes, it automatically added a lot of lines of @typing_extensions.final to different files, should I remove this in my PR?

@rangadi
Copy link

rangadi commented Apr 19, 2023

when I use dev/connect-gen-protos.sh to generate protobuf related changes, it automatically added a lot of lines of @typing_extensions.final to different files, should I remove this in my PR?

No need. That is fine. It is a known problem.

Copy link

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great. One more tweak suggested.
LGTM after that.

@@ -68,6 +68,7 @@ message Relation {
WithWatermark with_watermark = 33;
ApplyInPandasWithState apply_in_pandas_with_state = 34;
HtmlString html_string = 35;
Deduplicate deduplicate_within_watermark = 36;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not required, right? The flag in Deduplicate will indicate this.

@@ -90,6 +90,8 @@ class SparkConnectPlanner(val session: SparkSession) {
case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail)
case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
case proto.Relation.RelTypeCase.DEDUPLICATE_WITHIN_WATERMARK =>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connected to the proto comment. We don't need this field.

@@ -623,6 +623,30 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
return plan


class DeduplicateWithinWatermark(LogicalPlan):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't ned this anymore, right? we can add the flag to Deduplicate above to match the rest of the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I noticed that as well, doing the change now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                 —— SHIP IT ——

                   ,:',:`,:'
                __||_||_||_||__
           ____["""""""""""""""]____
           \ " '''''''''''''''''''' |
    ~^~~^~^~^^~^~^~^~^~^~^~^~~^~^~^^~~^~^

@@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) {
}
cols
}
Deduplicate(groupCols, queryExecution.analyzed)
if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols, queryExecution.analyzed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, should we have a dedicated protobuf message for DeduplicateWithinWatermark? Seems like we don't also share the same type of logical plan. Do you have any preference on this, @grundprinzip and @amaliujia?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, just noticed that it was discussed at #40834 (comment). I will go ahead and merge if you guys don't have preference. I don't feel strongly about this either.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon thanks. This is much simpler code wise. 1:1 for logical plans is not strictly required, I hope.

Copy link
Contributor

@amaliujia amaliujia Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is anything wrong, I think deprecating a field is easier than deprecating a new relation type. Probably starting from this by adding a new flag is a good beginning.

@HyukjinKwon
Copy link
Member

Merged to master.

@HyukjinKwon
Copy link
Member

@bogao007 what's your JIRA id? I need to assign you in the JIRA ticket.

@bogao007
Copy link
Contributor Author

@bogao007 what's your JIRA id? I need to assign you in the JIRA ticket.

I think this might be my JIRA id 62cbecffa94a6f9c0efe1622, let me know if it doesn't work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants