
Commit

Add test for job cancellation.
kunalkhamar committed Apr 27, 2017
1 parent 7a58547 commit 6ab66e2
Showing 2 changed files with 37 additions and 3 deletions.
StreamExecution.scala
@@ -252,7 +252,8 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
-      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
+        interruptOnCancel = true)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -290,6 +291,7 @@ class StreamExecution(
             if (currentBatchId < 0) {
               // We'll do this initialization only once
               populateStartOffsets(sparkSessionToRunBatches)
+              sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
               logDebug(s"Stream running from $committedOffsets to $availableOffsets")
             } else {
               constructNextBatch()
@@ -420,7 +422,6 @@ class StreamExecution(
         /* First assume that we are re-executing the latest known batch
          * in the offset log */
         currentBatchId = latestBatchId
-        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
@@ -466,7 +467,6 @@ class StreamExecution(
                 }
               }
               currentBatchId = latestCommittedBatchId + 1
-              sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
               committedOffsets ++= availableOffsets
               // Construct a new batch be recomputing availableOffsets
               constructNextBatch()
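
For context on the setJobGroup change above: with interruptOnCancel = true, cancelling the run's job group (which lets the engine cancel all of a run's jobs at once, e.g. when the query is stopped) also interrupts the threads running the group's active tasks, instead of only flagging the jobs as killed. Below is a minimal, self-contained sketch of that behaviour outside the streaming engine; the object name, group id, and timings are illustrative only.

import org.apache.spark.sql.SparkSession

// Sketch only: demonstrates setJobGroup(..., interruptOnCancel = true) + cancelJobGroup.
object JobGroupCancellationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-group-cancellation-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Submit a slow job from a separate thread; the job group is a thread-local
    // property, so it must be set on the thread that runs the action.
    val runner = new Thread {
      override def run(): Unit = {
        sc.setJobGroup("demo-group", "slow demo job", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000))
        } catch {
          case e: Exception => println(s"Job ended early: ${e.getMessage}")
        }
      }
    }
    runner.start()

    Thread.sleep(3000)              // let the tasks start
    // Cancels every job in the group; with interruptOnCancel = true the task
    // threads are also interrupted, so the sleeping tasks stop promptly.
    sc.cancelJobGroup("demo-group")
    runner.join()
    spark.stop()
  }
}

Without interruptOnCancel, a task blocked in a sleep or in blocking I/O generally keeps running until it next checks its kill flag, which is why the flag matters for prompt cancellation.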
StreamSuite.scala
@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable

 import org.apache.commons.io.FileUtils

+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -528,6 +530,38 @@ class StreamSuite extends StreamTest {
       assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
     }
   }
+
+  test("batch id is updated correctly in the job description") {
+    def containsNameAndBatch(desc: String, name: String, batch: Integer): Boolean = {
+      desc.contains(name) && desc.contains(s"batch = $batch")
+    }
+
+    @volatile var jobDescription: String = null
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      }
+    })
+
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(_ + 1)
+      .writeStream
+      .format("memory")
+      .queryName("memStream")
+      .start()
+
+    input.addData(1)
+    query.processAllAvailable()
+    assert(Option(jobDescription).exists(containsNameAndBatch(_, "memStream", 0)))
+    input.addData(2, 3)
+    query.processAllAvailable()
+    assert(Option(jobDescription).exists(containsNameAndBatch(_, "memStream", 1)))
+    input.addData(4)
+    query.processAllAvailable()
+    assert(Option(jobDescription).exists(containsNameAndBatch(_, "memStream", 2)))
+  }
 }

 abstract class FakeSource extends StreamSourceProvider {
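
The new test works because the string passed to setJobDescription is attached to every job subsequently submitted from that thread as the spark.job.description local property, which SparkListenerJobStart exposes via jobStart.properties; SparkContext.SPARK_JOB_DESCRIPTION in the test is the internal constant for that key. A standalone sketch of the same listener pattern, assuming a local SparkSession; the object name, description string, and sleep are illustrative only.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

// Sketch only: capture the job description that setJobDescription attaches to jobs.
object JobDescriptionListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-description-listener-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    @volatile var lastDescription: String = null
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // Same key the test reads via SparkContext.SPARK_JOB_DESCRIPTION.
        lastDescription = jobStart.properties.getProperty("spark.job.description")
      }
    })

    sc.setJobDescription("demo description, batch = 0")
    sc.parallelize(1 to 10).count()

    // Listener events are delivered asynchronously, so give the bus a moment
    // before reading the captured value.
    Thread.sleep(1000)
    println(s"Observed job description: $lastDescription")
    spark.stop()
  }
}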
