[SPARK-18516][SQL] Split state and progress in streaming #15954
/cc @tdas
Test build #68916 has finished for PR 15954 at commit
Test build #68923 has finished for PR 15954 at commit
Test build #68944 has finished for PR 15954 at commit
Test build #68945 has finished for PR 15954 at commit
Round 1 of comments. I am still looking.
*/
@deprecated("use status.sourceStatuses", "2.0.2")
def sourceStatuses: Array[SourceStatus]
def recentProgress: Array[StreamingQueryProgress]
Shouldn't this be `recentProgresses`?
Hmmm, yeah, maybe. It's not clear to me that `progress` is inherently singular, and `progresses` is kind of a mouthful. It is maybe nice for `Array`s to always be plural, though.
Well, in the name `StreamingQueryProgress` we have effectively defined that "progress" means data from one trigger (i.e. singular). So I think it's better for it to be `progresses`.
/**
* :: Experimental ::
* Reports metrics on data being read from a given streaming source.
This should say that this information relates to a trigger in which progress was made in processing data from sources.
Sure, we can copy the docs from the main class: Each event relates to processing done for a single trigger of the streaming query.
* @param description Description of the source.
* @param startOffset The starting offset for data being read.
* @param endOffset The ending offset for data being read.
* @param numRecords The number of records read from this source.
Is this the number of records read from this source since the beginning, or in the last trigger?
I think if we update the docs as you suggest above this will be clear.
* Holds statistics about state that is being stored for a given streaming query.
*/
@Experimental
class StateOperator private[sql](
`StateOperator` -> `StateOperatorProgress`
+1
*/
@Experimental
class StateOperator private[sql](
val numEntries: Long,
`numEntries` -> `numTotal` would make it more consistent with `numUpdated`.
Also, this needs docs to make it clear that `numUpdated` is with reference to the last progress.
+1 to docs. I think `numTotal` is less clear. Total of what? It is a count of the number of entries that the state store is holding.
Same question then applies to numUpdated as well.
How about numRowsTotal, numRowsUpdated?
Then in future we could add sizeBytesTotal, sizeBytesUpdated, etc.
Those sound good.
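The naming agreed on in this thread can be sketched as follows. This is an illustrative Python stand-in, not the Scala class from the PR: `num_rows_total` and `num_rows_updated` mirror the proposed `numRowsTotal` and `numRowsUpdated`, while the `apply_trigger` helper and the dict-backed state are assumptions made only for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateOperatorProgress:
    """Illustrative stand-in for per-trigger state statistics."""
    num_rows_total: int    # rows held by the state store after the trigger
    num_rows_updated: int  # rows added or changed during this trigger only


def apply_trigger(state: dict, updates: dict) -> StateOperatorProgress:
    """Hypothetical helper: merge one trigger's updates into the state."""
    state.update(updates)
    return StateOperatorProgress(num_rows_total=len(state),
                                 num_rows_updated=len(updates))


state = {"a": 1, "b": 2}
progress = apply_trigger(state, {"b": 3, "c": 4})
print(progress)
```

Here one existing key is overwritten and one new key is added, so the total and the updated counts differ, which is the distinction the docs were asked to spell out.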
Test build #68972 has finished for PR 15954 at commit
Test build #69276 has finished for PR 15954 at commit
Test build #69275 has finished for PR 15954 at commit
Test build #69278 has finished for PR 15954 at commit
Test build #69295 has started for PR 15954 at commit
Test build #69293 has finished for PR 15954 at commit
Test build #69288 has finished for PR 15954 at commit
Test build #69290 has finished for PR 15954 at commit
Test build #69308 has started for PR 15954 at commit
Looks pretty good! Thanks for your help getting the tests back in shape. I think the only important question to answer before we can merge is the contents of `QueryTerminatedEvent`.
Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
:return: a map
"""
return json.loads(self._jsq.lastProgress().toString())
I'd use `json` as above instead of relying on the fact that the `toString` output is JSON.
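A minimal sketch of the point being made, using a hypothetical `ProgressStub` in place of the JVM-side progress object: parsing a dedicated JSON accessor keeps working even if the display string changes, whereas parsing the string form does not. The class, its fields, and `last_progress` are assumptions for illustration, not PySpark API.

```python
import json


class ProgressStub:
    """Hypothetical stand-in for a JVM-side progress object."""

    def __init__(self, name: str, timestamp: int):
        self.name = name
        self.timestamp = timestamp

    def json(self) -> str:
        # Explicit serialization contract: always valid JSON.
        return json.dumps({"name": self.name, "timestamp": self.timestamp})

    def __str__(self) -> str:
        # Display form; free to change without breaking callers.
        return f"ProgressStub(name={self.name}, timestamp={self.timestamp})"


def last_progress(stub: ProgressStub) -> dict:
    # Parse the dedicated JSON accessor rather than str(stub).
    return json.loads(stub.json())


stub = ProgressStub(name="query-29", timestamp=1479705392724)
parsed = last_progress(stub)
print(parsed["name"])
```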
* all queries that have been started in the current process.
* @since 2.0.0
* Returns the unique id of this query. An id is tied to the checkpoint location and will
* be the same across restarts of a given streaming query.
We should fix the TODO earlier, or remove this promise for now.
removed.
@@ -51,7 +53,7 @@ trait StreamingQuery {
def sparkSession: SparkSession
/**
* Whether the query is currently active or not
* Returns `true` if this query is actively running.
* @since 2.0.0
Nit: other places have a blank line before `@since`.
*/
@Experimental
class QueryTerminatedEvent private[sql](
val queryStatus: StreamingQueryStatus,
val exception: Option[String]) extends Event
val lastProgress: StreamingQueryProgress,
What is this if no progress is ever made? `null`? I would consider leaving this as just the `id`, because otherwise, if the query dies before progress is made, you can't get the `id` at all.
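The shape suggested here, an event carrying only the `id` and an optional exception, can be sketched in Python as below. The dataclass and the `on_terminated` handler are illustrative assumptions rather than the Scala definition, and the exception text is made up for the example.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class QueryTerminatedEvent:
    """Illustrative event: the id is always present, progress is not required."""
    id: uuid.UUID             # known even if no trigger ever completed
    exception: Optional[str]  # None when the query stopped cleanly


def on_terminated(event: QueryTerminatedEvent) -> str:
    """Hypothetical listener callback that needs only the id."""
    status = "failed: " + event.exception if event.exception else "stopped"
    return f"query {event.id} {status}"


query_id = uuid.UUID("2be8670a-fce1-4859-a530-748f29553bb6")
print(on_terminated(QueryTerminatedEvent(query_id, None)))
print(on_terminated(QueryTerminatedEvent(query_id, "source unavailable")))
```

Because nothing in the event depends on a completed trigger, a query that dies before making progress can still be identified by its listener.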
object StreamingQueryManager {
private val _nextId = new AtomicLong(0)
def nextId: Long = _nextId.getAndIncrement()
`private`
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}
}
object StreamingQueryManager {
`private`
Can you tell me why this was moved from `StreamExecution`?
I made it private, but I feel this could have stayed in `object StreamExecution`.
/**
* :: Experimental ::
* Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger.
Nit: "during a trigger"?
* Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger.
*/
@Experimental
class StateOperatorProgress private[sql](
We should move `SourceProgress` here or put `StateOperatorProgress` in its own file. We might also consider putting them all in `org.apache.spark.sql.streaming.progress`, but there might not be time.
val id: UUID,
val name: String,
val timestamp: Long,
val batchId: Long, // TODO: epoch?
We probably will not do this `TODO`.
class QueryStatusCollector extends StreamingQueryListener {
/** Collects events from the StreamingQueryListener for testing */
class EventCollector extends StreamingQueryListener {
I don't think this needs to be an inner class of `StreamTest`. This file is pretty long/complicated as is.
Ah yes, it was being used in multiple files at some point, so I put it here, but not any more. I will put it only in StreamingQueryListener.
Test build #69352 has finished for PR 15954 at commit
Test build #69353 has finished for PR 15954 at commit
*/
@deprecated("use status.sourceStatuses", "2.0.2")
def sourceStatuses: Array[SourceStatus]
def recentProgresses: Array[StreamingQueryProgress]
Are these for the last `n` triggers? Or is it the last `n` instantaneous progress updates, e.g. finished reading from a source, etc.?
Last `n` triggers.
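"Last `n` triggers" amounts to a bounded buffer with one entry per completed trigger. The sketch below shows that idea in Python using `collections.deque`; the `ProgressBuffer` class, its method names, and the retention size are hypothetical and say nothing about how `StreamExecution` actually stores progress.

```python
from collections import deque


class ProgressBuffer:
    """Illustrative bounded buffer: one entry per completed trigger."""

    def __init__(self, max_retained: int):
        # deque drops the oldest entry once maxlen is reached.
        self._buf = deque(maxlen=max_retained)

    def record_trigger(self, progress: dict) -> None:
        self._buf.append(progress)

    @property
    def recent_progresses(self) -> list:
        return list(self._buf)

    @property
    def last_progress(self):
        # None if no trigger has completed yet.
        return self._buf[-1] if self._buf else None


buf = ProgressBuffer(max_retained=3)
for batch_id in range(5):
    buf.record_trigger({"batchId": batch_id})
print([p["batchId"] for p in buf.recent_progresses])
```

After five triggers with a retention of three, only the three most recent entries remain, and `last_progress` is simply the newest of them.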
*
* @since 2.1.0
*/
def get(id: String): StreamingQuery = get(UUID.fromString(id))
With this, I guess we can't provide APIs for `get(name: String)`.
I think that's okay. A globally unique ID is a better identifier.
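A rough Python sketch of the trade-off discussed here: once a string argument is interpreted as a UUID, the same signature can no longer mean "look up by name". The `QueryRegistry` class and its methods are hypothetical and only mirror the shape of `get(id: String)` delegating to `get(UUID)`.

```python
import uuid
from typing import Dict, Optional, Union


class QueryRegistry:
    """Hypothetical registry of active queries keyed by a globally unique id."""

    def __init__(self) -> None:
        self._active: Dict[uuid.UUID, str] = {}

    def register(self, name: str) -> uuid.UUID:
        query_id = uuid.uuid4()
        self._active[query_id] = name
        return query_id

    def get(self, query_id: Union[str, uuid.UUID]) -> Optional[str]:
        if isinstance(query_id, str):
            # A string is always parsed as a UUID, never treated as a name;
            # malformed input raises ValueError.
            query_id = uuid.UUID(query_id)
        return self._active.get(query_id)


registry = QueryRegistry()
qid = registry.register("query-29")
print(registry.get(str(qid)))
```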
Test build #69346 has finished for PR 15954 at commit
@@ -33,25 +35,27 @@ trait StreamingQuery {
* Returns the name of the query. This name is unique across all active queries. This can be
Is this doc still true?
Yeah, it is.
Test build #69355 has finished for PR 15954 at commit
LGTM, merging to master and 2.1
This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : { "2" : 0, "4" : 1, "1" : 0, "3" : 0, "0" : 0 }
    },
    "endOffset" : {
      "topic-14" : { "2" : 1, "4" : 2, "1" : 0, "3" : 0, "0" : 1 }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```
Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <[email protected]>
Author: Michael Armbrust <[email protected]>
Closes #15954 from marmbrus/queryProgress.
(cherry picked from commit c3d08e2)
Signed-off-by: Michael Armbrust <[email protected]>
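As a worked example of reading one such progress record, the Python sketch below parses the `sources` portion of the JSON above and derives a per-partition record count from the offsets. Treating `endOffset - startOffset` as the number of records read is an assumption about Kafka-style offsets, and `records_per_partition` is a hypothetical helper.

```python
import json

# The "sources" portion of the progress record shown above.
progress_json = """
{
  "name": "query-29",
  "sources": [{
    "description": "KafkaSource[Subscribe[topic-14]]",
    "startOffset": {"topic-14": {"2": 0, "4": 1, "1": 0, "3": 0, "0": 0}},
    "endOffset":   {"topic-14": {"2": 1, "4": 2, "1": 0, "3": 0, "0": 1}},
    "numRecords": 3
  }]
}
"""


def records_per_partition(source: dict) -> dict:
    """Hypothetical helper: offset delta per (topic, partition)."""
    counts = {}
    for topic, end_offsets in source["endOffset"].items():
        start_offsets = source["startOffset"][topic]
        for partition, end in end_offsets.items():
            counts[(topic, partition)] = end - start_offsets[partition]
    return counts


progress = json.loads(progress_json)
source = progress["sources"][0]
counts = records_per_partition(source)
print(sum(counts.values()), source["numRecords"])
```

Partitions 0, 2, and 4 each advance by one offset, so the deltas sum to 3, which agrees with the reported `numRecords`.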
…erver

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in apache#15954, parsing 2.0.2 structured streaming logs will throw the following error:
```
[info] com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info] at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info] at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info] at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info] at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info] at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info] at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info] at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info] at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```
This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <[email protected]>
Closes apache#16085 from zsxwing/SPARK-18655.
(cherry picked from commit c4979f6)
Signed-off-by: Shixiong Zhu <[email protected]>
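The "ignore such errors" approach described in this commit message can be illustrated with a small Python sketch: a strict parser rejects events that carry unknown fields, and the replay loop skips them instead of aborting. The `KNOWN_FIELDS` schema, the shortened event name, and both functions are assumptions for illustration; this is not the `ReplayListenerBus` implementation.

```python
import json

# Hypothetical schema: fields the current reader accepts for each event type.
KNOWN_FIELDS = {"QueryTerminatedEvent": {"Event", "id", "exception"}}


def parse_strict(line: str) -> dict:
    """Reject events that contain fields the reader does not know."""
    event = json.loads(line)
    allowed = KNOWN_FIELDS.get(event["Event"], set())
    unknown = set(event) - allowed
    if unknown:
        raise ValueError(f"Unrecognized field(s): {sorted(unknown)}")
    return event


def replay(lines):
    """Return parsed events, skipping lines that fail strict parsing."""
    events, skipped = [], 0
    for line in lines:
        try:
            events.append(parse_strict(line))
        except (ValueError, KeyError, TypeError):
            skipped += 1  # old-format or malformed event: ignore and continue
    return events, skipped


log = [
    '{"Event": "QueryTerminatedEvent", "id": "2be8670a-fce1-4859-a530-748f29553bb6", "exception": null}',
    '{"Event": "QueryTerminatedEvent", "queryStatus": {"name": "query-1", "id": 1}, "exception": null}',
]
events, skipped = replay(log)
print(len(events), skipped)
```

The second line mimics a 2.0.2-style event with a `queryStatus` field, so it is counted as skipped while the first is replayed normally.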
This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.