
[SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period #11976

Closed · wants to merge 8 commits

Conversation

@zsxwing (Member) commented on Mar 26, 2016

What changes were proposed in this pull request?

Add a processing-time trigger to control the batch processing speed.

How was this patch tested?

Unit tests
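
For context, a rough end-to-end usage sketch of the API this PR converges on after review (hypothetical: `df`, the format, and the paths are placeholders, and the import path for ProcessingTime is assumed rather than quoted from the patch):

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.ProcessingTime   // assumed import path

// Start a streaming query that fires a batch every 10 seconds.
// df, the format, and both paths below are illustrative placeholders.
val query = df.write
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
  .startStream("/tmp/output")
```

(`startStream` here reflects the pre-2.0 `DataFrameWriter` API this PR is written against.)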

@SparkQA commented on Mar 26, 2016

Test build #54253 has finished for PR 11976 at commit bf5d675.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member, Author) commented on Mar 28, 2016

retest this please

@SparkQA commented on Mar 28, 2016

Test build #54329 has finished for PR 11976 at commit bf5d675.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

   * @since 2.0.0
   */
  def trigger(period: Duration): DataFrameWriter = {
    this.extraOptions += ("period" -> period.toMillis.toString)
Contributor commented:

Since this name is exposed for the user to accidentally modify (through option()), we should probably make it more specific. Maybe "triggerInterval".

Also, I think "interval" is better than "period".
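
To illustrate the concern with a hypothetical snippet (not code from the patch): the trigger setting lives in the same `extraOptions` map that backs `option()`, so a user option under the same generic key would silently overwrite it.

```scala
import scala.concurrent.duration._

// Hypothetical collision: at this point in the review, trigger() stores its value under the
// generic extraOptions key "period", which an unrelated user option can clobber.
df.write
  .trigger(10.seconds)           // stores extraOptions("period" -> "10000")
  .option("period", "monthly")   // user option overwrites the trigger interval
  .startStream("/tmp/output")    // placeholder path; df is also a placeholder
```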

@SparkQA commented on Mar 29, 2016

Test build #54376 has finished for PR 11976 at commit 6f5c6ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts {
        currentStream =
          sqlContext
            .streams
-           .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
+           .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L)
Contributor commented:

Why not 0?

@zsxwing (Member, Author) commented on Mar 30, 2016

I updated the PR to add Trigger and ProcessingTime, so that other triggers can be added in the future.

@SparkQA commented on Mar 30, 2016

Test build #54498 has finished for PR 11976 at commit 92d204c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait Trigger
    • case class ProcessingTime(intervalMs: Long) extends Trigger with Logging
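
For reference, a simplified sketch of the two public classes listed above (the actual patch also mixes in Logging, validates the interval, and adds factory helpers on the ProcessingTime companion object):

```scala
// Simplified sketch of the reported hierarchy; not the exact code in the patch.
trait Trigger

case class ProcessingTime(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "the trigger interval must not be negative")  // assumed validation
}
```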

   *
   * @since 2.0.0
   */
  def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = {
Contributor commented:

Not all trigger modes are going to be time-based, though. In the doc we also propose data-size-based triggers.

Member (Author) replied:

> Not all trigger modes are going to be time-based, though. In the doc we also propose data-size-based triggers.

How about def trigger(trigger: Trigger) and expose Trigger and all its subclasses?
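
For concreteness, the suggested signature might look roughly like this (an illustrative sketch building on the Trigger/ProcessingTime sketch above; `WriterSketch` and its private field are stand-ins, not the real `DataFrameWriter` internals):

```scala
// Accepting any Trigger keeps the method open to future non-time-based triggers
// (e.g. data-size based) without changing the public API.
final class WriterSketch {
  private var currentTrigger: Trigger = ProcessingTime(0L)  // default: run batches back to back

  def trigger(trigger: Trigger): WriterSketch = {
    currentTrigger = trigger
    this  // return this so calls chain in the usual builder style
  }
}
```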

@SparkQA commented on Mar 31, 2016

Test build #54568 has finished for PR 11976 at commit f3526d0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented on Mar 31, 2016

Test build #54566 has finished for PR 11976 at commit a7355ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

metadataRoot,
stream,
sink,
ProcessingTime(0L))
Contributor commented:

Minor: maybe just make this the default arg since this function is internal.

@tdas (Contributor) commented on Apr 1, 2016

There does not seem to be any end-to-end test that makes sure the trigger is working and keeps the right timing. Also, what is the behavior if the previous batch takes longer than the interval? None of that is tested.
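
To make the timing question concrete, here is a hedged, self-contained sketch of the kind of loop a processing-time executor runs. The overrun policy assumed here (sleep out the remainder of the interval when a batch finishes early, start the next batch immediately when it overruns) is exactly what an end-to-end test would need to pin down; none of the names below are from the patch.

```scala
// Hypothetical processing-time trigger loop, for illustration only.
object TriggerLoopSketch {
  def runLoop(intervalMs: Long)(runBatch: () => Boolean): Unit = {
    var continue = true
    while (continue) {
      val batchStart = System.currentTimeMillis()
      continue = runBatch()                       // returns false to stop the query
      if (continue && intervalMs > 0) {
        val elapsed = System.currentTimeMillis() - batchStart
        if (elapsed < intervalMs) {
          Thread.sleep(intervalMs - elapsed)      // batch finished early: wait for the boundary
        }
        // else: the batch overran its interval; fall through and start the next batch at once
      }
    }
  }
}
```

The injectable `Clock` on `ProcessingTimeExecutor` (see the later test output) is what would let such timing behavior be unit tested deterministically instead of with real sleeps.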

* {{{
* def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* def.writer.trigger(ProcessingTime.create("10 seconds"))
* }}}
Contributor commented:

Nice documentation! Maybe put the typesafe one second, and include the imports that are required.
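
For instance, the example with its imports spelled out might read as follows (a sketch: `df.write` is illustrative, and the import path for ProcessingTime plus the Duration-based overload are assumptions rather than quotes from the patch):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

import org.apache.spark.sql.ProcessingTime   // assumed import path

df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
df.write.trigger(ProcessingTime.create("10 seconds"))
df.write.trigger(ProcessingTime(10.seconds))  // "typesafe" scala.concurrent.duration form (assumed overload)
```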

@@ -78,6 +76,11 @@ class StreamExecution(
   /** A list of unique sources in the query plan. */
   private val uniqueSources = sources.distinct

+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t)
+    case t => throw new IllegalArgumentException(s"${t.getClass} is not supported")
Contributor commented:

The trait is sealed. Do we need this?

@marmbrus (Contributor) commented on Apr 1, 2016

This looks great! Minor comments only.

@@ -78,6 +78,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }

+  /**
+   * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
Contributor commented:

This Scaladoc should have an example right here, e.g.:

write.trigger(ProcessingTime("10 seconds"))
write.trigger("10 seconds")     // less verbose

@zsxwing (Member, Author) commented on Apr 1, 2016

Addressed all comments

@SparkQA commented on Apr 1, 2016

Test build #54722 has finished for PR 11976 at commit 7c4bc42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())

@SparkQA commented on Apr 2, 2016

Test build #54733 has finished for PR 11976 at commit 6c1b382.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor) commented on Apr 4, 2016

LGTM, merging to master.

@asfgit closed this in 855ed44 on Apr 4, 2016.
@zsxwing deleted the trigger branch on Apr 4, 2016.