[SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress #46921
Conversation
Pending merging of #46920
# Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which casts id and runId
# to string. To prevent breaking change, also cast them to string when accessed with
# __getitem__.
if key == "id" or key == "runId":
I'm not sure if this is really needed. But if we delete this `if`, `query.lastProgress["id"]` would now return type `uuid`, whereas before it was a string.
Because there would be lots of breaking changes (e.g. the `sources` method now also returns the actual `SourceProgress`):

def sources(self) -> List["SourceProgress"]:
Let me also make these subclasses of dict...
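The idea discussed here can be sketched as follows. This is a minimal, hypothetical illustration (simplified field set, not the actual PySpark implementation): a `dict` subclass that exposes attribute-style access while `__getitem__` keeps casting `id`/`runId` to string for backward compatibility.

```python
import uuid
from typing import Any


class StreamingQueryProgress(dict):
    """Sketch: a dict subclass so legacy progress["key"] access keeps working."""

    def __init__(self, id: uuid.UUID, runId: uuid.UUID, batchId: int) -> None:
        super().__init__()
        self._id = id
        self._runId = runId
        self._batchId = batchId

    @property
    def id(self) -> uuid.UUID:
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        return self._runId

    @property
    def batchId(self) -> int:
        return self._batchId

    def __getitem__(self, key: str) -> Any:
        # Before Spark 4.0 the returned dict carried id/runId as strings,
        # so keep casting them to str when accessed dict-style.
        if key in ("id", "runId"):
            return str(getattr(self, key))
        else:
            return getattr(self, key)
```

With this sketch, `progress.id` yields a `uuid.UUID` (matching Scala-style attribute access), while `progress["id"]` still yields the string the old dict returned.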
else:
    return getattr(self, key)

def __setitem__(self, key, value):
This is out of fear that users may have set values on the returned dict before this change.
Hmmm... but end users can't access this value, if I am reading this correctly?
The fear is backward compatibility. This is possible in current master:
>>> q = spark.readStream.format("rate").load().writeStream.format("noop").start()
24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
>>> p = q.lastProgress
>>> p
{'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId': 'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp': '2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82, 'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215, 'walCommit': 98}, 'stateOperators': [], 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675}], 'sink': {'description': 'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4', 'numOutputRows': 1}}
>>> p["id"]
'44510846-29f8-4218-95cf-616efecadb05'
>>> p["id"] = "aaaaaaa"
>>> p["id"]
'aaaaaaa'
This is not possible in Scala of course, but not sure if we should keep this python specific behavior....
cc @HyukjinKwon @LuciferYang @HeartSaVioR Could you guys take a look? Thank you so much!!
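For reference, the mutation behavior shown in the transcript falls out of subclassing `dict` automatically: a plain `dict` subclass still supports item assignment. A minimal illustration (hypothetical values):

```python
# A bare dict subclass keeps the pre-4.0 mutation semantics by default:
# no __setitem__ override is needed for p["id"] = ... to keep working.
class Progress(dict):
    pass


p = Progress(id="44510846-29f8-4218-95cf-616efecadb05")
assert p["id"] == "44510846-29f8-4218-95cf-616efecadb05"

p["id"] = "aaaaaaa"  # mutation keeps working, as with the old plain dict
assert p["id"] == "aaaaaaa"
```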
else:
    return getattr(self, key)

def __setitem__(self, key: str, value: Any) -> None:
Can we remove the fixes of `__getitem__` and `__setitem__`, but do `self.update(dict(id=id, runId=runId, ...))` at `__init__`?
ah sure let me do that
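The suggested approach can be sketched like this (a simplified, hypothetical version, not the actual PySpark code): populate the underlying dict once in `__init__` via `self.update(...)`, so no `__getitem__`/`__setitem__` overrides are needed and plain dict access and mutation behave exactly like the pre-4.0 return value.

```python
import uuid
from typing import Any


class StreamingQueryProgress(dict):
    """Sketch: fill the dict in __init__ instead of overriding item access."""

    def __init__(self, id: uuid.UUID, runId: uuid.UUID, batchId: int) -> None:
        super().__init__()
        self._id = id
        self._runId = runId
        self._batchId = batchId
        # Populate the dict once; id/runId are stored as strings so that
        # dict-style access matches the old behavior. Inherited dict
        # __getitem__/__setitem__ then just work.
        self.update(dict(id=str(id), runId=str(runId), batchId=batchId))

    @property
    def id(self) -> uuid.UUID:
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        return self._runId

    @property
    def batchId(self) -> int:
        return self._batchId
```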
@HyukjinKwon I think this is finally ready for another review : )
LGTM, I like this change
Merged to master.
Seems like it broke the test:
Let me revert this out for now.
…led 3.5 <> 4.0 test

### What changes were proposed in this pull request?
Disable the listener test. This test would fail after #46921, which is now reverted. The reason was that with #46921, the server starts a server-side Python process which serializes the `StreamingQueryProgress` object with the new `StreamingQueryProgress` change. But the client tries to deserialize `StreamingQueryProgress` using the old `StreamingQueryProgress` without the change, which caused a serde error. However, as the change is going into Spark 4.0, and is considered a generally good improvement that does more good than harm, we would like to disable this test to bring back #46921.

### Why are the changes needed?
Unblock bringing back #46921

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No need

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47468 from WweiL/3.5-disable-server-listener-test-cross-version.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
This PR is created after discussion in this closed one: #46886

I was trying to fix a bug (in Connect, `query.lastProgress` doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what was proposed in this PR should be the ultimate fix.

In Python, for both classic Spark and Spark Connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in Scala it's the actual `StreamingQueryProgress` object:

spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
Lines 94 to 101 in 1a5d22a
This API discrepancy brings some confusion: in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`.

This PR makes `StreamingQuery.lastProgress` return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` return `List[StreamingQueryProgress]`).

To prevent a breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing it with dictionary methods (e.g. `query.lastProgress["id"]`) is still functional.

Why are the changes needed?
API parity
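The parity this PR aims for can be illustrated with a small, hypothetical stand-in for the real PySpark class (not the actual implementation): after the change, the same progress object supports Scala-style attribute access and the legacy dict-style access.

```python
# Hypothetical sketch: one object, two access styles.
class StreamingQueryProgress(dict):
    def __init__(self, batchId: int) -> None:
        super().__init__(batchId=batchId)
        self._batchId = batchId

    @property
    def batchId(self) -> int:
        return self._batchId


p = StreamingQueryProgress(batchId=2)
assert p.batchId == 2      # new attribute access, matching Scala
assert p["batchId"] == 2   # old dict-style access still works
```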
Does this PR introduce any user-facing change?
Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`).

How was this patch tested?
Added unit test
Was this patch authored or co-authored using generative AI tooling?
No