Skip to content

Commit

Permalink
Add a unit test for checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
bozhang2820 committed Aug 31, 2021
1 parent 62681ca commit 937dbe9
Showing 1 changed file with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,84 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-36533: Trigger.AvailableNow - checkpointing") {
withTempDirs { (src, target) =>
val checkpoint = new File(target, "chk").getCanonicalPath
var lastFileModTime: Option[Long] = None

/** Create a text file with a single data item */
def createFile(data: Int): File = {
val file = stringToFile(new File(src, s"$data.txt"), data.toString)
if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
lastFileModTime = Some(file.lastModified)
file
}

createFile(1)
createFile(2)
createFile(3)

// Set up a query to read text files one at a time
val df = spark
.readStream
.option("maxFilesPerTrigger", 1)
.text(src.getCanonicalPath)

def startTriggerOnceQuery(): StreamingQuery = {
df.writeStream
.foreachBatch((_: Dataset[Row], _: Long) => {})
.trigger(Trigger.Once)
.option("checkpointLocation", checkpoint)
.start()
}

// run a query with Trigger.Once first
val q = startTriggerOnceQuery()

try {
assert(q.awaitTermination(streamingTimeout.toMillis))
} finally {
q.stop()
}

var index = 3 // We have processed the first 3 rows in the first query
def startTriggerAvailableNowQuery(): StreamingQuery = {
df.writeStream
.foreachBatch((df: Dataset[Row], _: Long) => {
index += 1
checkAnswer(df, Row(index.toString))
})
.trigger(Trigger.AvailableNow)
.option("checkpointLocation", checkpoint)
.start()
}

createFile(4)
createFile(5)

// run a second query with Trigger.AvailableNow
val q2 = startTriggerAvailableNowQuery()
try {
assert(q2.awaitTermination(streamingTimeout.toMillis))
assert(index == 5)
} finally {
q2.stop()
}

createFile(6)
createFile(7)

// run a third query with Trigger.AvailableNow
val q3 = startTriggerAvailableNowQuery()
try {
assert(q3.awaitTermination(streamingTimeout.toMillis))
assert(index == 7)
} finally {
q3.stop()
}
}
}

test("explain") {
withTempDirs { case (src, tmp) =>
src.mkdirs()
Expand Down

0 comments on commit 937dbe9

Please sign in to comment.