[SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period #11976
Conversation
Test build #54253 has finished for PR 11976 at commit
retest this please
Test build #54329 has finished for PR 11976 at commit
   * @since 2.0.0
   */
  def trigger(period: Duration): DataFrameWriter = {
    this.extraOptions += ("period" -> period.toMillis.toString)
Since this name is exposed for the user to accidentally modify (through `option()`), we should probably make it more specific. Maybe "triggerInterval".
Also, I think "interval" is better than "period".
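The concern above can be illustrated with a minimal sketch in plain Scala (the class and method names here mirror the PR's but are hypothetical, not the real Spark code): because the trigger setting lives in the same string-keyed options map that `option()` writes to, a user-supplied option with the same key silently clobbers it.

```scala
// Hypothetical mini-model of DataFrameWriter's shared options map.
// `extraOptions`, `option`, and `trigger` echo the PR's names for
// illustration only; this is not the actual Spark implementation.
import scala.collection.mutable

class WriterSketch {
  private val extraOptions = mutable.Map[String, String]()

  def option(key: String, value: String): WriterSketch = {
    extraOptions += (key -> value); this
  }

  def trigger(periodMs: Long): WriterSketch = {
    // A generic key like "period" can collide with user-supplied options.
    extraOptions += ("period" -> periodMs.toString); this
  }

  def periodOption: Option[String] = extraOptions.get("period")
}

object WriterSketch {
  def main(args: Array[String]): Unit = {
    val w = new WriterSketch
    w.trigger(1000L)
    w.option("period", "oops") // user accidentally overwrites the trigger
    assert(w.periodOption.contains("oops"))
    println(w.periodOption.get)
  }
}
```

A more specific key such as "triggerInterval" makes an accidental collision much less likely, though a dedicated field would avoid the shared namespace entirely.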
Test build #54376 has finished for PR 11976 at commit
@@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts {
       currentStream =
         sqlContext
           .streams
-          .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
+          .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L)
Why not 0?
I updated the PR to add
Test build #54498 has finished for PR 11976 at commit
   *
   * @since 2.0.0
   */
  def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = {
Not all trigger modes are going to be time based, though. In the doc we also propose data-size-based triggers.
How about `def trigger(trigger: Trigger)` and expose `Trigger` and all its subclasses?
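A minimal sketch of the API shape proposed above (simplified, not the actual Spark classes): a `sealed` trait with case-class subclasses means the set of trigger kinds is closed in one file, and pattern matches over `Trigger` get compiler exhaustiveness checking.

```scala
// Sketch of the proposed Trigger hierarchy; names follow the discussion
// but the classes are simplified illustrations.
sealed trait Trigger

/** Fire a batch every `intervalMs` milliseconds; 0 means "as fast as possible". */
case class ProcessingTime(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "interval must be non-negative")
}

object TriggerDemo {
  def describe(t: Trigger): String = t match {
    case ProcessingTime(0)  => "run batches back to back"
    case ProcessingTime(ms) => s"run a batch every $ms ms"
  }

  def main(args: Array[String]): Unit = {
    println(describe(ProcessingTime(0)))     // run batches back to back
    println(describe(ProcessingTime(10000))) // run a batch every 10000 ms
  }
}
```

New trigger kinds (e.g. a data-size-based one) would be added as further subclasses of `Trigger` without touching the `trigger(...)` method signature.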
Test build #54568 has finished for PR 11976 at commit
Test build #54566 has finished for PR 11976 at commit
          metadataRoot,
          stream,
          sink,
          ProcessingTime(0L))
Minor: maybe just make this the default arg since this function is internal.
There does not seem to be any end-to-end test that makes sure that the trigger is working and keeps the right timing. Also, things like what the behavior is if the previous batch takes longer than the interval: none of that is tested.
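The timing behavior being asked about can be sketched in plain Scala, independent of Spark (the object and method names here are hypothetical): a fixed-interval executor computes the next batch boundary from the previous batch's start time, and if the previous batch overran its interval, the wait collapses to zero so the next batch starts immediately.

```scala
// Illustration only: next-batch scheduling arithmetic for a fixed-interval
// trigger, including the "previous batch took longer" case.
object TriggerTiming {
  /** The interval boundary that follows a batch which started at `batchStart`. */
  def nextBatchTime(intervalMs: Long, batchStart: Long): Long =
    batchStart + intervalMs

  /** How long to wait after a batch finishes; never negative. */
  def waitMs(intervalMs: Long, batchStart: Long, batchEnd: Long): Long =
    math.max(0L, nextBatchTime(intervalMs, batchStart) - batchEnd)

  def main(args: Array[String]): Unit = {
    // Fast batch: started at t=0, finished at t=30, interval 100 -> wait 70 ms.
    println(waitMs(100, 0, 30))  // 70
    // Overrunning batch: finished at t=250 -> start the next batch immediately.
    println(waitMs(100, 0, 250)) // 0
  }
}
```

An end-to-end test would assert exactly these two properties against wall-clock timestamps recorded by the running query.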
   * {{{
   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
   *   df.write.trigger(ProcessingTime.create("10 seconds"))
   * }}}
Nice documentation! Maybe put the typesafe one second and include the imports that are required.
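Assuming the "typesafe" variant refers to Scala's duration DSL, a minimal standalone sketch of the imports it needs (independent of Spark):

```scala
// The duration DSL needs this import; `10.seconds` is then a
// scala.concurrent.duration.FiniteDuration, not a bare number.
import scala.concurrent.duration._

object DurationDemo {
  def main(args: Array[String]): Unit = {
    val interval: FiniteDuration = 10.seconds
    println(interval.toMillis) // 10000
  }
}
```

Showing the import in the scaladoc example keeps the snippet copy-pasteable.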
@@ -78,6 +76,11 @@ class StreamExecution(
   /** A list of unique sources in the query plan. */
   private val uniqueSources = sources.distinct

+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t)
+    case t => throw new IllegalArgumentException(s"${t.getClass} is not supported")
The trait is `sealed`. Do we need this?
This looks great! Minor comments only.
@@ -78,6 +78,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }

+  /**
+   * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
This scaladoc should have an example right here:
write.trigger(ProcessingTime("10 seconds"))
write.trigger("10 seconds") // less verbose
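The less verbose `String` overload suggested above would presumably parse the interval and delegate to the `Trigger` form. A hypothetical sketch of that parsing step in plain Scala (the real Spark parser is not shown here; `IntervalParser` and its unit table are invented for illustration):

```scala
// Hypothetical: parse "<n> <unit>" into milliseconds so that
// trigger("10 seconds") could delegate to trigger(ProcessingTime(...)).
object IntervalParser {
  private val unitMs = Map(
    "millisecond" -> 1L, "second" -> 1000L, "minute" -> 60000L, "hour" -> 3600000L)

  def parseMillis(s: String): Long = s.trim.toLowerCase.split("\\s+") match {
    case Array(n, unit) =>
      val base = unit.stripSuffix("s") // accept "seconds" as well as "second"
      n.toLong * unitMs.getOrElse(base,
        throw new IllegalArgumentException(s"unknown unit: $unit"))
    case _ => throw new IllegalArgumentException(s"cannot parse interval: $s")
  }

  def main(args: Array[String]): Unit = {
    println(parseMillis("10 seconds")) // 10000
    println(parseMillis("2 minutes"))  // 120000
  }
}
```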
Addressed all comments
Test build #54722 has finished for PR 11976 at commit
Test build #54733 has finished for PR 11976 at commit
LGTM, merging to master.
What changes were proposed in this pull request?
Add a processing time trigger to control the batch processing speed
How was this patch tested?
Unit tests