Skip to content

Commit

Permalink
Add test for job cancellation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kunalkhamar committed Apr 26, 2017
1 parent bd13a01 commit 7a58547
Showing 1 changed file with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,34 @@ class StreamSuite extends StreamTest {
}
}
}

test("calling stop() on a query cancels related jobs") {
val input = MemoryStream[Int]
val query = input
.toDS()
.map { i =>
while (!org.apache.spark.TaskContext.get().isInterrupted()) {
// keep looping till interrupted by query.stop()
Thread.sleep(100)
}
i
}
.writeStream
.format("console")
.start()

input.addData(1)
// wait for jobs to start
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
}

query.stop()
// make sure jobs are stopped
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
}
}
}

abstract class FakeSource extends StreamSourceProvider {
Expand Down

0 comments on commit 7a58547

Please sign in to comment.