Skip to content

Commit

Permalink
Remove test for checkpointing in TriggerAvailableNowSuite
Browse files: browse the repository at this point in the history
  • Loading branch information
bozhang2820 committed Aug 31, 2021
1 parent 937dbe9 commit c96d60d
Showing 1 changed file with 18 additions and 55 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -130,21 +130,20 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {

val df2 = testSource.toDF

def startQuery(trigger: Trigger): StreamingQuery = {
def startQuery(): StreamingQuery = {
df1.union(df2).writeStream
.format("parquet")
.trigger(trigger)
.trigger(Trigger.AvailableNow)
.option("checkpointLocation", checkpoint)
.start(targetDir)
}

testSource.incrementAvailableOffset(3)
createFile(10)
createFile(11)
createFile(12)
createFile(7)
createFile(8)
createFile(9)

// run a query with Trigger.Once first
val q = startQuery(Trigger.Once)
val q = startQuery()

try {
assert(q.awaitTermination(streamingTimeout.toMillis))
Expand All @@ -154,49 +153,29 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"),
Seq(1, 2, 3, 10, 11, 12).map(_.toString).toDF())
Seq(1, 2, 3, 7, 8, 9).map(_.toString).toDF())
} finally {
q.stop()
}

testSource.incrementAvailableOffset(3)
createFile(13)
createFile(14)
createFile(15)
createFile(10)
createFile(11)
createFile(12)

// run a second query with Trigger.AvailableNow
val q2 = startQuery(Trigger.AvailableNow)
// run a second query
val q2 = startQuery()
try {
assert(q2.awaitTermination(streamingTimeout.toMillis))
// only one batch has data in both sources, so only that batch is counted as having input rows; see SPARK-24050
assert(q2.recentProgress.count(_.numInputRows != 0) == 1)
q2.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"),
Seq(1, 2, 3, 4, 5, 6, 10, 11, 12, 13, 14, 15).map(_.toString).toDF())
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 12).map(_.toString).toDF())
} finally {
q2.stop()
}

testSource.incrementAvailableOffset(3)
createFile(16)
createFile(17)
createFile(18)

// run a third query with Trigger.AvailableNow again
val q3 = startQuery(Trigger.AvailableNow)
try {
assert(q3.awaitTermination(streamingTimeout.toMillis))
// only one batch has data in both sources, thus counted, see SPARK-24050
assert(q3.recentProgress.count(_.numInputRows != 0) == 1)
q3.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 18).map(_.toString).toDF())
} finally {
q3.stop()
}
}
}
}
Expand All @@ -211,18 +190,17 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
withTable(tableName) {
val df = testSource.toDF

def startQuery(trigger: Trigger): StreamingQuery = {
def startQuery(): StreamingQuery = {
df.writeStream
.format("memory")
.queryName(tableName)
.trigger(trigger)
.trigger(Trigger.AvailableNow)
.start()
}

testSource.incrementAvailableOffset(3)

// run a query with Trigger.Once first
val q = startQuery(Trigger.Once)
val q = startQuery()

try {
assert(q.awaitTermination(streamingTimeout.toMillis))
Expand All @@ -237,8 +215,8 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {

testSource.incrementAvailableOffset(3)

// run a second query with Trigger.AvailableNow
val q2 = startQuery(Trigger.AvailableNow)
// run a second query
val q2 = startQuery()
try {
assert(q2.awaitTermination(streamingTimeout.toMillis))
assert(q2.recentProgress.count(_.numInputRows != 0) == 1)
Expand All @@ -249,21 +227,6 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
} finally {
q2.stop()
}

testSource.incrementAvailableOffset(3)

// run a third query with Trigger.AvailableNow again
val q3 = startQuery(Trigger.AvailableNow)
try {
assert(q3.awaitTermination(streamingTimeout.toMillis))
assert(q3.recentProgress.count(_.numInputRows != 0) == 1)
q3.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
checkAnswer(spark.table(tableName), (1 to 9).toDF())
} finally {
q3.stop()
}
}
}
}
Expand Down

0 comments on commit c96d60d

Please sign in to comment.