[SPARK-36533][SS] Trigger.AvailableNow for running streaming queries like Trigger.Once in multiple batches #33763
Conversation
@HeartSaVioR, @brkyvz, could you review this? Thanks!
cc @viirya
First pass.
Review threads on:
...c/main/scala/org/apache/spark/sql/execution/streaming/FakeLatestOffsetMicroBatchStream.scala (outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FakeLatestOffsetSource.scala (outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (outdated)
try {
  assert(q.awaitTermination(streamingTimeout.toMillis))
  // only one batch has data in both sources, thus counted, see SPARK-24050
(Beyond the scope of the PR) it would be ideal if we can revisit and fix it later.
sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
Btw, thanks for the great contribution! Nice feature indeed.
(It seems I failed to post the review comments 4 days ago...)
Still reviewing this.
Review threads on:
...c/main/scala/org/apache/spark/sql/execution/streaming/FakeLatestOffsetMicroBatchStream.scala (outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FakeLatestOffsetSource.scala (outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (outdated)
Thanks for the work. I will review in next few days.
I think we also need to update the documentation, but I don't see a doc change included yet.
...src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsTriggerAvailableNow.java
private def getInitialOffset: streaming.Offset = {
  delegate match {
    case _: Source => null
    case m: MicroBatchStream => m.initialOffset
  }
}
We can use null (for v1 sources) and initialOffset (for v2 streams) as the startOffset in latestOffset(startOffset, readLimit), since the readLimit is always allAvailable.
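To illustrate the idea behind this wrapper — snapshot the delegate source's latest offset when the query starts, then never report past it — here is a minimal Python sketch. None of these class or method names are Spark APIs; they only model the behavior described above, with integer offsets standing in for real source offsets:

```python
class DelegateSource:
    """Stand-in for an underlying source whose latest offset keeps growing."""
    def __init__(self):
        self.offset = 0

    def initial_offset(self):
        return 0

    def latest_offset(self):
        return self.offset


class AvailableNowWrapper:
    """Caps the delegate's reported offset at the value seen at query start."""
    def __init__(self, delegate):
        self.delegate = delegate
        self.target = None

    def prepare_for_trigger_available_now(self):
        # Record the target offset for the whole query.
        self.target = self.delegate.latest_offset()

    def latest_offset(self, start_offset, read_limit="allAvailable"):
        # Even if the delegate has moved on, never report past the snapshot.
        return min(self.delegate.latest_offset(), self.target)


src = DelegateSource()
src.offset = 5                       # data available before the query starts
wrapper = AvailableNowWrapper(src)
wrapper.prepare_for_trigger_available_now()

src.offset = 9                       # new data arrives after query start
print(wrapper.latest_offset(start_offset=src.initial_offset()))  # → 5
```

The query therefore drains everything up to offset 5 and stops, ignoring the later data, which is the Trigger.AvailableNow contract.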
retest this, please
@viirya Would you mind revisiting this? The PR looks OK to me, but I'd like to see your approval as you're reviewing the change. Thanks in advance!
val q = startQuery()

try {
Shouldn't we add more offsets (createFile) into the source here, after the latest offset was fetched? That way we can verify that we only process the data available at the beginning of the query.
I find it a bit hard to control the order between starting the query and adding new files into the source. Do you know if there is an easy way to do so?
I'm not sure we could do it easily, as we can't let the query be "suspended" after figuring out source offsets to process. The streaming query is running concurrently with the main thread.
Hmm, okay. I asked this because the test looks to me like it only verifies that the query can run with correct results, not the Trigger.AvailableNow behavior itself. Okay for me.
assert(q.recentProgress.count(_.numInputRows != 0) == 3)
This verifies that the query runs three micro-batches instead of one.
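The assertion counts progress entries with non-zero input rows. As a minimal illustration of the same check (with made-up progress records standing in for `q.recentProgress`):

```python
# Hypothetical progress entries mimicking q.recentProgress; only entries with
# numInputRows != 0 correspond to micro-batches that actually processed data.
recent_progress = [
    {"batchId": 0, "numInputRows": 2},
    {"batchId": 1, "numInputRows": 2},
    {"batchId": 2, "numInputRows": 1},
    {"batchId": 3, "numInputRows": 0},  # a no-data batch is not counted
]

non_empty_batches = sum(1 for p in recent_progress if p["numInputRows"] != 0)
print(non_empty_batches)  # → 3, i.e. three micro-batches carried data
```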
Btw, the test `SPARK-36533: Trigger.AvailableNow - checkpointing` covers everything here now. The change to `index` determines the number of micro-batches executed, and we also check the output DataFrame. I think we can simply remove this test.
sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
q.stop()
}

var index = 3 // We have processed the first 3 rows in the first query
nit: probably better to expand the comment to note that it tracks the number of micro-batch executions from here on. The code is intuitive, but the elaboration is worth having given the importance of this variable.
+1 pending other reviewers' approval.
LGTM!
Thanks! Merging to master!
Thanks @bozhang2820 for the great contribution, and thanks all for reviewing! I just merged this into master.
@HeartSaVioR @viirya @bozhang2820 is there a way to call this from PySpark? I can't figure it out.
Nice catch. I realized it was missing. I'm going to address this soon.
### What changes were proposed in this pull request? This PR proposes to add Trigger.AvailableNow in PySpark on top of #33763. ### Why are the changes needed? We missed adding Trigger.AvailableNow in PySpark in #33763. ### Does this PR introduce _any_ user-facing change? Yes, Trigger.AvailableNow will be available in PySpark as well. ### How was this patch tested? Added simple validation in PySpark doc. Manually tested as below: ``` >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(once=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dff6d0> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| | c| | d| | e| +-----+ >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dffe50> >>> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| | c| | d| | e| +-----+ >>> spark.readStream.format("text").option("maxfilespertrigger", "2").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dff820> >>> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| +-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +-----+ |value| +-----+ | c| | d| +-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+ |value| +-----+ | e| +-----+ >>> ``` Closes #34592 from HeartSaVioR/SPARK-36533-FOLLOWUP-pyspark. 
Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
What changes were proposed in this pull request?
This change creates a new type of Trigger, Trigger.AvailableNow, for streaming queries. It is like Trigger.Once, which processes all available data and then stops the query, but with better scalability, since data can be processed in multiple batches instead of one.
To achieve this, this change proposes a new interface, `SupportsTriggerAvailableNow`, which is an extension of `SupportsAdmissionControl`. It has one method, `prepareForTriggerAvailableNow`, which will be called at the beginning of a streaming query with Trigger.AvailableNow, to let the source record the offset of the latest data at that time (a.k.a. the target offset for the query). The source should then behave as if no new data arrives after the beginning of the query, i.e., the source will not return an offset higher than the target offset when `latestOffset` is called.

This change also updates `FileStreamSource` to be an implementation of `SupportsTriggerAvailableNow`.

For other sources that do not implement `SupportsTriggerAvailableNow`, this change also provides a new class, `FakeLatestOffsetSupportsTriggerAvailableNow`, which wraps the sources and makes them support Trigger.AvailableNow by overriding their `latestOffset` method to always return the latest offset at the beginning of the query.
Why are the changes needed?
Currently, streaming queries with Trigger.Once always load all available data in a single batch. Because of this, the amount of data a query can process is limited, or the Spark driver will run out of memory.
Does this PR introduce any user-facing change?
Users will be able to use Trigger.AvailableNow (to process all available data then stop the streaming query) with this change.
How was this patch tested?
Added unit tests.
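The overall behavior — snapshot a target offset at query start, drain up to it in several rate-limited micro-batches, then stop — can be sketched without Spark as follows. All names here are illustrative, not Spark APIs; integer indexes stand in for offsets and `max_per_batch` plays the role of an admission-control limit such as `maxFilesPerTrigger`:

```python
def run_available_now(data, max_per_batch):
    """Process all rows present at query start in multiple capped batches,
    then stop, like Trigger.AvailableNow (rows appended later are ignored)."""
    target = len(data)      # prepareForTriggerAvailableNow: snapshot the target offset
    start = 0
    batches = []
    while start < target:
        end = min(start + max_per_batch, target)   # per-batch admission control
        batches.append(data[start:end])
        start = end
    return batches

rows = ["a", "b", "c", "d", "e"]
batches = run_available_now(rows, max_per_batch=2)
print(len(batches))   # → 3 micro-batches, mirroring the maxFilesPerTrigger=2 example
print(batches)        # → [['a', 'b'], ['c', 'd'], ['e']]
```

With Trigger.Once the same five rows would land in a single batch; the loop above shows why AvailableNow scales better: the work is split into bounded micro-batches while still covering exactly the data available at query start.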