Skip to content

Commit

Permalink
[SPARK-18516][SQL] Split state and progress in streaming
Browse files Browse the repository at this point in the history
This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
 - `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2)
Signed-off-by: Michael Armbrust <[email protected]>
  • Loading branch information
tdas authored and marmbrus committed Nov 30, 2016
1 parent 045ae29 commit 28b57c8
Show file tree
Hide file tree
Showing 26 changed files with 1,087 additions and 1,752 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnLastQueryStatus { status =>
assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
assert(status.sourceStatuses(0).processingRate > 0.0)
AssertOnQuery { query =>
val recordsRead = query.recentProgresses.map(_.numInputRows).sum
recordsRead == 3
}
)
}
Expand Down
11 changes: 11 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),

// [SPARK-18516][SQL] Split state and progress in streaming
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SinkStatus"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),

// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),
Expand Down
Loading

0 comments on commit 28b57c8

Please sign in to comment.