-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-14176][SQL]Add DataFrameWriter.trigger to set the stream batch period #11976
Changes from 3 commits
bf5d675
ab5dbd3
6f5c6ed
92d204c
a7355ed
f3526d0
7c4bc42
6c1b382
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql | |
import java.util.Properties | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.concurrent.duration._ | ||
|
||
import org.apache.hadoop.fs.Path | ||
|
||
|
@@ -77,6 +78,26 @@ final class DataFrameWriter private[sql](df: DataFrame) { | |
this | ||
} | ||
|
||
/** | ||
* Set the trigger interval for the stream query. | ||
* | ||
* @since 2.0.0 | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole class is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should remove /cc @rxin There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 we don't need to do that in this pr though |
||
def trigger(interval: Duration): DataFrameWriter = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want a version that takes an interval string (see #12008). We should also do python in this PR or a follow up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no Python There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also accept a startTime together with duration (same to Window), see https://issues.apache.org/jira/browse/SPARK-14230 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sounds like a good idea |
||
this.extraOptions += ("triggerInterval" -> interval.toMillis.toString) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we probably should create another var, rather than converting this back/forth from a string. Also, how would we extend this to support other types of triggers? The design doc suggests using using classes I think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We can add a new method |
||
this | ||
} | ||
|
||
/** | ||
* Set the trigger interval for the stream query. | ||
* | ||
* @since 2.0.0 | ||
*/ | ||
def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not all trigger modes are going to be time based though. In the doc we also propose data sized based triggers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How about |
||
this.extraOptions += ("triggerInterval" -> unit.toMillis(interval).toString) | ||
this | ||
} | ||
|
||
/** | ||
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc. | ||
* | ||
|
@@ -257,11 +278,14 @@ final class DataFrameWriter private[sql](df: DataFrame) { | |
val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { | ||
new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString | ||
}) | ||
val triggerIntervalMs = extraOptions.getOrElse("triggerInterval", "0").toLong | ||
require(triggerIntervalMs >= 0, "the interval of trigger should not be negative") | ||
df.sqlContext.sessionState.continuousQueryManager.startQuery( | ||
queryName, | ||
checkpointLocation, | ||
df, | ||
dataSource.createSink()) | ||
dataSource.createSink(), | ||
triggerIntervalMs) | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,16 +46,14 @@ class StreamExecution( | |
override val name: String, | ||
val checkpointRoot: String, | ||
private[sql] val logicalPlan: LogicalPlan, | ||
val sink: Sink) extends ContinuousQuery with Logging { | ||
val sink: Sink, | ||
triggerIntervalMs: Long) extends ContinuousQuery with Logging { | ||
|
||
/** A monitor used to wait/notify when batches complete. */ | ||
private val awaitBatchLock = new Object | ||
private val startLatch = new CountDownLatch(1) | ||
private val terminationLatch = new CountDownLatch(1) | ||
|
||
/** Minimum amount of time in between the start of each batch. */ | ||
private val minBatchTime = 10 | ||
|
||
/** | ||
* Tracks how much data we have processed and committed to the sink or state store from each | ||
* input source. | ||
|
@@ -212,9 +210,18 @@ class StreamExecution( | |
populateStartOffsets() | ||
logDebug(s"Stream running from $committedOffsets to $availableOffsets") | ||
while (isActive) { | ||
val batchStartTimeMs = System.currentTimeMillis() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can pull this logic out into its own class so that we can override time and unit test it properly? Some questions:
/cc @rxin There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My original design is as follows: trait Trigger {
def triggerExecution(): TriggerExecution
}
trait TriggerExecution {
// batchRunner will run a batch and return whether we should terminate the execution
def execute(batchRunner: () => Boolean): Unit
}
private[sql] class ProcessingTime(intervalMs: Long) extends Trigger {
override def triggerExecution(): TriggerExecution = new ProcessingTriggerExecution(intervalMs)
}
private[sql] class ProcessingTriggerExecution(intervalMs: Long) extends TriggerExecution {
override def execute(batchRunner: () => Boolean): Unit = {
while (true) {
// Code that waits for the next multiple of intervalMs.
if (batchRunner()) {
return
}
}
}
} However, it requires exposing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Never mind, we can hide them. |
||
if (dataAvailable) runBatch() | ||
commitAndConstructNextBatch() | ||
Thread.sleep(minBatchTime) // TODO: Could be tighter | ||
if (triggerIntervalMs > 0) { | ||
val batchElapsedTime = System.currentTimeMillis() - batchStartTimeMs | ||
if (batchElapsedTime > triggerIntervalMs) { | ||
logWarning("Current batch is falling behind. The trigger interval is " + | ||
s"${triggerIntervalMs} milliseconds, but spent ${batchElapsedTime} milliseconds") | ||
} else { | ||
Thread.sleep(triggerIntervalMs - batchElapsedTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this handle spurious wake ups? |
||
} | ||
} | ||
} | ||
} catch { | ||
case _: InterruptedException if state == TERMINATED => // interrupted by stop() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts { | |
currentStream = | ||
sqlContext | ||
.streams | ||
.startQuery(StreamExecution.nextName, metadataRoot, stream, sink) | ||
.startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not |
||
.asInstanceOf[StreamExecution] | ||
currentStream.microBatchThread.setUncaughtExceptionHandler( | ||
new UncaughtExceptionHandler { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This scala doc should have an example right here.