Add createJobStart to eliminate duplicate codes
zsxwing committed Apr 29, 2015 (1 parent 9a3083d, commit b380cfb)
Showing 2 changed files with 20 additions and 51 deletions.
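In short, the change factors the repeated Properties plus SparkListenerJobStart setup in StreamingJobProgressListenerSuite into a single createJobStart helper. A minimal before/after sketch of the pattern, assuming the imports already present in the test suite; the Time(1000) batch time and the id values are illustrative, and jobStartBefore/jobStartAfter are hypothetical names used only here:

    // Before: every test case built the job-start event by hand.
    val properties = new Properties()
    properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
    properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
    val jobStartBefore = SparkListenerJobStart(jobId = 0,
      0L, // submission time, not read by the listener under test
      Nil, // stage infos, not read by the listener under test
      properties)

    // After: one call to the shared helper builds the same event.
    val jobStartAfter = createJobStart(Time(1000), outputOpId = 0, jobId = 0)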
@@ -147,7 +147,7 @@ class UISeleniumSuite
       findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
         (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
 
-      // Check stacktrack
+      // Check stacktrace
       val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
       errorCells should have size 1
       errorCells(0) should include("java.lang.RuntimeException: Oops")
@@ -32,6 +32,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
   val input = (1 to 4).map(Seq(_)).toSeq
   val operation = (d: DStream[Int]) => d.map(x => x)
 
+  private def createJobStart(
+      batchTime: Time, outputOpId: Int, jobId: Int): SparkListenerJobStart = {
+    val properties = new Properties()
+    properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, batchTime.milliseconds.toString)
+    properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, outputOpId.toString)
+    SparkListenerJobStart(jobId = jobId,
+      0L, // unused
+      Nil, // unused
+      properties)
+  }
+
   override def batchDuration: Duration = Milliseconds(100)
 
   test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
@@ -69,40 +80,16 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalReceivedRecords should be (600)
 
     // onJobStart
-    val properties1 = new Properties()
-    properties1.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
-    properties1.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
-    val jobStart1 = SparkListenerJobStart(jobId = 0,
-      0L, // unused
-      Nil, // unused
-      properties1)
+    val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
     listener.onJobStart(jobStart1)
 
-    val properties2 = new Properties()
-    properties2.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
-    properties2.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
-    val jobStart2 = SparkListenerJobStart(jobId = 1,
-      0L, // unused
-      Nil, // unused
-      properties2)
+    val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
     listener.onJobStart(jobStart2)
 
-    val properties3 = new Properties()
-    properties3.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
-    properties3.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
-    val jobStart3 = SparkListenerJobStart(jobId = 0,
-      0L, // unused
-      Nil, // unused
-      properties3)
+    val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
     listener.onJobStart(jobStart3)
 
-    val properties4 = new Properties()
-    properties4.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
-    properties4.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
-    val jobStart4 = SparkListenerJobStart(jobId = 1,
-      0L, // unused
-      Nil, // unused
-      properties4)
+    val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
     listener.onJobStart(jobStart4)
 
     val batchUIData = listener.getBatchUIData(Time(1000))
@@ -172,7 +159,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalCompletedBatches should be(limit + 10)
   }
 
-  test("disorder onJobStart and onBatchXXX") {
+  test("out-of-order onJobStart and onBatchXXX") {
     val ssc = setupStreams(input, operation)
     val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
     val listener = new StreamingJobProgressListener(ssc)
@@ -181,25 +168,13 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     for(i <- 0 until limit) {
       val batchInfoCompleted =
         BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
-      val properties = new Properties()
-      properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
-      properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
-      val jobStart = SparkListenerJobStart(jobId = 1,
-        0L, // unused
-        Nil, // unused
-        properties)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+      val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
     }
 
     // onJobStart happens before onBatchSubmitted
-    val properties = new Properties()
-    properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + limit * 100).toString)
-    properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
-    val jobStart = SparkListenerJobStart(jobId = 0,
-      0L, // unused
-      Nil, // unused
-      properties)
+    val jobStart = createJobStart(Time(1000 + limit * 100), outputOpId = 0, jobId = 0)
     listener.onJobStart(jobStart)
 
     val batchInfoSubmitted =
@@ -225,13 +200,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     }
 
     for(i <- limit + 1 to limit * 2) {
-      val properties = new Properties()
-      properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
-      properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
-      val jobStart = SparkListenerJobStart(jobId = 1,
-        0L, // unused
-        Nil, // unused
-        properties)
+      val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
     }
 
