
[SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress #46921

Closed · wants to merge 14 commits

Conversation


@WweiL WweiL commented Jun 8, 2024

What changes were proposed in this pull request?

This PR was created after discussion in the now-closed #46886.
I was trying to fix a bug (in Spark Connect, query.lastProgress doesn't have numInputRows, inputRowsPerSecond, or processedRowsPerSecond), and we reached the conclusion that what is proposed in this PR should be the ultimate fix.

In Python, for both classic Spark and Spark Connect, the return type of lastProgress is Dict (and recentProgress is List[Dict]), but in Scala it is the actual StreamingQueryProgress object:

def recentProgress: Array[StreamingQueryProgress]

/**
 * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
 *
 * @since 2.1.0
 */
def lastProgress: StreamingQueryProgress

This API discrepancy causes confusion: in Scala, users can write query.lastProgress.batchId, while in Python they have to write query.lastProgress["batchId"].

This PR makes StreamingQuery.lastProgress return the actual StreamingQueryProgress (and StreamingQuery.recentProgress return List[StreamingQueryProgress]).

To avoid a breaking change, we make StreamingQueryProgress a subclass of dict, so existing code that uses dictionary-style access (e.g. query.lastProgress["id"]) keeps working.
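
As a minimal sketch of the idea (field names illustrative; the real class carries many more fields), the dict subclass lets both access styles coexist:

class StreamingQueryProgress(dict):
    """Sketch only: a dict subclass, so attribute and item access both work."""

    def __init__(self, batchId: int, numInputRows: int) -> None:
        # Populate the underlying dict so legacy code such as
        # query.lastProgress["batchId"] keeps working.
        super().__init__(batchId=batchId, numInputRows=numInputRows)
        # Expose the same fields as attributes, matching the Scala API.
        self.batchId = batchId
        self.numInputRows = numInputRows

p = StreamingQueryProgress(batchId=2, numInputRows=1)
assert p.batchId == p["batchId"] == 2  # both access styles now work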

Why are the changes needed?

API parity

Does this PR introduce any user-facing change?

Yes, now StreamingQuery.lastProgress returns the actual StreamingQueryProgress (and StreamingQuery.recentProgress returns List[StreamingQueryProgress]).

How was this patch tested?

Added unit test

Was this patch authored or co-authored using generative AI tooling?

No


WweiL commented Jun 8, 2024

Pending merging of #46920

        # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which casts id and runId
        # to string. To prevent breaking change, also cast them to string when accessed with
        # __getitem__.
        if key == "id" or key == "runId":
Contributor Author

@WweiL WweiL Jun 8, 2024


I'm not sure if this is really needed. But if we delete this if, query.lastProgress["id"] would now return a uuid.UUID, whereas before it was a string.
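
For illustration, a sketch of the special-casing under discussion (storing id/runId as uuid.UUID is an assumption; this is not the exact PR code):

import uuid
from typing import Any

class StreamingQueryProgress(dict):
    def __init__(self, id: uuid.UUID, runId: uuid.UUID) -> None:
        super().__init__(id=id, runId=runId)

    def __getitem__(self, key: str) -> Any:
        value = super().__getitem__(key)
        # Before Spark 4.0, lastProgress was a plain dict whose id/runId were
        # already strings; keep that behavior for dictionary-style access.
        if key == "id" or key == "runId":
            return str(value)
        return value

p = StreamingQueryProgress(id=uuid.uuid4(), runId=uuid.uuid4())
assert isinstance(p["id"], str)  # still a string, as before Spark 4.0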

Contributor Author

Because there would be lots of breaking changes (e.g. the sources method now also returns the actual SourceProgress):

def sources(self) -> List["SourceProgress"]:

let me also make these subclasses of dict...

@WweiL WweiL changed the title [SPARK-48567][DO-NOT-REVIEW] last progress [SPARK-48567][DO-NOT-REVIEW] Make StreamingQueryProgress and friends subclasses of dict Jun 8, 2024
        else:
            return getattr(self, key)

    def __setitem__(self, key, value):
Contributor Author

This is out of fear that users may have been setting values on the returned dict before this change.

Member

Hmmm... but the end users can't access this value, if I am reading this correctly?

Contributor Author

@WweiL WweiL Jun 10, 2024

The fear is backward compatibility. This is possible in current master:

>>> q = spark.readStream.format("rate").load().writeStream.format("noop").start()
24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
>>> p = q.lastProgress
>>> p
{'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId': 'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp': '2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82, 'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215, 'walCommit': 98}, 'stateOperators': [], 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675}], 'sink': {'description': 'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4', 'numOutputRows': 1}}
>>> p["id"]
'44510846-29f8-4218-95cf-616efecadb05'
>>> p["id"] = "aaaaaaa"
>>> p["id"]
'aaaaaaa'

This is not possible in Scala, of course, but I'm not sure whether we should keep this Python-specific behavior...
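
If the Python-specific mutability is kept, a purely illustrative sketch (not the PR's actual code; the attribute write-through is an assumption) of a __setitem__ that keeps both access styles consistent:

from typing import Any

class StreamingQueryProgress(dict):
    def __setitem__(self, key: str, value: Any) -> None:
        # Write through to both views so p["id"] = "aaaaaaa" keeps behaving
        # as it did when lastProgress returned a plain dict.
        super().__setitem__(key, value)
        setattr(self, key, value)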

@WweiL WweiL changed the title [SPARK-48567][DO-NOT-REVIEW] Make StreamingQueryProgress and friends subclasses of dict [SPARK-48567][SS] Make StreamingQueryProgress and friends subclasses of dict Jun 10, 2024
@WweiL WweiL changed the title [SPARK-48567][SS] Make StreamingQueryProgress and friends subclasses of dict [SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress Jun 10, 2024

WweiL commented Jun 10, 2024

cc @HyukjinKwon @LuciferYang @HeartSaVioR Could you guys take a look? Thank you so much!!


@WweiL WweiL requested a review from HyukjinKwon June 12, 2024 00:39
        else:
            return getattr(self, key)

    def __setitem__(self, key: str, value: Any) -> None:
Member

Can we remove the fixes in __getitem__ and __setitem__, and instead do self.update(dict(id=id, runId=runId, ...)) in __init__?

Contributor Author

ah sure let me do that
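
A sketch of the suggested direction (field names illustrative, not the final PR code): populate the underlying dict once via self.update in __init__, and plain dict semantics make the __getitem__/__setitem__ overrides unnecessary:

import uuid

class StreamingQueryProgress(dict):
    def __init__(self, id: uuid.UUID, runId: uuid.UUID, batchId: int) -> None:
        self.id = id
        self.runId = runId
        self.batchId = batchId
        # One update in __init__ replaces the __getitem__/__setitem__ overrides;
        # id/runId go in as strings to match the pre-4.0 dict behavior.
        self.update(dict(id=str(id), runId=str(runId), batchId=batchId))

p = StreamingQueryProgress(uuid.uuid4(), uuid.uuid4(), batchId=2)
assert p["batchId"] == p.batchId == 2 and isinstance(p["id"], str)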

@WweiL WweiL requested a review from HyukjinKwon June 14, 2024 21:32

WweiL commented Jun 17, 2024

@HyukjinKwon I think this is finally ready for another review : )

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM, I like this change

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

Seems like it broke the test:


======================================================================
ERROR [42.293s]: test_listener_events (pyspark.sql.tests.connect.streaming.test_parity_listener.StreamingListenerParityTests.test_listener_events)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py", line 82, in test_listener_events
    progress_event = pyspark.cloudpickle.loads(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'StateOperatorProgress' object does not support item assignment

Let me revert this out for now.
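
For context, a minimal standalone sketch (plain Python, not Spark code) of the failure mechanism: pickling a dict subclass records its items, and the unpickler replays them through __setitem__ on the receiving side, so a client whose class is not a dict subclass and does not support item assignment (the old StateOperatorProgress in 3.5) cannot deserialize payloads produced by the new dict-based class:

import pickle

class Progress(dict):
    def __setitem__(self, key, value):
        print(f"unpickler called __setitem__({key!r})")
        super().__setitem__(key, value)

p = Progress()
dict.__setitem__(p, "batchId", 2)         # build without triggering the override
restored = pickle.loads(pickle.dumps(p))  # prints: unpickler called __setitem__('batchId')
assert restored["batchId"] == 2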

HyukjinKwon pushed a commit that referenced this pull request Jul 25, 2024
…led 3.5 <> 4.0 test

### What changes were proposed in this pull request?

Disable the listener test. This test would fail after #46921, which is now reverted. The reason is that with #46921, the server starts a server-side Python process that serializes the `StreamingQueryProgress` object with the new `StreamingQueryProgress` change, but the client tries to deserialize `StreamingQueryProgress` using the old `StreamingQueryProgress` without the change, which causes a serde error.

However, since the change is going into Spark 4.0 and is considered a generally good improvement that does more good than harm, we would like to disable this test so that #46921 can be brought back.

### Why are the changes needed?

Unblock bringing back #46921

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

No need

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47468 from WweiL/3.5-disable-server-listener-test-cross-version.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>