Skip to content

Commit

Permalink
Removed progress from termination event
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 29, 2016
1 parent 8bd49b7 commit d9d8f82
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class StreamExecution(
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminatedEvent(lastProgress, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,11 @@ object StreamingQueryListener {
* :: Experimental ::
* Event representing that termination of a query.
*
* @param lastProgress The last progress the query made before it was terminated.
* @param id The query id.
* @param exception The exception message of the query if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @since 2.1.0
*/
@Experimental
class QueryTerminatedEvent private[sql](
val lastProgress: StreamingQueryProgress,
val exception: Option[String]) extends Event
class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.lastProgress === query.lastProgress)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception === None)
}
listener.checkAsyncErrors()
Expand All @@ -102,7 +102,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
ExpectFailure[SparkException],
AssertOnQuery { query =>
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.lastProgress === query.lastProgress)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception.nonEmpty)
listener.checkAsyncErrors()
true
Expand Down Expand Up @@ -198,11 +198,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryTerminatedEvent serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
StreamingQueryProgressSuite.testProgress, Some(exception.getMessage))
UUID.randomUUID, Some(exception.getMessage))
val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
assert(queryQueryTerminated.lastProgress.json === newQueryTerminated.lastProgress.json)
assert(queryQueryTerminated.id === newQueryTerminated.id)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}

Expand Down

0 comments on commit d9d8f82

Please sign in to comment.