Skip to content

Commit

Permalink
Added progress to termination event
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 29, 2016
1 parent 32ff04e commit d6200d1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class StreamExecution(
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}

postEvent(new QueryStartedEvent(id)) // Assumption: Does not throw exception.
postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
Expand Down Expand Up @@ -259,7 +259,7 @@ class StreamExecution(
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(lastProgress, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ object StreamingQueryListener {
* @since 2.1.0
*/
@Experimental
class QueryStartedEvent private[sql](val id: UUID) extends Event
class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event

/**
* :: Experimental ::
Expand All @@ -100,11 +100,13 @@ object StreamingQueryListener {
* :: Experimental ::
* Event representing that termination of a query.
*
* @param id The unique id of the query that terminated.
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* @param lastProgress The last progress the query made before it was terminated.
* @param exception The exception message of the query if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @since 2.1.0
*/
@Experimental
class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
class QueryTerminatedEvent private[sql](
val lastProgress: StreamingQueryProgress,
val exception: Option[String]) extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import java.util.UUID
import scala.collection.mutable

import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._

import org.apache.spark.SparkException
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.util.{JsonProtocol, ManualClock}
import org.apache.spark.util.JsonProtocol

class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

Expand All @@ -50,21 +52,26 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val df = inputData.toDS().as[Long].map { 10 / _ }
val listener = new EventCollector
try {
// No events until started
spark.streams.addListener(listener)
assert(listener.startEvent === null)
assert(listener.progressEvents.isEmpty)
assert(listener.terminationEvent === null)

testStream(df, OutputMode.Append)(

// Start event generated when query started
StartStream(ProcessingTime(100), triggerClock = clock),
AssertOnQuery(query => {
AssertOnQuery { query =>
assert(listener.startEvent !== null)
assert(listener.startEvent.id === query.id)
assert(listener.startEvent.name === query.name)
assert(listener.progressEvents.isEmpty)
assert(listener.terminationEvent === null)
true
}),
},

// Progress event generated when data processed
AddData(inputData, 1, 2),
AdvanceManualClock(100),
CheckAnswer(10, 5),
Expand All @@ -75,23 +82,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
true
},

// Termination event generated when stopped cleanly
StopStream,
AssertOnQuery { query =>
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception === None)
eventually(Timeout(streamingTimeout)) {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.lastProgress === query.lastProgress)
assert(listener.terminationEvent.exception === None)
}
listener.checkAsyncErrors()
listener.reset()
true
},

// Termination event generated with exception message when stopped with error
StartStream(ProcessingTime(100), triggerClock = clock),
AddData(inputData, 0),
AdvanceManualClock(100),
ExpectFailure[SparkException],
AssertOnQuery { query =>
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.lastProgress === query.lastProgress)
assert(listener.terminationEvent.exception.nonEmpty)
listener.checkAsyncErrors()
true
Expand All @@ -100,7 +111,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
} finally {
spark.streams.removeListener(listener)
}

}

test("adding and removing listener") {
Expand Down Expand Up @@ -170,7 +180,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

test("QueryStartedEvent serialization") {
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID())
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
Expand All @@ -182,17 +192,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
assert(event.progress.jsonValue === newEvent.progress.jsonValue)
assert(event.progress.json === newEvent.progress.json)
}

test("QueryTerminatedEvent serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
UUID.randomUUID(), Some(exception.getMessage))
StreamingQueryProgressSuite.testProgress, Some(exception.getMessage))
val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
assert(queryQueryTerminated.id === newQueryTerminated.id)
assert(queryQueryTerminated.lastProgress.json === newQueryTerminated.lastProgress.json)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}

Expand Down

0 comments on commit d6200d1

Please sign in to comment.