[SPARK-42960] [CONNECT] [SS] Add await_termination() and exception() API for Streaming Query in Python #40785
Conversation
…SIS flag as it's already in test options
@HyukjinKwon @rangadi @pengzhon-db Hey guys, PTAL when you get a chance, thanks!
@@ -298,6 +306,16 @@ message StreamingQueryCommandResult {
  // Logical and physical plans as string
  string result = 1;
}

message ExceptionResult {
We probably need at least the stacktrace. Leave a comment about what our thinking is there.
Yes, I agree; currently the support for errors is limited.
I tracked how they handle errors and found that it goes through here:
spark/python/pyspark/sql/connect/client.py
Line 1115 in 0c2776a
raise convert_exception(info, status.message) from None
And then in the convert_exception method:
spark/python/pyspark/errors/exceptions/connect.py
Lines 57 to 58 in 0c2776a
elif "org.apache.spark.sql.streaming.StreamingQueryException" in classes:
    return StreamingQueryException(message)
Only the message is passed directly.
I guess we could file a ticket, wait for the batch side's change, and then align with them?
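For reference, the shape of that mapping can be sketched as below. This is a minimal, hypothetical stand-in for the `convert_exception` path, not PySpark's actual implementation; the point it illustrates is that only the message survives the round trip, while stack trace and cause are dropped:

```python
class StreamingQueryException(Exception):
    """Client-side stand-in (hypothetical) for PySpark's StreamingQueryException."""


def convert_exception_sketch(classes, message):
    # 'classes' is the list of JVM exception class names reported by the server.
    # Only the message is used to rebuild the error on the client.
    if "org.apache.spark.sql.streaming.StreamingQueryException" in classes:
        return StreamingQueryException(message)
    return Exception(message)


err = convert_exception_sketch(
    ["org.apache.spark.sql.streaming.StreamingQueryException"], "query failed")
```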
I think we should try to be consistent with what the current exception() returns, which is `return CapturedStreamingQueryException(msg, stackTrace, je.getCause())`?
Right, but stackTrace and cause are not included in Connect's error framework so far. There is an ongoing PR about this: #40575.
@@ -196,7 +196,7 @@ def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
         >>> sq.stop()
         """
         if timeout is not None:
-            if not isinstance(timeout, (int, float)) or timeout < 0:
+            if not isinstance(timeout, (int, float)) or timeout <= 0:
@HyukjinKwon I also changed this to `<=` to align with StreamExecution's awaitTermination:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L549
@HyukjinKwon Can you take a look? Thanks!
@WweiL mind rebasing this one please?
Fetched and merged with master. I noticed the stack trace was added; I'll create a SPARK ticket to include it.
respBuilder.getAwaitTerminationBuilder
  .setTerminated(terminated)
} else {
  query.awaitTermination()
Hmm .. just to be extra clear, it will be disconnected when it reaches the gRPC timeout .. am I correct?
Right, this is intended at this stage. @rangadi will push an update regarding this, I believe.
👌
The client keeps sending heartbeat messages to keep the RPC connection alive.
That said, we would still need to improve handling of this; e.g. it should exit if the client side disconnects.
@hvanhovell or @grundprinzip actually mind taking a look please when you find some time?
I see .. sorry for a bit of back and forth:
Let's add them back :-) ..
@HyukjinKwon Can you merge this : ) Thanks!
Merged to master.
…logics out

### What changes were proposed in this pull request?
This PR factors Connect/non-Connect specific logics out into dedicated test classes. This PR is a followup of #40785.

### Why are the changes needed?
To avoid test failures such as #44698 (comment) caused by missing dependencies.

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
CI in this PR should verify it.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44715 from HyukjinKwon/SPARK-42960-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
Add the `await_termination()` and `exception()` APIs to the streaming query class.

For `exception()`, only pass the `message` the same way in `SparkConnectService`, and construct the error the same way as the `convert_exception` method in `_handle_rpc_error` in `client.py`.

For `await_termination()`, send the command multiple times instead of waiting, to prevent RPC timeout. <-- I'm definitely open to any discussion on its implementation!
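The polling approach for `await_termination` can be illustrated roughly as follows. This is a hypothetical sketch, not the actual client code: `poll_once` stands in for one short Spark Connect round trip that asks the server whether the query has terminated, so no single RPC stays open long enough to hit the gRPC timeout.

```python
import time


def await_termination_sketch(poll_once, timeout=None, interval=0.01):
    # Re-issue a short awaitTermination command in a loop instead of
    # blocking inside one long-lived RPC. 'poll_once' returns True once
    # the query has terminated. 'timeout' is in seconds; None waits forever.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if poll_once():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```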
Why are the changes needed?
Add missing APIs.
Does this PR introduce any user-facing change?
Yes, but as part of the ongoing development of Streaming Spark Connect.
How was this patch tested?
Existing unit tests. Note that the unit tests for them are still skipped because: 1. queryManager is not implemented; 2. access to stopped queries is not implemented.
I was able to test them manually with `test_stream_await_termination()` and `test_stream_exception()` by commenting out the unregistering of terminated queries: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L411