From 50c39b83e9d0f0a3ebe8cf5cb2675662b320a6e9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 28 Mar 2016 17:09:28 -0700 Subject: [PATCH 1/5] Allow multiple continuous queries to be started from the same DataFrame --- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../execution/streaming/StreamExecution.scala | 23 +++++-- .../streaming/StreamingRelation.scala | 17 +++-- .../ContinuousQueryManagerSuite.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 10 +-- .../spark/sql/streaming/StreamSuite.scala | 64 ++++++++++++++++++- 6 files changed, 101 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 704535adaa60d..f9551f335c27c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -176,7 +176,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap) - Dataset.ofRows(sqlContext, StreamingRelation(dataSource.createSource())) + Dataset.ofRows(sqlContext, StreamingRelation(dataSource)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c4e410d92cea3..a320be9cbdebe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -42,11 +42,11 @@ import org.apache.spark.util.UninterruptibleThread * and the results are committed transactionally to the given [[Sink]]. */ class StreamExecution( - val sqlContext: SQLContext, + override val sqlContext: SQLContext, override val name: String, - val checkpointRoot: String, - private[sql] val logicalPlan: LogicalPlan, - val sink: Sink) extends ContinuousQuery with Logging { + checkpointRoot: String, + _logicalPlan: LogicalPlan, + sink: Sink) extends ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. */ private val awaitBatchLock = new Object @@ -71,9 +71,18 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ private var currentBatchId: Long = -1 + private[sql] val logicalPlan = _logicalPlan.transform { + case StreamingRelation(sourceCreator, output) => + // Materialize source to avoid creating it in every batch + val source = sourceCreator() + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "_logicalPlan" has already used attributes of the previous `output`. + StreamingRelation(() => source, output) + } + /** All stream sources present the query plan. */ private val sources = - logicalPlan.collect { case s: StreamingRelation => s.source } + logicalPlan.collect { case s: StreamingRelation => s.sourceCreator() } /** A list of unique sources in the query plan. */ private val uniqueSources = sources.distinct @@ -286,8 +295,8 @@ class StreamExecution( var replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. 
val withNewSources = logicalPlan transform { - case StreamingRelation(source, output) => - newData.get(source).map { data => + case StreamingRelation(sourceCreator, output) => + newData.get(sourceCreator()).map { data => val newPlan = data.logicalPlan assert(output.size == newPlan.output.size, s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index e35c444348f48..5836c3f2e6d58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -19,16 +19,25 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.execution.datasources.DataSource object StreamingRelation { - def apply(source: Source): StreamingRelation = - StreamingRelation(source, source.schema.toAttributes) + def apply(dataSource: DataSource): StreamingRelation = { + val source = dataSource.createSource() + StreamingRelation(dataSource.createSource, source.schema.toAttributes) + } + + def apply(source: Source): StreamingRelation = { + StreamingRelation(() => source, source.schema.toAttributes) + } } /** * Used to link a streaming [[Source]] of data into a * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. */ -case class StreamingRelation(source: Source, output: Seq[Attribute]) extends LeafNode { - override def toString: String = source.toString +case class StreamingRelation( + sourceCreator: () => Source, + output: Seq[Attribute]) extends LeafNode { + override def toString: String = sourceCreator().toString } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 54ce98d195e25..53b5c27ad97ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -293,8 +293,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with if (withError) { logDebug(s"Terminating query ${queryToStop.name} with error") queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect { - case StreamingRelation(memoryStream, _) => - memoryStream.asInstanceOf[MemoryStream[Int]].addData(0) + case StreamingRelation(sourceCreator, _) => + sourceCreator().asInstanceOf[MemoryStream[Int]].addData(0) } } else { logDebug(s"Stopping query ${queryToStop.name}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 89de15acf506d..d7d017647e556 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -71,8 +71,9 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { } reader.stream(path) .queryExecution.analyzed - .collect { case StreamingRelation(s: FileStreamSource, _) => s } - .head + .collect { case StreamingRelation(sourceCreator, _) => + 
sourceCreator().asInstanceOf[FileStreamSource] + }.head } val valueSchema = new StructType().add("value", StringType) @@ -96,8 +97,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { reader.stream() } df.queryExecution.analyzed - .collect { case StreamingRelation(s: FileStreamSource, _) => s } - .head + .collect { case StreamingRelation(sourceCreator, _) => + sourceCreator().asInstanceOf[FileStreamSource] + }.head .schema } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index fbb1792596b18..83200fce7817d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.streaming -import org.apache.spark.sql.{Row, StreamTest} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.sql.{DataFrame, Row, SQLContext, StreamTest} import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class StreamSuite extends StreamTest with SharedSQLContext { @@ -81,4 +85,62 @@ class StreamSuite extends StreamTest with SharedSQLContext { AddData(inputData, 1, 2, 3, 4), CheckAnswer(2, 4)) } + + test("DataFrame reuse") { + def assertDF(df: DataFrame) { + withTempDir { outputDir => + withTempDir { checkpointDir => + val query = df.write.format("parquet") + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .startStream(outputDir.getAbsolutePath) + try { + eventually(timeout(streamingTimeout)) { + val outputDf = sqlContext.read.parquet(outputDir.getAbsolutePath).as[Long] + checkDataset[Long](outputDf, (0L to 10L).toArray: _*) + } + } finally { + query.stop() + } + } + } + } + + val df = sqlContext.read.format(classOf[FakeDefaultSource].getName).stream() + assertDF(df) + assertDF(df) + assertDF(df) + } +} + +/** + * A fake StreamSourceProvider thats creates a fake Source that cannot be reused. + */ +class FakeDefaultSource extends StreamSourceProvider { + + override def createSource( + sqlContext: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + // Create a fake Source that emits 0 to 10. 
+ new Source { + private var offset = -1L + + override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil) + + override def getOffset: Option[Offset] = { + if (offset >= 10) { + None + } else { + offset += 1 + Some(LongOffset(offset)) + } + } + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1 + sqlContext.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a") + } + } + } } From 61967904705767af7c1d95af97c7a814ea1d2207 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 31 Mar 2016 12:03:06 -0700 Subject: [PATCH 2/5] Add StreamingExecutionRelation as a Source container --- .../spark/sql/ContinuousQueryManager.scala | 12 ++++++++-- .../execution/streaming/StreamExecution.scala | 17 ++++---------- .../streaming/StreamingRelation.scala | 22 +++++++++++-------- .../sql/execution/streaming/memory.scala | 6 ++--- .../org/apache/spark/sql/StreamTest.scala | 5 +++-- .../ContinuousQueryManagerSuite.scala | 6 ++--- .../sql/streaming/FileStreamSourceSuite.scala | 8 +++---- .../spark/sql/streaming/StreamSuite.scala | 8 +++---- 8 files changed, 42 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 465feeb60412f..92efd451d0da1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.collection.mutable import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.util.ContinuousQueryListener @@ -177,7 +177,15 @@ class ContinuousQueryManager(sqlContext: SQLContext) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } - val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink) + val logicalPlan = df.logicalPlan.transform { + case StreamingRelation(dataSource, output) => + // Materialize source to avoid creating it in every batch + val source = dataSource.createSource() + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "_logicalPlan" has already used attributes of the previous `output`. 
+ StreamingExecutionRelation(source, output) + } + val query = new StreamExecution(sqlContext, name, checkpointLocation, logicalPlan, sink) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a320be9cbdebe..38ac94886c7fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -45,7 +45,7 @@ class StreamExecution( override val sqlContext: SQLContext, override val name: String, checkpointRoot: String, - _logicalPlan: LogicalPlan, + val logicalPlan: LogicalPlan, sink: Sink) extends ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. */ @@ -71,18 +71,9 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ private var currentBatchId: Long = -1 - private[sql] val logicalPlan = _logicalPlan.transform { - case StreamingRelation(sourceCreator, output) => - // Materialize source to avoid creating it in every batch - val source = sourceCreator() - // We still need to use the previous `output` instead of `source.schema` as attributes in - // "_logicalPlan" has already used attributes of the previous `output`. - StreamingRelation(() => source, output) - } - /** All stream sources present the query plan. */ private val sources = - logicalPlan.collect { case s: StreamingRelation => s.sourceCreator() } + logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ private val uniqueSources = sources.distinct @@ -295,8 +286,8 @@ class StreamExecution( var replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { - case StreamingRelation(sourceCreator, output) => - newData.get(sourceCreator()).map { data => + case StreamingExecutionRelation(source, output) => + newData.get(source).map { data => val newPlan = data.logicalPlan assert(output.size == newPlan.output.size, s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 5836c3f2e6d58..d3e6ba55f431c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -24,11 +24,7 @@ import org.apache.spark.sql.execution.datasources.DataSource object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { val source = dataSource.createSource() - StreamingRelation(dataSource.createSource, source.schema.toAttributes) - } - - def apply(source: Source): StreamingRelation = { - StreamingRelation(() => source, source.schema.toAttributes) + StreamingRelation(dataSource, source.schema.toAttributes) } } @@ -36,8 +32,16 @@ object StreamingRelation { * Used to link a streaming [[Source]] of data into a * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. 
*/ -case class StreamingRelation( - sourceCreator: () => Source, - output: Seq[Attribute]) extends LeafNode { - override def toString: String = sourceCreator().toString +case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode { + override def toString: String = dataSource.createSource().toString +} + +case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode { + override def toString: String = source.toString +} + +object StreamingExecutionRelation { + def apply(source: Source): StreamingExecutionRelation = { + StreamingExecutionRelation(source, source.schema.toAttributes) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 0f91e59e04ac9..6523dc4a2f4cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -22,11 +22,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext} -import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.types.StructType object MemoryStream { @@ -45,7 +43,7 @@ object MemoryStream { case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) extends Source with Logging { protected val encoder = encoderFor[A] - protected val logicalPlan = StreamingRelation(this) + protected val logicalPlan = StreamingExecutionRelation(this) protected val output = logicalPlan.output protected val batches = new ArrayBuffer[Dataset[A]] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 4ca739450c607..1b7bb53e3d2ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -36,6 +36,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.util.Utils @@ -66,9 +67,9 @@ import org.apache.spark.util.Utils trait StreamTest extends QueryTest with Timeouts { implicit class RichSource(s: Source) { - def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(s)) + def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingExecutionRelation(s)) - def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s)) + def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingExecutionRelation(s)) } /** How long to wait for an active stream to catch up when checking a result. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 53b5c27ad97ba..ed6490154f129 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} -import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -293,8 +293,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with if (withError) { logDebug(s"Terminating query ${queryToStop.name} with error") queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect { - case StreamingRelation(sourceCreator, _) => - sourceCreator().asInstanceOf[MemoryStream[Int]].addData(0) + case StreamingExecutionRelation(source, _) => + source.asInstanceOf[MemoryStream[Int]].addData(0) } } else { logDebug(s"Stopping query ${queryToStop.name}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d7d017647e556..a48c5561756b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -71,8 +71,8 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { } reader.stream(path) .queryExecution.analyzed - .collect { case StreamingRelation(sourceCreator, _) => - sourceCreator().asInstanceOf[FileStreamSource] + .collect { case StreamingRelation(dataSource, _) => + dataSource.createSource().asInstanceOf[FileStreamSource] }.head } @@ -97,8 +97,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { reader.stream() } df.queryExecution.analyzed - .collect { case StreamingRelation(sourceCreator, _) => - sourceCreator().asInstanceOf[FileStreamSource] + .collect { case StreamingRelation(dataSource, _) => + dataSource.createSource().asInstanceOf[FileStreamSource] }.head .schema } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 83200fce7817d..e4ea55552691d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -94,10 +94,9 @@ class StreamSuite extends StreamTest with SharedSQLContext { .option("checkpointLocation", checkpointDir.getAbsolutePath) .startStream(outputDir.getAbsolutePath) try { - eventually(timeout(streamingTimeout)) { - val outputDf = sqlContext.read.parquet(outputDir.getAbsolutePath).as[Long] - checkDataset[Long](outputDf, (0L to 10L).toArray: _*) - } + query.processAllAvailable() + val outputDf = sqlContext.read.parquet(outputDir.getAbsolutePath).as[Long] + checkDataset[Long](outputDf, (0L to 10L).toArray: _*) } finally { query.stop() } @@ -108,7 +107,6 @@ class StreamSuite extends StreamTest with SharedSQLContext { val df = 
sqlContext.read.format(classOf[FakeDefaultSource].getName).stream() assertDF(df) assertDF(df) - assertDF(df) } } From 9b5f00745de1b303f114c6145e2445bcaf76869e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 4 Apr 2016 12:27:49 -0700 Subject: [PATCH 3/5] comments --- .../spark/sql/execution/streaming/StreamingRelation.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index d3e6ba55f431c..ccee2c49ec538 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -29,13 +29,17 @@ object StreamingRelation { } /** - * Used to link a streaming [[Source]] of data into a + * Used to link a streaming [[DataSource]] into a * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. */ case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode { override def toString: String = dataSource.createSource().toString } +/** + * Used to link a streaming [[Source]] of data into a + * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. + */ case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode { override def toString: String = source.toString } From 527f55feef0418c36745f9287f4df1bf5067fe64 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 4 Apr 2016 19:15:42 -0700 Subject: [PATCH 4/5] Fix isStreaming --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index db2134b020167..f472a5068e4b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython -import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.execution.streaming.{StreamingExecutionRelation, StreamingRelation} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -462,7 +462,9 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @Experimental - def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined + def isStreaming: Boolean = logicalPlan.find { n => + n.isInstanceOf[StreamingRelation] || n.isInstanceOf[StreamingExecutionRelation] + }.isDefined /** * Displays the [[Dataset]] in a tabular form. 
Strings more than 20 characters will be truncated, From 48d760eed41a3d559ad8aa6363b6000d4b9ed54d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 4 Apr 2016 22:21:33 -0700 Subject: [PATCH 5/5] Address --- .../apache/spark/sql/ContinuousQueryManager.scala | 4 ++-- .../sql/execution/streaming/StreamingRelation.scala | 12 ++++++++---- .../spark/sql/streaming/FileStreamSourceSuite.scala | 4 ++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index adff75038bb02..d7f71bd4b0895 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -179,11 +179,11 @@ class ContinuousQueryManager(sqlContext: SQLContext) { s"Cannot start query with name $name as a query with that name is already active") } val logicalPlan = df.logicalPlan.transform { - case StreamingRelation(dataSource, output) => + case StreamingRelation(dataSource, _, output) => // Materialize source to avoid creating it in every batch val source = dataSource.createSource() // We still need to use the previous `output` instead of `source.schema` as attributes in - // "_logicalPlan" has already used attributes of the previous `output`. + // "df.logicalPlan" has already used attributes of the previous `output`. StreamingExecutionRelation(source, output) } val query = new StreamExecution( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index ccee2c49ec538..f951dea735d9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -24,16 +24,20 @@ import org.apache.spark.sql.execution.datasources.DataSource object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { val source = dataSource.createSource() - StreamingRelation(dataSource, source.schema.toAttributes) + StreamingRelation(dataSource, source.toString, source.schema.toAttributes) } } /** * Used to link a streaming [[DataSource]] into a - * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. + * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating + * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]]. + * It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when + * passing to [StreamExecution]] to run a query. 
*/ -case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode { - override def toString: String = dataSource.createSource().toString +case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute]) + extends LeafNode { + override def toString: String = sourceName } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b8d30f410e4c3..09daa7f81a979 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -71,7 +71,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { } reader.stream(path) .queryExecution.analyzed - .collect { case StreamingRelation(dataSource, _) => + .collect { case StreamingRelation(dataSource, _, _) => dataSource.createSource().asInstanceOf[FileStreamSource] }.head } @@ -97,7 +97,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { reader.stream() } df.queryExecution.analyzed - .collect { case StreamingRelation(dataSource, _) => + .collect { case StreamingRelation(dataSource, _, _) => dataSource.createSource().asInstanceOf[FileStreamSource] }.head .schema
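
Editor's note on the net effect of this series, with a hedged usage sketch. After these patches, a streaming DataFrame no longer pins a single materialized Source: StreamingRelation carries the unresolved DataSource, and ContinuousQueryManager creates a fresh Source (wrapped in a StreamingExecutionRelation) each time a query is started, so the same DataFrame can back several independent queries. The Scala sketch below only illustrates that behavior using the Spark 2.0-era API that appears in the patches themselves (read.format(...).stream(), write.startStream(path), processAllAvailable(), stop()); FakeDefaultSource is the test source added in PATCH 1, and the checkpoint/output paths are hypothetical placeholders, not part of the change.

  val df = sqlContext.read
    .format(classOf[FakeDefaultSource].getName)
    .stream()

  // Each startStream call materializes its own Source from the underlying DataSource,
  // so the two queries below do not share a Source instance.
  val q1 = df.write.format("parquet")
    .option("checkpointLocation", "/tmp/checkpoint1")   // hypothetical path
    .startStream("/tmp/output1")                        // hypothetical path
  val q2 = df.write.format("parquet")
    .option("checkpointLocation", "/tmp/checkpoint2")   // hypothetical path
    .startStream("/tmp/output2")                        // hypothetical path

  try {
    // Drain all currently available input through both queries.
    q1.processAllAvailable()
    q2.processAllAvailable()
  } finally {
    q1.stop()
    q2.stop()
  }

This mirrors the "DataFrame reuse" test in StreamSuite, which starts multiple queries from the same DataFrame and verifies each produces the full expected output.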