From bf5d675e42b1cca216ecdaf55fac2042ee4a1e86 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 25 Mar 2016 22:34:13 -0700 Subject: [PATCH 1/7] Add DataFrameWriter.trigger to set the stream batch period --- .../spark/sql/ContinuousQueryManager.scala | 11 ++++++-- .../apache/spark/sql/DataFrameWriter.scala | 26 ++++++++++++++++- .../execution/streaming/StreamExecution.scala | 17 +++++++---- .../org/apache/spark/sql/StreamTest.scala | 2 +- .../ContinuousQueryManagerSuite.scala | 3 +- .../DataFrameReaderWriterSuite.scala | 28 +++++++++++++++++++ 6 files changed, 77 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 465feeb60412f..1bcf4aa46c867 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -171,13 +171,20 @@ class ContinuousQueryManager(sqlContext: SQLContext) { name: String, checkpointLocation: String, df: DataFrame, - sink: Sink): ContinuousQuery = { + sink: Sink, + triggerPeriodMs: Long): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } - val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink) + val query = new StreamExecution( + sqlContext, + name, + checkpointLocation, + df.logicalPlan, + sink, + triggerPeriodMs) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c07bd0e7b7175..e5dc9d258a518 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.util.Properties import scala.collection.JavaConverters._ +import scala.concurrent.duration._ import org.apache.hadoop.fs.Path @@ -77,6 +78,26 @@ final class DataFrameWriter private[sql](df: DataFrame) { this } + /** + * Set the trigger period for the stream query. + * + * @since 2.0.0 + */ + def trigger(period: Duration): DataFrameWriter = { + this.extraOptions += ("period" -> period.toMillis.toString) + this + } + + /** + * Set the trigger period for the stream query. + * + * @since 2.0.0 + */ + def trigger(period: Long, unit: TimeUnit): DataFrameWriter = { + this.extraOptions += ("period" -> unit.toMillis(period).toString) + this + } + /** * Specifies the underlying output data source. Built-in options include "parquet", "json", etc. 
* @@ -257,11 +278,14 @@ final class DataFrameWriter private[sql](df: DataFrame) { val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString }) + val triggerPeriodMs = extraOptions.getOrElse("period", "0").toLong + require(triggerPeriodMs >= 0, "the period of trigger should not be negative") df.sqlContext.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, df, - dataSource.createSink()) + dataSource.createSink(), + triggerPeriodMs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 60e00d203ccde..98b33f6653fca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -46,16 +46,14 @@ class StreamExecution( override val name: String, val checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, - val sink: Sink) extends ContinuousQuery with Logging { + val sink: Sink, + triggerPeriodMs: Long) extends ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. */ private val awaitBatchLock = new Object private val startLatch = new CountDownLatch(1) private val terminationLatch = new CountDownLatch(1) - /** Minimum amount of time in between the start of each batch. */ - private val minBatchTime = 10 - /** * Tracks how much data we have processed and committed to the sink or state store from each * input source. @@ -212,9 +210,18 @@ class StreamExecution( populateStartOffsets() logDebug(s"Stream running from $committedOffsets to $availableOffsets") while (isActive) { + val batchStartTimeMs = System.currentTimeMillis() if (dataAvailable) runBatch() commitAndConstructNextBatch() - Thread.sleep(minBatchTime) // TODO: Could be tighter + if (triggerPeriodMs > 0) { + val batchElapsedTime = System.currentTimeMillis() - batchStartTimeMs + if (batchElapsedTime > triggerPeriodMs) { + logWarning("Current batch is falling behind. 
The trigger period is " + + s"${triggerPeriodMs} milliseconds, but spent ${batchElapsedTime} milliseconds") + } else { + Thread.sleep(triggerPeriodMs - batchElapsedTime) + } + } } } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 4ca739450c607..e509c378e0bf2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -276,7 +276,7 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = sqlContext .streams - .startQuery(StreamExecution.nextName, metadataRoot, stream, sink) + .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 54ce98d195e25..baa55133a560e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -243,7 +243,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with StreamExecution.nextName, metadataRoot, df, - new MemorySink(df.schema)) + new MemorySink(df.schema), + 10L) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index c1bab9b577bbb..e1a96dff1a463 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.streaming.test +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration._ + import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ @@ -274,4 +278,28 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B assert(activeStreamNames.contains("name")) sqlContext.streams.active.foreach(_.stop()) } + + test("trigger") { + val df = sqlContext.read + .format("org.apache.spark.sql.streaming.test") + .stream("/test") + + df.write + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .trigger(10.seconds) + .startStream() + .stop() + + assert(LastOptions.parameters("period") == "10000") + + df.write + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .trigger(100, TimeUnit.SECONDS) + .startStream() + .stop() + + assert(LastOptions.parameters("period") == "100000") + } } From ab5dbd33acda7b300565da294a472e544d341a1c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 28 Mar 2016 15:54:35 -0700 Subject: [PATCH 2/7] period -> interval --- .../spark/sql/ContinuousQueryManager.scala | 4 ++-- .../org/apache/spark/sql/DataFrameWriter.scala | 18 +++++++++--------- .../execution/streaming/StreamExecution.scala | 12 ++++++------ .../streaming/DataFrameReaderWriterSuite.scala | 4 ++-- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 1bcf4aa46c867..02b53e65049a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -172,7 +172,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { checkpointLocation: String, df: DataFrame, sink: Sink, - triggerPeriodMs: Long): ContinuousQuery = { + triggerIntervalMs: Long): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( @@ -184,7 +184,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { checkpointLocation, df.logicalPlan, sink, - triggerPeriodMs) + triggerIntervalMs) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e5dc9d258a518..9be707d516449 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -79,22 +79,22 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** - * Set the trigger period for the stream query. + * Set the trigger interval for the stream query. * * @since 2.0.0 */ - def trigger(period: Duration): DataFrameWriter = { - this.extraOptions += ("period" -> period.toMillis.toString) + def trigger(interval: Duration): DataFrameWriter = { + this.extraOptions += ("triggerInterval" -> interval.toMillis.toString) this } /** - * Set the trigger period for the stream query. + * Set the trigger interval for the stream query. * * @since 2.0.0 */ - def trigger(period: Long, unit: TimeUnit): DataFrameWriter = { - this.extraOptions += ("period" -> unit.toMillis(period).toString) + def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = { + this.extraOptions += ("triggerInterval" -> unit.toMillis(interval).toString) this } @@ -278,14 +278,14 @@ final class DataFrameWriter private[sql](df: DataFrame) { val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString }) - val triggerPeriodMs = extraOptions.getOrElse("period", "0").toLong - require(triggerPeriodMs >= 0, "the period of trigger should not be negative") + val triggerIntervalMs = extraOptions.getOrElse("triggerInterval", "0").toLong + require(triggerIntervalMs >= 0, "the interval of trigger should not be negative") df.sqlContext.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, df, dataSource.createSink(), - triggerPeriodMs) + triggerIntervalMs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 98b33f6653fca..53015e9027fc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -47,7 +47,7 @@ class StreamExecution( val checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink, - triggerPeriodMs: Long) extends ContinuousQuery with Logging { + triggerIntervalMs: Long) extends ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. 
*/ private val awaitBatchLock = new Object @@ -213,13 +213,13 @@ class StreamExecution( val batchStartTimeMs = System.currentTimeMillis() if (dataAvailable) runBatch() commitAndConstructNextBatch() - if (triggerPeriodMs > 0) { + if (triggerIntervalMs > 0) { val batchElapsedTime = System.currentTimeMillis() - batchStartTimeMs - if (batchElapsedTime > triggerPeriodMs) { - logWarning("Current batch is falling behind. The trigger period is " + - s"${triggerPeriodMs} milliseconds, but spent ${batchElapsedTime} milliseconds") + if (batchElapsedTime > triggerIntervalMs) { + logWarning("Current batch is falling behind. The trigger interval is " + + s"${triggerIntervalMs} milliseconds, but spent ${batchElapsedTime} milliseconds") } else { - Thread.sleep(triggerPeriodMs - batchElapsedTime) + Thread.sleep(triggerIntervalMs - batchElapsedTime) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index e1a96dff1a463..ed0be2e41b282 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -291,7 +291,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .startStream() .stop() - assert(LastOptions.parameters("period") == "10000") + assert(LastOptions.parameters("triggerInterval") == "10000") df.write .format("org.apache.spark.sql.streaming.test") @@ -300,6 +300,6 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .startStream() .stop() - assert(LastOptions.parameters("period") == "100000") + assert(LastOptions.parameters("triggerInterval") == "100000") } } From 92d204c8aab4d0fe149adc812e781c616a0b30f4 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 29 Mar 2016 22:55:27 -0700 Subject: [PATCH 3/7] Add Trigger and ProcessingTime to control how to execute a batch --- .../spark/sql/ContinuousQueryManager.scala | 6 +- .../apache/spark/sql/DataFrameWriter.scala | 12 ++-- .../execution/streaming/StreamExecution.scala | 23 +++--- .../sql/execution/streaming/Trigger.scala | 70 +++++++++++++++++++ .../org/apache/spark/sql/StreamTest.scala | 7 +- .../streaming/ProcessingTimeSuite.scala | 32 +++++++++ .../ContinuousQueryManagerSuite.scala | 4 +- .../DataFrameReaderWriterSuite.scala | 12 ++-- 8 files changed, 134 insertions(+), 32 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 02b53e65049a1..132ed9c953ac3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.collection.mutable import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution} +import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution, Trigger} import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import 
org.apache.spark.sql.util.ContinuousQueryListener @@ -172,7 +172,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { checkpointLocation: String, df: DataFrame, sink: Sink, - triggerIntervalMs: Long): ContinuousQuery = { + trigger: Trigger): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( @@ -184,7 +184,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { checkpointLocation, df.logicalPlan, sink, - triggerIntervalMs) + trigger) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9be707d516449..6280f9593803a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.{ProcessingTime, StreamExecution, Trigger} import org.apache.spark.sql.sources.HadoopFsRelation /** @@ -84,7 +84,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 2.0.0 */ def trigger(interval: Duration): DataFrameWriter = { - this.extraOptions += ("triggerInterval" -> interval.toMillis.toString) + trigger = ProcessingTime(interval.toMillis) this } @@ -94,7 +94,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 2.0.0 */ def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = { - this.extraOptions += ("triggerInterval" -> unit.toMillis(interval).toString) + trigger = ProcessingTime(unit.toMillis(interval)) this } @@ -278,14 +278,12 @@ final class DataFrameWriter private[sql](df: DataFrame) { val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString }) - val triggerIntervalMs = extraOptions.getOrElse("triggerInterval", "0").toLong - require(triggerIntervalMs >= 0, "the interval of trigger should not be negative") df.sqlContext.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, df, dataSource.createSink(), - triggerIntervalMs) + trigger) } /** @@ -576,6 +574,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { private var mode: SaveMode = SaveMode.ErrorIfExists + private var trigger: Trigger = ProcessingTime(0L) + private var extraOptions = new scala.collection.mutable.HashMap[String, String] private var partitioningColumns: Option[Seq[String]] = None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 53015e9027fc9..32b5886910191 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -47,7 +47,7 @@ class StreamExecution( val checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink, - triggerIntervalMs: Long) extends ContinuousQuery with Logging { + val trigger: Trigger) extends 
ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. */ private val awaitBatchLock = new Object @@ -209,20 +209,15 @@ class StreamExecution( SQLContext.setActive(sqlContext) populateStartOffsets() logDebug(s"Stream running from $committedOffsets to $availableOffsets") - while (isActive) { - val batchStartTimeMs = System.currentTimeMillis() - if (dataAvailable) runBatch() - commitAndConstructNextBatch() - if (triggerIntervalMs > 0) { - val batchElapsedTime = System.currentTimeMillis() - batchStartTimeMs - if (batchElapsedTime > triggerIntervalMs) { - logWarning("Current batch is falling behind. The trigger interval is " + - s"${triggerIntervalMs} milliseconds, but spent ${batchElapsedTime} milliseconds") - } else { - Thread.sleep(triggerIntervalMs - batchElapsedTime) - } + trigger.execute(() => { + if (isActive) { + if (dataAvailable) runBatch() + commitAndConstructNextBatch() + true + } else { + false } - } + }) } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() case NonFatal(e) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala new file mode 100644 index 0000000000000..bb0470823777d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging + +/** + * A interface that indicates how to run a batch. + */ +trait Trigger { + + /** + * Execute batches using `batchRunner`. If `batchRunner` runs `false`, terminate the execution. + */ + def execute(batchRunner: () => Boolean): Unit +} + +/** + * A trigger that runs a batch every `intervalMs` milliseconds. + */ +case class ProcessingTime(intervalMs: Long) extends Trigger with Logging { + + require(intervalMs >= 0, "the interval of trigger should not be negative") + + override def execute(batchRunner: () => Boolean): Unit = { + while (true) { + val batchStartTimeMs = System.currentTimeMillis() + if (!batchRunner()) { + return + } + if (intervalMs > 0) { + val batchEndTimeMs = System.currentTimeMillis() + val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs + if (batchElapsedTimeMs > intervalMs) { + logWarning("Current batch is falling behind. 
The trigger interval is " + + s"${intervalMs} milliseconds, but spent ${batchElapsedTimeMs} milliseconds") + } + waitUntil(nextBatchTime(batchEndTimeMs)) + } + } + } + + private def waitUntil(time: Long): Unit = { + var now = System.currentTimeMillis() + while (now < time) { + Thread.sleep(time - now) + now = System.currentTimeMillis() + } + } + + /** Return the next multiple of intervalMs */ + def nextBatchTime(now: Long): Long = { + (now - 1) / intervalMs * intervalMs + intervalMs + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index e509c378e0bf2..7d333268b7ada 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -276,7 +276,12 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = sqlContext .streams - .startQuery(StreamExecution.nextName, metadataRoot, stream, sink, 10L) + .startQuery( + StreamExecution.nextName, + metadataRoot, + stream, + sink, + ProcessingTime(0L)) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala new file mode 100644 index 0000000000000..d70446eeb605d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.SparkFunSuite + +class ProcessingTimeSuite extends SparkFunSuite { + + test("nextBatchTime") { + val processingTime = ProcessingTime(100) + assert(processingTime.nextBatchTime(1) === 100) + assert(processingTime.nextBatchTime(99) === 100) + assert(processingTime.nextBatchTime(100) === 100) + assert(processingTime.nextBatchTime(101) === 200) + assert(processingTime.nextBatchTime(150) === 200) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index baa55133a560e..fd43744006291 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} -import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -244,7 +244,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with metadataRoot, df, new MemorySink(df.schema), - 10L) + ProcessingTime(0)) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index ed0be2e41b282..e2973edf2ae54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -284,22 +284,22 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .stream("/test") - df.write + var q = df.write .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .trigger(10.seconds) .startStream() - .stop() + q.stop() - assert(LastOptions.parameters("triggerInterval") == "10000") + assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000)) - df.write + q = df.write .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .trigger(100, TimeUnit.SECONDS) .startStream() - .stop() + q.stop() - assert(LastOptions.parameters("triggerInterval") == "100000") + assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000)) } } From a7355ed88434d006cf9afdecb3f2eeb9d249b8fc Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 30 Mar 2016 16:59:07 -0700 Subject: [PATCH 4/7] Add 'trigger(trigger: Trigger) and remove other APIs' --- .../spark/sql/ContinuousQueryManager.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 19 +--- .../scala/org/apache/spark/sql/Trigger.scala | 87 +++++++++++++++++++ .../execution/streaming/StreamExecution.scala | 7 +- .../{Trigger.scala => TriggerExecutor.scala} | 13 ++- .../spark/sql/ProcessingTimeSuite.scala | 39 +++++++++ ...cala => ProcessingTimeExecutorSuite.scala} | 15 ++-- .../ContinuousQueryManagerSuite.scala | 2 +- .../DataFrameReaderWriterSuite.scala | 4 +- 9 files changed, 154 insertions(+), 34 deletions(-) create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{Trigger.scala => TriggerExecutor.scala} (87%) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala rename sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/{ProcessingTimeSuite.scala => ProcessingTimeExecutorSuite.scala} (65%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 132ed9c953ac3..c5577ac60649e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.collection.mutable import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution, Trigger} +import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution} import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.util.ContinuousQueryListener diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6280f9593803a..2ba69203ba58a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql import java.util.Properties import scala.collection.JavaConverters._ -import scala.concurrent.duration._ import org.apache.hadoop.fs.Path @@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.execution.streaming.{ProcessingTime, StreamExecution, Trigger} +import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.HadoopFsRelation /** @@ -79,22 +78,12 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** - * Set the trigger interval for the stream query. + * Set the trigger for the stream query. * * @since 2.0.0 */ - def trigger(interval: Duration): DataFrameWriter = { - trigger = ProcessingTime(interval.toMillis) - this - } - - /** - * Set the trigger interval for the stream query. - * - * @since 2.0.0 - */ - def trigger(interval: Long, unit: TimeUnit): DataFrameWriter = { - trigger = ProcessingTime(unit.toMillis(interval)) + def trigger(trigger: Trigger): DataFrameWriter = { + this.trigger = trigger this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala new file mode 100644 index 0000000000000..7b19aba6e5753 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.unsafe.types.CalendarInterval + +/** + * A interface that indicates how to run a batch. + */ +sealed trait Trigger {} + +/** + * A trigger that runs a batch periodically based on the procesing time. + * + * Scala Example: + * {{{ + * def.writer.trigger(ProcessingTime(10.seconds)) + * def.writer.trigger(ProcessingTime("10 seconds")) + * + * }}} + * + * Java Example: + * + * {{{ + * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * def.writer.trigger(ProcessingTime.create("10 seconds")) + * + * }}} + */ +case class ProcessingTime(intervalMs: Long) extends Trigger { + require(intervalMs >= 0, "the interval of trigger should not be negative") +} + +object ProcessingTime { + + def apply(interval: String): ProcessingTime = { + if (StringUtils.isBlank(interval)) { + throw new IllegalArgumentException( + "interval cannot be null or blank.") + } + val cal = if (interval.startsWith("interval")) { + CalendarInterval.fromString(interval) + } else { + CalendarInterval.fromString("interval " + interval) + } + if (cal == null) { + throw new IllegalArgumentException(s"Invalid interval: $interval") + } + if (cal.months > 0) { + throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + } + new ProcessingTime(cal.microseconds / 1000) + } + + def apply(interval: Duration): ProcessingTime = { + new ProcessingTime(interval.toMillis) + } + + def create(interval: String): ProcessingTime = { + apply(interval) + } + + def create(interval: Long, unit: TimeUnit): ProcessingTime = { + new ProcessingTime(unit.toMillis(interval)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 32b5886910191..6edb1d31846fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -76,6 +76,11 @@ class StreamExecution( /** A list of unique sources in the query plan. 
*/ private val uniqueSources = sources.distinct + private val triggerExecutor = trigger match { + case t: ProcessingTime => ProcessingTimeExecutor(t) + case t => throw new IllegalArgumentException(s"${t.getClass} is not supported") + } + /** Defines the internal state of execution */ @volatile private var state: State = INITIALIZED @@ -209,7 +214,7 @@ class StreamExecution( SQLContext.setActive(sqlContext) populateStartOffsets() logDebug(s"Stream running from $committedOffsets to $availableOffsets") - trigger.execute(() => { + triggerExecutor.execute(() => { if (isActive) { if (dataAvailable) runBatch() commitAndConstructNextBatch() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala similarity index 87% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index bb0470823777d..568767363f712 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -18,11 +18,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging +import org.apache.spark.sql.ProcessingTime -/** - * A interface that indicates how to run a batch. - */ -trait Trigger { +trait TriggerExecutor { /** * Execute batches using `batchRunner`. If `batchRunner` runs `false`, terminate the execution. @@ -31,11 +29,12 @@ trait Trigger { } /** - * A trigger that runs a batch every `intervalMs` milliseconds. + * A trigger executor that runs a batch every `intervalMs` milliseconds. */ -case class ProcessingTime(intervalMs: Long) extends Trigger with Logging { +case class ProcessingTimeExecutor(processingTime: ProcessingTime) + extends TriggerExecutor with Logging { - require(intervalMs >= 0, "the interval of trigger should not be negative") + private val intervalMs = processingTime.intervalMs override def execute(batchRunner: () => Boolean): Unit = { while (true) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala new file mode 100644 index 0000000000000..a241a4d53d069 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration._ + +import org.apache.spark.SparkFunSuite + +class ProcessingTimeSuite extends SparkFunSuite { + + test("create") { + assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000) + assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000) + assert(ProcessingTime("1 minute").intervalMs === 60 * 1000) + + intercept[IllegalArgumentException] { ProcessingTime(null: String) } + intercept[IllegalArgumentException] { ProcessingTime("") } + intercept[IllegalArgumentException] { ProcessingTime("invalid") } + intercept[IllegalArgumentException] { ProcessingTime("1 month") } + intercept[IllegalArgumentException] { ProcessingTime("1 year") } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala similarity index 65% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index d70446eeb605d..e6a0ce6239e0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -18,15 +18,16 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.ProcessingTime -class ProcessingTimeSuite extends SparkFunSuite { +class ProcessingTimeExecutorSuite extends SparkFunSuite { test("nextBatchTime") { - val processingTime = ProcessingTime(100) - assert(processingTime.nextBatchTime(1) === 100) - assert(processingTime.nextBatchTime(99) === 100) - assert(processingTime.nextBatchTime(100) === 100) - assert(processingTime.nextBatchTime(101) === 200) - assert(processingTime.nextBatchTime(150) === 200) + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100)) + assert(processingTimeExecutor.nextBatchTime(1) === 100) + assert(processingTimeExecutor.nextBatchTime(99) === 100) + assert(processingTimeExecutor.nextBatchTime(100) === 100) + assert(processingTimeExecutor.nextBatchTime(101) === 200) + assert(processingTimeExecutor.nextBatchTime(150) === 200) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index fd43744006291..f4f4c52080089 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} +import org.apache.spark.sql.{ContinuousQuery, Dataset, ProcessingTime, StreamTest} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index e2973edf2ae54..575a5806020c7 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -287,7 +287,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B var q = df.write .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) - .trigger(10.seconds) + .trigger(ProcessingTime(10.seconds)) .startStream() q.stop() @@ -296,7 +296,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B q = df.write .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) - .trigger(100, TimeUnit.SECONDS) + .trigger(ProcessingTime.create(100, TimeUnit.SECONDS)) .startStream() q.stop() From f3526d0dbc265ab600c88b66ed7ecc36f0afb149 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 30 Mar 2016 17:09:16 -0700 Subject: [PATCH 5/7] Fix some docs --- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 ++- sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala | 5 ++--- .../scala/org/apache/spark/sql/ProcessingTimeSuite.scala | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 2ba69203ba58a..e3f22d8923048 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -78,7 +78,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** - * Set the trigger for the stream query. + * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run + * the query as fast as possible. * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala index 7b19aba6e5753..3a655cb201177 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala @@ -31,13 +31,13 @@ import org.apache.spark.unsafe.types.CalendarInterval sealed trait Trigger {} /** - * A trigger that runs a batch periodically based on the procesing time. + * A trigger that runs a query periodically based on the processing time. If `intervalMs` is 0, + * the query will run as fast as possible. 
* * Scala Example: * {{{ * def.writer.trigger(ProcessingTime(10.seconds)) * def.writer.trigger(ProcessingTime("10 seconds")) - * * }}} * * Java Example: @@ -45,7 +45,6 @@ sealed trait Trigger {} * {{{ * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * def.writer.trigger(ProcessingTime.create("10 seconds")) - * * }}} */ case class ProcessingTime(intervalMs: Long) extends Trigger { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala index a241a4d53d069..0d18a645f6790 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala @@ -29,6 +29,7 @@ class ProcessingTimeSuite extends SparkFunSuite { assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000) assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000) assert(ProcessingTime("1 minute").intervalMs === 60 * 1000) + assert(ProcessingTime("interval 1 minute").intervalMs === 60 * 1000) intercept[IllegalArgumentException] { ProcessingTime(null: String) } intercept[IllegalArgumentException] { ProcessingTime("") } From 7c4bc422bba49cd59a4f9cc3450352d68f61c1db Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 1 Apr 2016 14:01:18 -0700 Subject: [PATCH 6/7] Address comments --- .../spark/sql/ContinuousQueryManager.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 18 ++++++ .../scala/org/apache/spark/sql/Trigger.scala | 55 +++++++++++++++++-- .../execution/streaming/StreamExecution.scala | 1 - .../execution/streaming/TriggerExecutor.scala | 33 ++++++----- .../org/apache/spark/sql/StreamTest.scala | 3 +- .../ProcessingTimeExecutorSuite.scala | 45 +++++++++++++++ .../ContinuousQueryManagerSuite.scala | 10 ++-- 8 files changed, 139 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index c5577ac60649e..2306df09b8b76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -172,7 +172,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { checkpointLocation: String, df: DataFrame, sink: Sink, - trigger: Trigger): ContinuousQuery = { + trigger: Trigger = ProcessingTime(0)): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e3f22d8923048..3332a997cda90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -78,11 +78,29 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** + * :: Experimental :: * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run * the query as fast as possible. 
* + * Scala Example: + * {{{ + * def.writer.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * def.writer.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * def.writer.trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * * @since 2.0.0 */ + @Experimental def trigger(trigger: Trigger): DataFrameWriter = { this.trigger = trigger this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala index 3a655cb201177..c4e54b3f90ac5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala @@ -23,36 +23,57 @@ import scala.concurrent.duration.Duration import org.apache.commons.lang3.StringUtils +import org.apache.spark.annotation.Experimental import org.apache.spark.unsafe.types.CalendarInterval /** - * A interface that indicates how to run a batch. + * :: Experimental :: + * Used to indicate how often results should be produced by a [[ContinuousQuery]]. */ +@Experimental sealed trait Trigger {} /** + * :: Experimental :: * A trigger that runs a query periodically based on the processing time. If `intervalMs` is 0, * the query will run as fast as possible. * * Scala Example: * {{{ - * def.writer.trigger(ProcessingTime(10.seconds)) * def.writer.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * def.writer.trigger(ProcessingTime(10.seconds)) * }}} * * Java Example: - * * {{{ - * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * def.writer.trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} */ +@Experimental case class ProcessingTime(intervalMs: Long) extends Trigger { require(intervalMs >= 0, "the interval of trigger should not be negative") } +/** + * :: Experimental :: + * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. + */ +@Experimental object ProcessingTime { + /** + * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * def.writer.trigger(ProcessingTime("10 seconds")) + * }}} + */ def apply(interval: String): ProcessingTime = { if (StringUtils.isBlank(interval)) { throw new IllegalArgumentException( @@ -72,14 +93,40 @@ object ProcessingTime { new ProcessingTime(cal.microseconds / 1000) } + /** + * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * import scala.concurrent.duration._ + * def.writer.trigger(ProcessingTime(10.seconds)) + * }}} + */ def apply(interval: Duration): ProcessingTime = { new ProcessingTime(interval.toMillis) } + /** + * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * def.writer.trigger(ProcessingTime.create("10 seconds")) + * }}} + */ def create(interval: String): ProcessingTime = { apply(interval) } + /** + * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible. 
+ * + * Example: + * {{{ + * import java.util.concurrent.TimeUnit + * def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + */ def create(interval: Long, unit: TimeUnit): ProcessingTime = { new ProcessingTime(unit.toMillis(interval)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6edb1d31846fc..a57f7ff3735af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -78,7 +78,6 @@ class StreamExecution( private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t) - case t => throw new IllegalArgumentException(s"${t.getClass} is not supported") } /** Defines the internal state of execution */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 568767363f712..a1132d510685c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging import org.apache.spark.sql.ProcessingTime +import org.apache.spark.util.{Clock, SystemClock} trait TriggerExecutor { @@ -31,35 +32,37 @@ trait TriggerExecutor { /** * A trigger executor that runs a batch every `intervalMs` milliseconds. */ -case class ProcessingTimeExecutor(processingTime: ProcessingTime) +case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock()) extends TriggerExecutor with Logging { private val intervalMs = processingTime.intervalMs override def execute(batchRunner: () => Boolean): Unit = { while (true) { - val batchStartTimeMs = System.currentTimeMillis() - if (!batchRunner()) { - return - } + val batchStartTimeMs = clock.getTimeMillis() + val terminated = !batchRunner() if (intervalMs > 0) { - val batchEndTimeMs = System.currentTimeMillis() + val batchEndTimeMs = clock.getTimeMillis() val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs if (batchElapsedTimeMs > intervalMs) { - logWarning("Current batch is falling behind. The trigger interval is " + - s"${intervalMs} milliseconds, but spent ${batchElapsedTimeMs} milliseconds") + notifyBatchFallingBehind(batchElapsedTimeMs) + } + if (terminated) { + return + } + clock.waitTillTime(nextBatchTime(batchEndTimeMs)) + } else { + if (terminated) { + return } - waitUntil(nextBatchTime(batchEndTimeMs)) } } } - private def waitUntil(time: Long): Unit = { - var now = System.currentTimeMillis() - while (now < time) { - Thread.sleep(time - now) - now = System.currentTimeMillis() - } + /** Called when a batch falls behind. Expose for test only */ + def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { + logWarning("Current batch is falling behind. 
The trigger interval is " + + s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds") } /** Return the next multiple of intervalMs */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 7d333268b7ada..8e618d3b4e7d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -280,8 +280,7 @@ trait StreamTest extends QueryTest with Timeouts { StreamExecution.nextName, metadataRoot, stream, - sink, - ProcessingTime(0L)) + sink) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index e6a0ce6239e0d..b2bc7788d6f79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.execution.streaming +import java.util.concurrent.{CountDownLatch, TimeUnit} + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.ProcessingTime +import org.apache.spark.util.ManualClock class ProcessingTimeExecutorSuite extends SparkFunSuite { @@ -30,4 +33,46 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { assert(processingTimeExecutor.nextBatchTime(101) === 200) assert(processingTimeExecutor.nextBatchTime(150) === 200) } + + private def testBatchTermination(intervalMs: Long): Unit = { + @volatile var batchCounts = 0 + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) + processingTimeExecutor.execute(() => { + batchCounts += 1 + // If the batch termination works well, batchCounts should be 3 after `execute` + batchCounts < 3 + }) + assert(batchCounts === 3) + } + + test("batch termination") { + testBatchTermination(0) + testBatchTermination(10) + } + + test("notifyBatchFallingBehind") { + val clock = new ManualClock() + @volatile var batchFallingBehindCalled = false + val latch = new CountDownLatch(1) + val t = new Thread() { + override def run(): Unit = { + val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) { + override def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { + batchFallingBehindCalled = true + } + } + processingTimeExecutor.execute(() => { + latch.countDown() + clock.waitTillTime(200) + false + }) + } + } + t.start() + // Wait until the batch is running so that we don't call `advance` too early + assert(latch.await(10, TimeUnit.SECONDS), "the batch has not yet started in 10 seconds") + clock.advance(200) + t.join() + assert(batchFallingBehindCalled === true) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index f4f4c52080089..29bd3e018ed04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,8 +28,8 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import 
org.apache.spark.sql.{ContinuousQuery, Dataset, ProcessingTime, StreamTest} -import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} +import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -236,15 +236,15 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath query = sqlContext .streams .startQuery( StreamExecution.nextName, metadataRoot, df, - new MemorySink(df.schema), - ProcessingTime(0)) + new MemorySink(df.schema)) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => From 6c1b382be6fbe07fabbb92249c06967f9603ac00 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 1 Apr 2016 15:51:48 -0700 Subject: [PATCH 7/7] Remove unnecessary volatile --- .../sql/execution/streaming/ProcessingTimeExecutorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index b2bc7788d6f79..dd5f92248bf5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -35,7 +35,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { } private def testBatchTermination(intervalMs: Long): Unit = { - @volatile var batchCounts = 0 + var batchCounts = 0 val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) processingTimeExecutor.execute(() => { batchCounts += 1
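// A minimal end-to-end sketch of the trigger API as it stands after patch 6 of this
// series: the trigger is set on DataFrameWriter through the single trigger(Trigger)
// method, and ProcessingTime can be built from an interval string, a Scala Duration,
// or a (Long, TimeUnit) pair. The source format and the paths used below are
// illustrative placeholders only, not values taken from this patch series.
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.ProcessingTime

val df = sqlContext.read
  .format("org.apache.spark.sql.streaming.test")    // any streaming source
  .stream("/test")

// One batch every 10 seconds; ProcessingTime(0), the default, runs batches as fast
// as possible. ProcessingTime(10.seconds) and ProcessingTime.create(10, TimeUnit.SECONDS)
// are equivalent ways to express the same interval.
val query = df.write
  .format("org.apache.spark.sql.streaming.test")
  .option("checkpointLocation", "/tmp/checkpoints") // illustrative path
  .trigger(ProcessingTime("10 seconds"))
  .startStream()

query.stop()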