[SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period #11976

Closed · wants to merge 8 commits
Changes from 3 commits
@@ -171,13 +171,20 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
       name: String,
       checkpointLocation: String,
       df: DataFrame,
-      sink: Sink): ContinuousQuery = {
+      sink: Sink,
+      triggerIntervalMs: Long): ContinuousQuery = {
     activeQueriesLock.synchronized {
       if (activeQueries.contains(name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
       }
-      val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink)
+      val query = new StreamExecution(
+        sqlContext,
+        name,
+        checkpointLocation,
+        df.logicalPlan,
+        sink,
+        triggerIntervalMs)
       query.start()
       activeQueries.put(name, query)
       query
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.util.Properties

 import scala.collection.JavaConverters._
+import scala.concurrent.duration._

 import org.apache.hadoop.fs.Path

@@ -77,6 +78,26 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     this
   }

+  /**
+   * Set the trigger interval for the stream query.
+   *
Contributor: This scaladoc should have an example right here.

write.trigger(ProcessingTime("10 seconds"))
write.trigger("10 seconds")     // less verbose

+   * @since 2.0.0
+   */
Contributor: @experimental

Member (author): The whole class is experimental already.

Contributor: We should remove @experimental for all the DataFrame stuff and mark just the streaming methods. /cc @rxin

Contributor: +1. We don't need to do that in this PR, though.

+  def trigger(interval: Duration): DataFrameWriter = {
Contributor: We probably want a version that takes an interval string (see #12008). We should also do Python in this PR or a follow-up.
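For illustration, a string-taking overload could simply delegate to the Duration overload; a minimal hypothetical sketch (not part of this PR, and the exact API was still under discussion in #12008):

// Hypothetical sketch, inside DataFrameWriter: delegate to the Duration overload.
// scala.concurrent.duration.Duration can parse strings like "10 seconds".
def trigger(interval: String): DataFrameWriter = {
  trigger(Duration(interval))
}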

Member (author): There is no Python startStream yet. I don't think we need to add a Python API that won't be used for now.

Contributor: Should we also accept a startTime together with the duration (same as Window)? See https://issues.apache.org/jira/browse/SPARK-14230

Member (author): Sounds like a good idea.

+    this.extraOptions += ("triggerInterval" -> interval.toMillis.toString)
Contributor: I think we should probably create another var, rather than converting this back and forth from a string. Also, how would we extend this to support other types of triggers? The design doc suggests using classes, I think.

Member (author): We can add a new method, triggerMode, to support other types of triggers, similar to how we support different SaveModes. This would be better than adding new classes.
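A rough sketch of that idea, combining a dedicated field with a SaveMode-style mode setter (all names here are hypothetical, not part of this PR):

// Hypothetical sketch: an enum of trigger modes, analogous to SaveMode,
// and dedicated fields instead of round-tripping through extraOptions.
object TriggerMode extends Enumeration {
  val ProcessingTime, DataSize = Value
}

// Inside DataFrameWriter:
private var triggerMode: TriggerMode.Value = TriggerMode.ProcessingTime
private var triggerIntervalMs: Long = 0L

def trigger(mode: TriggerMode.Value, intervalMs: Long): DataFrameWriter = {
  triggerMode = mode
  triggerIntervalMs = intervalMs
  this
}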

+    this
+  }

+  /**
+   * Set the trigger interval for the stream query.
+   *
+   * @since 2.0.0
+   */
+  def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = {
Contributor: Not all trigger modes are going to be time-based, though. In the doc we also propose data-size-based triggers.

Member (author): How about def trigger(trigger: Trigger), exposing Trigger and all its subclasses?
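A sketch of what that shape could look like (hypothetical; Spark's eventual structured streaming API did adopt a similar ProcessingTime trigger):

// Hypothetical sketch of a Trigger-based API:
sealed trait Trigger
case class ProcessingTime(intervalMs: Long) extends Trigger
case class DataSize(rows: Long) extends Trigger // a possible data-size-based trigger

// DataFrameWriter would then expose a single entry point:
//   def trigger(trigger: Trigger): DataFrameWriter
// so time-based and data-size-based triggers share one signature.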

+    this.extraOptions += ("triggerInterval" -> unit.toMillis(interval).toString)
+    this
+  }

   /**
    * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    *
@@ -257,11 +278,14 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
       new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
     })
+    val triggerIntervalMs = extraOptions.getOrElse("triggerInterval", "0").toLong
+    require(triggerIntervalMs >= 0, "the trigger interval must not be negative")
     df.sqlContext.sessionState.continuousQueryManager.startQuery(
       queryName,
       checkpointLocation,
       df,
-      dataSource.createSink())
+      dataSource.createSink(),
+      triggerIntervalMs)
   }

   /**
@@ -46,16 +46,14 @@ class StreamExecution(
     override val name: String,
     val checkpointRoot: String,
     private[sql] val logicalPlan: LogicalPlan,
-    val sink: Sink) extends ContinuousQuery with Logging {
+    val sink: Sink,
+    triggerIntervalMs: Long) extends ContinuousQuery with Logging {

   /** A monitor used to wait/notify when batches complete. */
   private val awaitBatchLock = new Object
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)

-  /** Minimum amount of time in between the start of each batch. */
-  private val minBatchTime = 10

   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
@@ -212,9 +210,18 @@ class StreamExecution(
       populateStartOffsets()
       logDebug(s"Stream running from $committedOffsets to $availableOffsets")
       while (isActive) {
+        val batchStartTimeMs = System.currentTimeMillis()
Contributor: Maybe we can pull this logic out into its own class so that we can override time and unit test it properly? Some questions:

  • I think this might be implementing the "alternative" and not the proposal from the design doc. If I understand correctly, the behavior should be "If the cluster is overloaded, we will skip some firings and wait until the next multiple of period," whereas I think this executes ASAP when overloaded (see the sketch after this comment).
  • What about failures? What if it's an hour trigger and the cluster fails and comes back up after waiting 10 minutes? (We might defer this case.)

/cc @rxin
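For reference, the "wait until the next multiple of the period" behavior from the design doc could look like this minimal hypothetical sketch (assuming a fixed origin timestamp such as the query start time):

// Hypothetical sketch: sleep until the next multiple of intervalMs after
// originMs, skipping any firings missed while the cluster was overloaded.
def sleepUntilNextMultiple(originMs: Long, intervalMs: Long): Unit = {
  val now = System.currentTimeMillis()
  val elapsedIntervals = (now - originMs) / intervalMs
  val nextFiringMs = originMs + (elapsedIntervals + 1) * intervalMs
  Thread.sleep(nextFiringMs - now)
}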

Member (author): My original design was as follows:

trait Trigger {
  def triggerExecution(): TriggerExecution
}

trait TriggerExecution {
  // batchRunner runs one batch and returns whether we should terminate the execution
  def execute(batchRunner: () => Boolean): Unit
}

private[sql] class ProcessingTime(intervalMs: Long) extends Trigger {
  override def triggerExecution(): TriggerExecution = new ProcessingTriggerExecution(intervalMs)
}

private[sql] class ProcessingTriggerExecution(intervalMs: Long) extends TriggerExecution {
  override def execute(batchRunner: () => Boolean): Unit = {
    while (true) {
      // Code that waits for the next multiple of intervalMs.
      if (batchRunner()) {
        return
      }
    }
  }
}

However, it requires exposing Trigger and TriggerExecution to the user and seems complicated.

Member (author): Never mind, we can hide them.

         if (dataAvailable) runBatch()
         commitAndConstructNextBatch()
-        Thread.sleep(minBatchTime) // TODO: Could be tighter
+        if (triggerIntervalMs > 0) {
+          val batchElapsedTime = System.currentTimeMillis() - batchStartTimeMs
+          if (batchElapsedTime > triggerIntervalMs) {
+            logWarning("Current batch is falling behind. The trigger interval is " +
+              s"$triggerIntervalMs milliseconds, but the batch took $batchElapsedTime milliseconds")
+          } else {
+            Thread.sleep(triggerIntervalMs - batchElapsedTime)
Contributor: Does this handle spurious wake-ups?
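Thread.sleep, unlike Object.wait, is not subject to spurious wake-ups, though it can return early via interruption; a defensive variant would re-check the clock in a loop, as in this hypothetical sketch:

// Hypothetical defensive sketch: re-check elapsed time after each sleep.
var remainingMs = triggerIntervalMs - (System.currentTimeMillis() - batchStartTimeMs)
while (remainingMs > 0) {
  Thread.sleep(remainingMs)
  remainingMs = triggerIntervalMs - (System.currentTimeMillis() - batchStartTimeMs)
}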

+          }
+        }
+      }
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
@@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts {
         currentStream =
           sqlContext
             .streams
-            .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
+            .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L)
Contributor: Why not 0?

             .asInstanceOf[StreamExecution]
         currentStream.microBatchThread.setUncaughtExceptionHandler(
           new UncaughtExceptionHandler {
@@ -243,7 +243,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
             StreamExecution.nextName,
             metadataRoot,
             df,
-            new MemorySink(df.schema))
+            new MemorySink(df.schema),
+            10L)
           .asInstanceOf[StreamExecution]
       } catch {
         case NonFatal(e) =>
@@ -17,6 +17,10 @@

package org.apache.spark.sql.streaming.test

+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
@@ -274,4 +278,28 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(activeStreamNames.contains("name"))
     sqlContext.streams.active.foreach(_.stop())
   }

test("trigger") {
val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.stream("/test")

df.write
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.trigger(10.seconds)
.startStream()
.stop()

assert(LastOptions.parameters("triggerInterval") == "10000")

df.write
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.trigger(100, TimeUnit.SECONDS)
.startStream()
.stop()

assert(LastOptions.parameters("triggerInterval") == "100000")
}
}