[SPARK-18516][SQL] Split state and progress in streaming #15954

Closed. Wants to merge 24 commits. Showing changes from 6 commits.
@@ -403,12 +403,13 @@ class KafkaSourceSuite extends KafkaSourceTest {

     val mapped = kafka.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
       StartStream(trigger = ProcessingTime(1)),
       makeSureGetOffsetCalled,
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
-      AssertOnLastQueryStatus { status =>
-        assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
-        assert(status.sourceStatuses(0).processingRate > 0.0)
+      AssertOnQuery { query =>
+        val recordsRead = query.recentProgress.map(_.numRecords).sum
+        recordsRead == 3
       }
     )
   }
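For reference, the new progress-based assertion translates directly to the Python API added later in this PR. A minimal sketch, assuming a started query `sq` and that each JSON-decoded progress dict exposes the same `numRecords` field the Scala test reads (an assumption at this stage of the PR):

    # Sketch only: mirrors the AssertOnQuery check above in PySpark.
    # Assumes `sq` is a running StreamingQuery whose progress dicts
    # carry a 'numRecords' field, as in the Scala test.
    records_read = sum(p['numRecords'] for p in sq.recentProgress())
    assert records_read == 3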
9 changes: 9 additions & 0 deletions project/MimaExcludes.scala
@@ -77,6 +77,15 @@ object MimaExcludes {
   ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
   ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
   ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
+  // [SPARK-18516][SQL] Split state and progress in streaming
+  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"),
+  ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"),
+  ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
+  ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
+  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
+  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+  ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),

   // [SPARK-17338][SQL] add global temp view
   ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
318 changes: 14 additions & 304 deletions python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
#

 import sys
+import json
+
 if sys.version >= '3':
     intlike = int
     basestring = unicode = str
@@ -48,10 +50,9 @@ def __init__(self, jsq):
     @property
     @since(2.0)
     def id(self):
-        """The id of the streaming query. This id is unique across all queries that have been
-        started in the current process.
+        """The id of the streaming query.
         """
-        return self._jsq.id()
+        return self._jsq.id().toString()

@property
@since(2.0)
@@ -87,6 +88,16 @@ def awaitTermination(self, timeout=None):
         else:
             return self._jsq.awaitTermination()

+    def recentProgress(self):
+        """Returns the most recent statistics on progress that has been made in this streaming
+        query.
+
+        >>> sq = sdf.writeStream.format('memory').queryName('progress_query').start()
+        >>> sq.recentProgress()
+        >>> sq.stop()
+        """
+        return [json.loads(p.json()) for p in self._jsq.recentProgress()]
+
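A quick usage sketch for the method added above. It relies on the `sdf` glob already used in the doctest; the query name is illustrative, and the contents of each dict depend on the JSON emitted by the JVM side:

    # Each element returned by recentProgress() is a JSON-decoded dict.
    import json

    sq = sdf.writeStream.format('memory').queryName('progress_demo').start()
    sq.processAllAvailable()
    for p in sq.recentProgress():
        print(json.dumps(p, indent=2))   # inspect the raw progress payload
    sq.stop()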
@since(2.0)
def processAllAvailable(self):
"""Blocks until all available data in the source has been processed and committed to the
@@ -147,8 +158,6 @@ def get(self, id):
         True
         >>> sq.stop()
         """
-        if not isinstance(id, intlike):
-            raise ValueError("The id for the query must be an integer. Got: %s" % id)
         return StreamingQuery(self._jsqm.get(id))

@since(2.0)
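Since query ids are no longer integers (note the removed isinstance check above and the .toString() change to StreamingQuery.id), get() now accepts the string id. A hedged sketch, again reusing the doctest globs with an illustrative query name:

    # Query ids are now strings, so no integer coercion is needed.
    sq = sdf.writeStream.format('memory').queryName('get_demo').start()
    query_id = sq.id                       # a string now, not an int
    same_query = spark.streams.get(query_id)
    assert same_query.id == query_id
    sq.stop()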
@@ -189,303 +198,6 @@ def resetTerminated(self):
self._jsqm.resetTerminated()


class StreamingQueryStatus(object):
"""A class used to report information about the progress of a StreamingQuery.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jsqs):
self._jsqs = jsqs

def __str__(self):
"""
Pretty string of this query status.

>>> print(sqs)
Status of query 'query'
Query id: 1
Status timestamp: 123
Input rate: 15.5 rows/sec
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger details:
batchId: 5
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
Source statuses [1 source]:
Source 1 - MySource1
Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status - MySink
Committed offsets: [1, -]
"""
return self._jsqs.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def name(self):
"""
Name of the query. This name is unique across all active queries.

>>> sqs.name
u'query'
"""
return self._jsqs.name()

@property
@since(2.1)
def id(self):
"""
Id of the query. This id is unique across all queries that have been started in
the current process.

>>> int(sqs.id)
1
"""
return self._jsqs.id()

@property
@since(2.1)
def timestamp(self):
"""
Timestamp (ms) of when this query was generated.

>>> int(sqs.timestamp)
123
"""
return self._jsqs.timestamp()

@property
@since(2.1)
def inputRate(self):
"""
Current total rate (rows/sec) at which data is being generated by all the sources.

>>> sqs.inputRate
15.5
"""
return self._jsqs.inputRate()

@property
@since(2.1)
def processingRate(self):
"""
Current rate (rows/sec) at which the query is processing data from all the sources.

>>> sqs.processingRate
23.5
"""
return self._jsqs.processingRate()

@property
@since(2.1)
def latency(self):
"""
Current average latency between the data being available in source and the sink
writing the corresponding output.

>>> sqs.latency
345.0
"""
if (self._jsqs.latency().nonEmpty()):
return self._jsqs.latency().get()
else:
return None

@property
@ignore_unicode_prefix
@since(2.1)
def sourceStatuses(self):
"""
Current statuses of the sources as a list.

>>> len(sqs.sourceStatuses)
1
>>> sqs.sourceStatuses[0].description
u'MySource1'
"""
return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]

@property
@ignore_unicode_prefix
@since(2.1)
def sinkStatus(self):
"""
Current status of the sink.

>>> sqs.sinkStatus.description
u'MySink'
"""
return SinkStatus(self._jsqs.sinkStatus())

@property
@ignore_unicode_prefix
@since(2.1)
def triggerDetails(self):
"""
Low-level details of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).

If no trigger is currently active, then it will have details of the last completed trigger.

>>> sqs.triggerDetails
{u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerDetails()


class SourceStatus(object):
"""
Status and metrics of a streaming Source.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jss):
self._jss = jss

def __str__(self):
"""
Pretty string of this source status.

>>> print(sqs.sourceStatuses[0])
Status of source MySource1
Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
"""
return self._jss.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def description(self):
"""
Description of the source corresponding to this status.

>>> sqs.sourceStatuses[0].description
u'MySource1'
"""
return self._jss.description()

@property
@ignore_unicode_prefix
@since(2.1)
def offsetDesc(self):
"""
Description of the current offset if known.

>>> sqs.sourceStatuses[0].offsetDesc
u'0'
"""
return self._jss.offsetDesc()

@property
@since(2.1)
def inputRate(self):
"""
Current rate (rows/sec) at which data is being generated by the source.

>>> sqs.sourceStatuses[0].inputRate
15.5
"""
return self._jss.inputRate()

@property
@since(2.1)
def processingRate(self):
"""
Current rate (rows/sec) at which the query is processing data from the source.

>>> sqs.sourceStatuses[0].processingRate
23.5
"""
return self._jss.processingRate()

@property
@ignore_unicode_prefix
@since(2.1)
def triggerDetails(self):
"""
Low-level details of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).

If no trigger is currently active, then it will have details of the last completed trigger.

>>> sqs.sourceStatuses[0].triggerDetails
{u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
u'latency.getBatch.source': u'20'}
"""
return self._jss.triggerDetails()


class SinkStatus(object):
"""
Status and metrics of a streaming Sink.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jss):
self._jss = jss

def __str__(self):
"""
Pretty string of this sink status.

>>> print(sqs.sinkStatus)
Status of sink MySink
Committed offsets: [1, -]
"""
return self._jss.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def description(self):
"""
Description of the sink corresponding to this status.

>>> sqs.sinkStatus.description
u'MySink'
"""
return self._jss.description()

@property
@ignore_unicode_prefix
@since(2.1)
def offsetDesc(self):
"""
Description of the current offsets up to which data has been written by the sink.

>>> sqs.sinkStatus.offsetDesc
u'[1, -]'
"""
return self._jss.offsetDesc()


class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.

@@ -1051,8 +763,6 @@ def _test():
     globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
     globs['df'] = \
         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sqs'] = StreamingQueryStatus(
-        spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())

     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming, globs=globs,
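Taken together, the deletions above replace the polling-style status objects with progress records. A rough before/after sketch; the old attribute names come from the deleted docstrings, while the new progress field names are an assumption at this stage of the PR, not final:

    # Before this PR, metrics were read off the removed status objects:
    #     sqs.inputRate, sqs.processingRate,
    #     sqs.triggerDetails['numRows.input.total']
    # After this PR, the same numbers are expected to arrive as JSON
    # progress records instead (field name below is an assumption):
    recent = sq.recentProgress()
    if recent:
        print(recent[-1].get('numRecords'))  # rows handled in the last trigger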