diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1c53c65338324..3832905576c9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1360,6 +1360,84 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-36533: Trigger.AvailableNow - checkpointing") { + withTempDirs { (src, target) => + val checkpoint = new File(target, "chk").getCanonicalPath + var lastFileModTime: Option[Long] = None + + /** Create a text file with a single data item */ + def createFile(data: Int): File = { + val file = stringToFile(new File(src, s"$data.txt"), data.toString) + if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000) + lastFileModTime = Some(file.lastModified) + file + } + + createFile(1) + createFile(2) + createFile(3) + + // Set up a query to read text files one at a time + val df = spark + .readStream + .option("maxFilesPerTrigger", 1) + .text(src.getCanonicalPath) + + def startTriggerOnceQuery(): StreamingQuery = { + df.writeStream + .foreachBatch((_: Dataset[Row], _: Long) => {}) + .trigger(Trigger.Once) + .option("checkpointLocation", checkpoint) + .start() + } + + // run a query with Trigger.Once first + val q = startTriggerOnceQuery() + + try { + assert(q.awaitTermination(streamingTimeout.toMillis)) + } finally { + q.stop() + } + + var index = 3 // We have processed the first 3 rows in the first query + def startTriggerAvailableNowQuery(): StreamingQuery = { + df.writeStream + .foreachBatch((df: Dataset[Row], _: Long) => { + index += 1 + checkAnswer(df, Row(index.toString)) + }) + .trigger(Trigger.AvailableNow) + .option("checkpointLocation", checkpoint) + .start() + } + + createFile(4) + createFile(5) + + // run a second query with Trigger.AvailableNow + val q2 = startTriggerAvailableNowQuery() + try { + assert(q2.awaitTermination(streamingTimeout.toMillis)) + assert(index == 5) + } finally { + q2.stop() + } + + createFile(6) + createFile(7) + + // run a third query with Trigger.AvailableNow + val q3 = startTriggerAvailableNowQuery() + try { + assert(q3.awaitTermination(streamingTimeout.toMillis)) + assert(index == 7) + } finally { + q3.stop() + } + } + } + test("explain") { withTempDirs { case (src, tmp) => src.mkdirs()