Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36533][SS] Trigger.AvailableNow for running streaming queries like Trigger.Once in multiple batches #33763

Closed
wants to merge 9 commits into from

Conversation

bozhang2820
Copy link
Contributor

@bozhang2820 bozhang2820 commented Aug 17, 2021

What changes were proposed in this pull request?

This change creates a new type of Trigger: Trigger.AvailableNow for streaming queries. It is like Trigger.Once, which process all available data then stop the query, but with better scalability since data can be processed in multiple batches instead of one.

To achieve this, this change proposes a new interface SupportsTriggerAvailableNow, which is an extension of SupportsAdmissionControl. It has one method, prepareForTriggerAvailableNow, which will be called at the beginning of streaming queries with Trigger.AvailableNow, to let the source record the offset for the current latest data at the time (a.k.a. the target offset for the query). The source should then behave as if there is no new data coming in after the beginning of the query, i.e., the source will not return an offset higher than the target offset when latestOffset is called.

This change also updates FileStreamSource to be an implementation of SupportsTriggerAvailableNow.

For other sources that does not implement SupportsTriggerAvailableNow, this change also provides a new class FakeLatestOffsetSupportsTriggerAvailableNow, which wraps the sources and makes them support Trigger.AvailableNow, by overriding their latestOffset method to always return the latest offset at the beginning of the query.

Why are the changes needed?

Currently streaming queries with Trigger.Once will always load all of the available data in a single batch. Because of this, the amount of data a query can process is limited, or Spark driver will run out of memory.

Does this PR introduce any user-facing change?

Users will be able to use Trigger.AvailableNow (to process all available data then stop the streaming query) with this change.

How was this patch tested?

Added unit tests.

@SparkQA
Copy link

SparkQA commented Aug 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47059/

@SparkQA
Copy link

SparkQA commented Aug 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47059/

@SparkQA
Copy link

SparkQA commented Aug 17, 2021

Test build #142557 has finished for PR 33763 at commit 5d0aee8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeLatestOffsetMicroBatchStream(source: MicroBatchStream)
  • class FakeLatestOffsetSource(source: Source)
  • class FakeLatestOffsetSupportsTriggerAvailableNow(source: SparkDataStream)
  • case class OneBatchExecutor() extends TriggerExecutor
  • case class MultiBatchExecutor() extends TriggerExecutor

@HyukjinKwon HyukjinKwon changed the title [SPARK-36533] Trigger.AvailableNow for running streaming queries like Trigger.Once in multiple batches [SPARK-36533][SS] Trigger.AvailableNow for running streaming queries like Trigger.Once in multiple batches Aug 18, 2021
@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47079/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47079/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47084/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47084/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Test build #142578 has finished for PR 33763 at commit 6c017c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Test build #142582 has finished for PR 33763 at commit c2223f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeLatestOffsetMicroBatchStream(source: MicroBatchStream)
  • class FakeLatestOffsetSource(source: Source)
  • class FakeLatestOffsetSupportsTriggerAvailableNow(source: SparkDataStream)
  • case class OneBatchExecutor() extends TriggerExecutor
  • case class MultiBatchExecutor() extends TriggerExecutor

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47101/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47101/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Test build #142601 has finished for PR 33763 at commit 6d9253e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47114/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47114/

@SparkQA
Copy link

SparkQA commented Aug 18, 2021

Test build #142614 has finished for PR 33763 at commit c2223f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeLatestOffsetMicroBatchStream(source: MicroBatchStream)
  • class FakeLatestOffsetSource(source: Source)
  • class FakeLatestOffsetSupportsTriggerAvailableNow(source: SparkDataStream)
  • case class OneBatchExecutor() extends TriggerExecutor
  • case class MultiBatchExecutor() extends TriggerExecutor

@bozhang2820
Copy link
Contributor Author

@HeartSaVioR, @brkyvz, could you review this? Thanks!

@xuanyuanking
Copy link
Member

cc @viirya

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass.


try {
assert(q.awaitTermination(streamingTimeout.toMillis))
// only one batch has data in both sources, thus counted, see SPARK-24050
Copy link
Contributor

@HeartSaVioR HeartSaVioR Aug 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Beyond the scope of the PR) it would be ideal if we can revisit and fix it later.

@HeartSaVioR
Copy link
Contributor

Btw, thanks for the great contribution! Nice feature indeed.

Copy link
Member

@xuanyuanking xuanyuanking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Seems I failed to post the review comments last 4 days ago...)
Still reviewing this.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the work. I will review in next few days.

I think we also need to update the document. But I don't see doc change included yet.

@SparkQA
Copy link

SparkQA commented Aug 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47220/

Comment on lines +41 to +46
private def getInitialOffset: streaming.Offset = {
delegate match {
case _: Source => null
case m: MicroBatchStream => m.initialOffset
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use null (for v1 sources) and iniitalOffset (for v2 streams) for the startOffset in latestOffset(startOffset, readLimit), since the readLimit is always allAvailable.

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47369/

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47369/

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Test build #142865 has finished for PR 33763 at commit 0824910.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor

retest this, please

@HeartSaVioR
Copy link
Contributor

@viirya Would you mind revisiting this? The PR looks OK to me, but I'd like to see your approval as you're reviewing the change. Thanks in advance!

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47375/

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Test build #142866 has finished for PR 33763 at commit c96d60d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47375/

@bozhang2820
Copy link
Contributor Author

@viirya Would you mind revisiting this? The PR looks OK to me, but I'd like to see your approval as you're reviewing the change. Thanks in advance!

Also @brkyvz could you review again for the changes you requested?

Comment on lines 1338 to 1340
val q = startQuery()

try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we add more offsets createFile here after latest offset was fetched into the source? So we can verify that we only process all available at the beginning of the query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it a bit hard to control the order between starting the query and adding new files into the source. Do you know if there is an easy way to do so?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we could do it easily, as we can't let the query be "suspended" after figuring out source offsets to process. The streaming query is running concurrently with the main thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, okay, I asked this because the test looks like to me only for verifying the query can run with correct results. Not able to check trigger.availablenow. okay for me.

Copy link
Contributor

@HeartSaVioR HeartSaVioR Aug 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(q.recentProgress.count(_.numInputRows != 0) == 3)

This verifies that the query runs three micro-batches instead of one.

Btw, the test SPARK-36533: Trigger.AvailableNow - checkpointing covers everything in here now. The change on index determines the number of micro-batch being executed, and we also checked the output DataFrame. I think we can simply remove this test.

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Test build #142872 has finished for PR 33763 at commit c96d60d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

q.stop()
}

var index = 3 // We have processed the first 3 rows in the first query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably better to add the comment that it tracks the number of micro-batch execution starting from here. The code is intuitive but worth having elaboration given the importance of this variable.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending other reviewers' approval.

@SparkQA
Copy link

SparkQA commented Sep 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47399/

@SparkQA
Copy link

SparkQA commented Sep 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47399/

@SparkQA
Copy link

SparkQA commented Sep 1, 2021

Test build #142896 has finished for PR 33763 at commit b6c26e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@brkyvz brkyvz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master!

@HeartSaVioR
Copy link
Contributor

Thanks @bozhang2820 for the great contribution and thanks all for reviewing! I just merged this into master.

@srowen
Copy link
Member

srowen commented Nov 13, 2021

@HeartSaVioR @viirya @bozhang2820 is there a way to call this from Pyspark? I can't figure it out

@HeartSaVioR
Copy link
Contributor

Nice catch. I realized it was missing. I'm going to address this soon.

HeartSaVioR added a commit that referenced this pull request Nov 14, 2021
### What changes were proposed in this pull request?

This PR proposes to add Trigger.AvailableNow in PySpark on top of #33763.

### Why are the changes needed?

We missed adding Trigger.AvailableNow in PySpark in #33763.

### Does this PR introduce _any_ user-facing change?

Yes, Trigger.AvailableNow will be available in PySpark as well.

### How was this patch tested?

Added simple validation in PySpark doc. Manually tested as below:

```
>>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(once=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dff6d0>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
|    d|
|    e|
+-----+

>>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dffe50>
>>> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
|    d|
|    e|
+-----+

>>> spark.readStream.format("text").option("maxfilespertrigger", "2").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dff820>
>>> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    c|
|    d|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    e|
+-----+

>>>
```

Closes #34592 from HeartSaVioR/SPARK-36533-FOLLOWUP-pyspark.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants