From 72472eaea52e1b86cd5c2fee478f449f638f5f36 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 8 Feb 2018 12:28:03 -0800 Subject: [PATCH 1/5] migrate foreach sink --- .../sql/execution/streaming/ForeachSink.scala | 149 +++++++++++++----- .../streaming/MicroBatchExecution.scala | 7 + .../sql/streaming/DataStreamWriter.scala | 2 +- .../streaming/ForeachSinkSuite.scala | 28 +++- 4 files changed, 143 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 2cc54107f8b83..64b7b8cc3e00a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { - // This logic should've been as simple as: - // ``` - // data.as[T].foreachPartition { iter => ... } - // ``` - // - // Unfortunately, doing that would just break the incremental planing. The reason is, - // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will - // create a new plan. Because StreamExecution uses the existing plan to collect metrics and - // update watermark, we should never create a new plan. Otherwise, metrics and watermark are - // updated in the new plan, and StreamExecution cannot retrieval them. - // - // Hence, we need to manually convert internal rows to objects using encoder. 
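Before the new code, a quick reminder of the user-facing contract that both the old sink and the new provider must honor. This is a minimal sketch, not code from this patch; the class name and the `Dataset[Int]` stream are illustrative.

```scala
import org.apache.spark.sql.ForeachWriter

// Minimal writer honoring the ForeachWriter contract the sink relies on:
// open() may return false to skip a (partition, epoch) pair; close() is always called last.
class PrintlnForeachWriter extends ForeachWriter[Int] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Returning false tells the sink to drop this partition's data for this epoch,
    // e.g. because it was already written by a previous attempt.
    true
  }

  override def process(value: Int): Unit = {
    println(s"got value $value")
  }

  override def close(errorOrNull: Throwable): Unit = {
    // errorOrNull is non-null only if open() or process() threw.
  }
}

// Typical wiring, assuming `ints` is a streaming Dataset[Int]:
//   ints.writeStream.foreach(new PrintlnForeachWriter).start()
```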
+ +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) - data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { - try { - while (iter.hasNext) { - writer.process(encoder.fromRow(iter.next())) - } - } catch { - case e: Throwable => - writer.close(e) - throw e - } - writer.close(null) - } else { - writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) + ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( + writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) + extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { + ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) + extends DataWriterFactory[InternalRow] { + override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { + new ForeachDataWriter(writer, encoder, partitionId) + } +} + +class ForeachDataWriter[T : Encoder]( + private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int) + extends DataWriter[InternalRow] { + private val initialEpochId: Long = { + // Start with the microbatch ID. If it's not there, we're in continuous execution, + // so get the start epoch. + // This ID will be incremented as commits happen. + TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match { + case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + case batch => batch.toLong + } + } + private var currentEpochId = initialEpochId + + // The lifecycle of the ForeachWriter is incompatible with the lifecycle of DataSourceV2 writers. + // Unfortunately, we cannot migrate ForeachWriter, as its implementations live in user code. So + // we need a small state machine to shim between them. + // * CLOSED means close() has been called. + // * OPENED + private object WriterState extends Enumeration { + type WriterState = Value + val CLOSED, OPENED, OPENED_SKIP_PROCESSING = Value + } + import WriterState._ + + private var state = CLOSED + + private def openAndSetState(epochId: Long) = { + // Create a new writer by roundtripping through the serialization for compatibility. + // In the old API, a writer instantiation would never get reused. 
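The serialization round trip described in the comment above is the standard Java deep-copy trick. The same idea in isolation, as a sketch; the `deepCopy` helper name is mine, not something this patch adds:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serializing and immediately deserializing an object yields a fresh, independent copy.
// ForeachWriter extends Serializable, so this works for any conforming writer.
def deepCopy[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[T]
}
```

This is what lets each epoch see a writer instance that has never been opened before, matching the V1 sink's behavior of handing a fresh writer to every batch.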
+ val byteStream = new ByteArrayOutputStream() + val objectStream = new ObjectOutputStream(byteStream) + objectStream.writeObject(writer) + writer = new ObjectInputStream(new ByteArrayInputStream(byteStream.toByteArray)).readObject() + .asInstanceOf[ForeachWriter[T]] + + writer.open(partitionId, epochId) match { + case true => state = OPENED + case false => state = OPENED_SKIP_PROCESSING + } + } + + openAndSetState(initialEpochId) + + override def write(record: InternalRow): Unit = { + try { + state match { + case OPENED => writer.process(encoder.fromRow(record)) + case OPENED_SKIP_PROCESSING => () + case CLOSED => + // First record of a new epoch, so we need to open a new writer for it. + currentEpochId += 1 + openAndSetState(currentEpochId) + writer.process(encoder.fromRow(record)) } + } catch { + case t: Throwable => + writer.close(t) + throw t } } - override def toString(): String = "ForeachSink" + override def commit(): WriterCommitMessage = { + writer.close(null) + ForeachWriterCommitMessage + } + + override def abort(): Unit = {} } + +case object ForeachWriterCommitMessage extends WriterCommitMessage diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 812533313332e..e4e618ca12604 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -461,6 +461,9 @@ class MicroBatchExecution( case _ => throw new IllegalArgumentException(s"unknown sink type for $sink") } + sparkSession.sparkContext.setLocalProperty( + MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString) + reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionToRunBatch, @@ -500,3 +503,7 @@ class MicroBatchExecution( Optional.ofNullable(scalaOption.orNull) } } + +object MicroBatchExecution { + val BATCH_ID_KEY = "sql.streaming.microbatch.batchId" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 2fc903168cfa0..10286df75c153 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -269,7 +269,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { query } else if (source == "foreach") { assertNotPartitioned("foreach") - val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc) + val sink = new ForeachWriterProvider[T](foreachWriter)(ds.exprEnc) df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index b249dd41a84a6..e361f51dcc3d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -141,7 +141,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf query.processAllAvailable() } assert(e.getCause.isInstanceOf[SparkException]) - assert(e.getCause.getCause.getMessage === "error") + 
assert(e.getCause.getCause.getCause.getMessage === "error") assert(query.isActive === false) val allEvents = ForeachSinkSuite.allEvents() @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf query.stop() } } + + testQuietly("foreach does not reuse writers") { + withTempDir { checkpointDir => + val input = MemoryStream[Int] + val query = input.toDS().repartition(1).writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreach(new TestForeachWriter() { + override def process(value: Int): Unit = { + super.process(this.hashCode()) + } + }).start() + input.addData(0) + query.processAllAvailable() + input.addData(0) + query.processAllAvailable() + + val allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 2) + assert(allEvents(0)(1).isInstanceOf[ForeachSinkSuite.Process[Int]]) + val firstWriterId = allEvents(0)(1).asInstanceOf[ForeachSinkSuite.Process[Int]].value + assert(allEvents(1)(1).isInstanceOf[ForeachSinkSuite.Process[Int]]) + assert( + allEvents(1)(1).asInstanceOf[ForeachSinkSuite.Process[Int]].value != firstWriterId, + "writer was reused!") + } + } } /** A global object to collect events in the executor */ From 87d0bc8ce23ab5a95ba0b5432d6b58042b32bdac Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 8 Feb 2018 12:48:30 -0800 Subject: [PATCH 2/5] fix rebase --- .../org/apache/spark/sql/execution/streaming/ForeachSink.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 64b7b8cc3e00a..6b72d91b35727 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.streaming.OutputMode From 23e4138e9de276809dc38d0dbcfd20d260be49ec Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 8 Feb 2018 16:33:33 -0800 Subject: [PATCH 3/5] add comments --- .../sql/execution/streaming/ForeachSink.scala | 78 ++++++++++++------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 6b72d91b35727..b2994ef23a39c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -40,30 +40,56 @@ case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends S val encoder = encoderFor[T].resolveAndBind( schema.toAttributes, SparkSession.getActiveSession.get.sessionState.analyzer) - ForeachInternalWriter(writer, encoder) - } -} - -case class ForeachInternalWriter[T: Encoder]( - writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) - extends StreamWriter with 
SupportsWriteInternalRow { - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - - override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { - ForeachWriterFactory(writer, encoder) + new StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { + val byteStream = new ByteArrayOutputStream() + val objectStream = new ObjectOutputStream(byteStream) + objectStream.writeObject(writer) + ForeachWriterFactory(byteStream.toByteArray, encoder) + } + } } } -case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +case class ForeachWriterFactory[T: Encoder]( + serializedWriter: Array[Byte], + encoder: ExpressionEncoder[T]) extends DataWriterFactory[InternalRow] { override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { - new ForeachDataWriter(writer, encoder, partitionId) + new ForeachDataWriter(serializedWriter, encoder, partitionId) } } +/** + * A [[DataWriter]] for the foreach sink. + * + * Note that [[ForeachWriter]] has the following lifecycle, and (as was true in the V1 sink API) + * assumes that it's never reused: + * * [create writer] + * * open(partitionId, batchId) + * * if open() returned true: write, write, write, ... + * * close() + * while DataSourceV2 writers have a slightly different lifecycle and will be reused for multiple + * epochs in the continuous processing engine: + * * [create writer] + * * write, write, write, ... + * * commit() + * + * The bulk of the implementation here is a shim between these two models. + * + * @param serializedWriter a serialized version of the user-provided [[ForeachWriter]] + * @param encoder encoder from [[Row]] to the type param [[T]] + * @param partitionId the ID of the partition this data writer is responsible for + * + * @tparam T the type of data to be handled by the writer + */ class ForeachDataWriter[T : Encoder]( - private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int) + serializedWriter: Array[Byte], + encoder: ExpressionEncoder[T], + partitionId: Int) extends DataWriter[InternalRow] { private val initialEpochId: Long = { // Start with the microbatch ID. If it's not there, we're in continuous execution, @@ -74,28 +100,23 @@ class ForeachDataWriter[T : Encoder]( case batch => batch.toLong } } - private var currentEpochId = initialEpochId - // The lifecycle of the ForeachWriter is incompatible with the lifecycle of DataSourceV2 writers. - // Unfortunately, we cannot migrate ForeachWriter, as its implementations live in user code. So - // we need a small state machine to shim between them. + // A small state machine representing the lifecycle of the underlying ForeachWriter. // * CLOSED means close() has been called. - // * OPENED + // * OPENED means open() was called and returned true. + // * OPENED_SKIP_PROCESSING means open() was called and returned false. 
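Summarizing the transitions the shim performs (an illustrative sketch, not code from this patch; the real transitions are spread across openAndSetState(), write() and commit() below):

```scala
// CLOSED --open() returned true-->  OPENED
// CLOSED --open() returned false--> OPENED_SKIP_PROCESSING
// OPENED or OPENED_SKIP_PROCESSING --commit()--> CLOSED (writer closed, epoch advances)
sealed trait ShimState
case object Closed extends ShimState
case object Opened extends ShimState
case object OpenedSkipProcessing extends ShimState

def afterOpen(openReturnedTrue: Boolean): ShimState =
  if (openReturnedTrue) Opened else OpenedSkipProcessing

def afterCommit(current: ShimState): ShimState = Closed
```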
private object WriterState extends Enumeration { type WriterState = Value val CLOSED, OPENED, OPENED_SKIP_PROCESSING = Value } import WriterState._ - private var state = CLOSED + private var writer: ForeachWriter[T] = _ + private var state: WriterState = _ + private var currentEpochId = initialEpochId private def openAndSetState(epochId: Long) = { - // Create a new writer by roundtripping through the serialization for compatibility. - // In the old API, a writer instantiation would never get reused. - val byteStream = new ByteArrayOutputStream() - val objectStream = new ObjectOutputStream(byteStream) - objectStream.writeObject(writer) - writer = new ObjectInputStream(new ByteArrayInputStream(byteStream.toByteArray)).readObject() + writer = new ObjectInputStream(new ByteArrayInputStream(serializedWriter)).readObject() .asInstanceOf[ForeachWriter[T]] writer.open(partitionId, epochId) match { @@ -132,4 +153,7 @@ class ForeachDataWriter[T : Encoder]( override def abort(): Unit = {} } +/** + * An empty [[WriterCommitMessage]]. [[ForeachWriter]] implementations have no global coordination. + */ case object ForeachWriterCommitMessage extends WriterCommitMessage From 4dfe57dc9364752988f9f0ed6e00c3a1d4d39e83 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 8 Feb 2018 18:34:20 -0800 Subject: [PATCH 4/5] continuous processing working --- .../sql/execution/streaming/ForeachSink.scala | 9 ++- .../execution/streaming/StreamExecution.scala | 1 + .../streaming/ForeachSinkSuite.scala | 62 ++++++++++++++++++- 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index b2994ef23a39c..d4f5511792853 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -134,7 +134,6 @@ class ForeachDataWriter[T : Encoder]( case OPENED_SKIP_PROCESSING => () case CLOSED => // First record of a new epoch, so we need to open a new writer for it. - currentEpochId += 1 openAndSetState(currentEpochId) writer.process(encoder.fromRow(record)) } @@ -146,7 +145,13 @@ class ForeachDataWriter[T : Encoder]( } override def commit(): WriterCommitMessage = { - writer.close(null) + // Close if the writer got opened for this epoch. 
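For context, this is roughly the call pattern a reused DataWriter sees from the continuous engine, and what the shim turns each call into for the user's ForeachWriter. A simplified sketch, not engine code; the real loop lives in the continuous execution engine:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.DataWriter

// One DataWriter per partition, reused across epochs.
def driveOnePartition(
    writer: DataWriter[InternalRow],
    epochs: Iterator[Iterator[InternalRow]]): Unit = {
  epochs.foreach { records =>
    // Shim: the first write() of a new epoch deserializes and open()s a fresh ForeachWriter.
    records.foreach(r => writer.write(r))
    // Shim: close(null) if the writer was opened, reset state to CLOSED, advance the epoch.
    writer.commit()
  }
}
```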
+ state match { + case CLOSED => () + case _ => writer.close(null) + } + state = CLOSED + currentEpochId += 1 ForeachWriterCommitMessage } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index e7982d7880ceb..204a5d227a4a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -565,6 +565,7 @@ abstract class StreamExecution( object StreamExecution { val QUERY_ID_KEY = "sql.streaming.queryId" + val BATCH_ID_KEY = "sql.streaming.batchId" } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index e361f51dcc3d7..f8fa47730451a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.Serializable import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.mutable @@ -26,7 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.functions.{count, window} -import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest} +import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger} import org.apache.spark.sql.test.SharedSQLContext class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { @@ -281,8 +282,67 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf "writer was reused!") } } + + testQuietly("foreach sink for continuous query") { + withTempDir { checkpointDir => + val query = spark.readStream + .format("rate") + .option("numPartitions", "1") + .option("rowsPerSecond", "5") + .load() + .select('value.cast("INT")) + .map(r => r.getInt(0)) + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Continuous(500)) + .foreach(new TestForeachWriter with Serializable { + override def process(value: Int): Unit = { + super.process(this.hashCode()) + } + }).start() + try { + // Wait until we get 3 epochs with at least 3 events in them. This means we'll see + // open, close, and at least 1 process. + eventually(timeout(streamingTimeout)) { + // Check + assert(ForeachSinkSuite.allEvents().count(_.size >= 3) === 3) + } + + val allEvents = ForeachSinkSuite.allEvents().filter(_.size >= 3) + // Check open and close events. + allEvents(0).head match { + case ForeachSinkSuite.Open(0, _) => + case e => assert(false, s"unexpected event $e") + } + allEvents(1).head match { + case ForeachSinkSuite.Open(0, _) => + case e => assert(false, s"unexpected event $e") + } + allEvents(2).head match { + case ForeachSinkSuite.Open(0, _) => + case e => assert(false, s"unexpected event $e") + } + assert(allEvents(0).last == ForeachSinkSuite.Close(None)) + assert(allEvents(1).last == ForeachSinkSuite.Close(None)) + assert(allEvents(2).last == ForeachSinkSuite.Close(None)) + + // Check the first Process event in each epoch, and also check the writer IDs + // we packed in to make sure none got reused. 
+ val writerIds = (0 to 2).map { i => + allEvents(i)(1).asInstanceOf[ForeachSinkSuite.Process[Int]].value + } + assert( + writerIds.toSet.size == 3, + s"writer was reused! expected 3 unique writers but saw $writerIds") + } finally { + query.stop() + } + } + } } + + /** A global object to collect events in the executor */ object ForeachSinkSuite { From a33a35ccbae7350519a3faf8d5d3d6f35692feb3 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 8 Feb 2018 18:36:25 -0800 Subject: [PATCH 5/5] rm spurious changes --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 1 - .../apache/spark/sql/execution/streaming/ForeachSinkSuite.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 204a5d227a4a1..e7982d7880ceb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -565,7 +565,6 @@ abstract class StreamExecution( object StreamExecution { val QUERY_ID_KEY = "sql.streaming.queryId" - val BATCH_ID_KEY = "sql.streaming.batchId" } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index f8fa47730451a..3e79fd4166288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -341,8 +341,6 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf } } - - /** A global object to collect events in the executor */ object ForeachSinkSuite {
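A note on the plumbing the writer relies on to learn its starting epoch: MicroBatchExecution sets the batch id as a local property on the driver just before planning, and ForeachDataWriter reads it inside the task via TaskContext (falling back to ContinuousExecution.START_EPOCH_KEY in continuous mode). The same mechanism in isolation, as a sketch; the key name below is illustrative, while the patch's real key is "sql.streaming.microbatch.batchId":

```scala
import org.apache.spark.{SparkContext, TaskContext}

// Local properties set on the driver thread travel with the tasks it submits.
def tagJobWithEpoch(sc: SparkContext, epochId: Long): Unit = {
  sc.setLocalProperty("example.epochId", epochId.toString)
  sc.parallelize(1 to 4, numSlices = 2).foreach { _ =>
    // Runs on an executor; the property was propagated along with the task.
    val epoch = TaskContext.get().getLocalProperty("example.epochId").toLong
    println(s"partition ${TaskContext.getPartitionId()} sees epoch $epoch")
  }
}
```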