fix python
marmbrus committed Nov 21, 2016
1 parent 527c8d6 commit 213081a
Showing 4 changed files with 24 additions and 309 deletions.
@@ -412,7 +412,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
         recordsRead == 3
       }
     )
-
   }

   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
319 changes: 14 additions & 305 deletions python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
 #

 import sys
+import json
+
 if sys.version >= '3':
     intlike = int
     basestring = unicode = str
@@ -48,10 +50,9 @@ def __init__(self, jsq):
     @property
     @since(2.0)
     def id(self):
-        """The id of the streaming query. This id is unique across all queries that have been
-        started in the current process.
+        """The id of the streaming query.
         """
-        return self._jsq.id()
+        return self._jsq.id().toString()

     @property
     @since(2.0)
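
With this change, the Python side receives the query id as a plain string (the JVM value is passed through toString()) rather than a raw JVM value. A minimal sketch of the resulting usage, assuming a streaming DataFrame sdf and a SparkSession spark as in this module's doctests; the query name is illustrative:

    sq = sdf.writeStream.format('memory').queryName('id_demo').start()
    qid = sq.id                          # now a plain string (unicode on Python 2)
    same_query = spark.streams.get(qid)  # string ids round-trip through the manager (see get() below)
    sq.stop()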
@@ -87,6 +88,16 @@ def awaitTermination(self, timeout=None):
         else:
             return self._jsq.awaitTermination()

+    def recentProgress(self):
+        """Returns the most recent statistics on progress that has been made in this streaming
+        query.
+        >>> sq = sdf.writeStream.format('memory').queryName('progress_query').start()
+        >>> sq.recentProgress()
+        >>> sq.stop()
+        """
+
+        return [json.loads(p.json()) for p in self._jsq.recentProgress()]
+
     @since(2.0)
     def processAllAvailable(self):
         """Blocks until all available data in the source has been processed and committed to the
@@ -147,8 +158,6 @@ def get(self, id):
         True
         >>> sq.stop()
         """
-        if not isinstance(id, intlike):
-            raise ValueError("The id for the query must be an integer. Got: %s" % id)
         return StreamingQuery(self._jsqm.get(id))

     @since(2.0)
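
The integer check in get() is dropped because query ids are strings after the id change above; the value is now forwarded to the JVM manager as-is, which becomes responsible for rejecting unknown ids. A short sketch under the same doctest assumptions:

    sq = sdf.writeStream.format('memory').queryName('get_demo').start()
    same = spark.streams.get(sq.id)     # the string id is passed straight through
    assert same.name == sq.name
    sq.stop()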
@@ -188,304 +197,6 @@ def resetTerminated(self):
         """
         self._jsqm.resetTerminated()


-class StreamingQueryStatus(object):
-    """A class used to report information about the progress of a StreamingQuery.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jsqs):
-        self._jsqs = jsqs
-
-    def __str__(self):
-        """
-        Pretty string of this query status.
-
-        >>> print(sqs)
-        Status of query 'query'
-            Query id: 1
-            Status timestamp: 123
-            Input rate: 15.5 rows/sec
-            Processing rate 23.5 rows/sec
-            Latency: 345.0 ms
-            Trigger details:
-                batchId: 5
-                isDataPresentInTrigger: true
-                isTriggerActive: true
-                latency.getBatch.total: 20
-                latency.getOffset.total: 10
-                numRows.input.total: 100
-            Source statuses [1 source]:
-                Source 1 - MySource1
-                    Available offset: 0
-                    Input rate: 15.5 rows/sec
-                    Processing rate: 23.5 rows/sec
-                    Trigger details:
-                        numRows.input.source: 100
-                        latency.getOffset.source: 10
-                        latency.getBatch.source: 20
-            Sink status - MySink
-                Committed offsets: [1, -]
-        """
-        return self._jsqs.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def name(self):
-        """
-        Name of the query. This name is unique across all active queries.
-
-        >>> sqs.name
-        u'query'
-        """
-        return self._jsqs.name()
-
-    @property
-    @since(2.1)
-    def id(self):
-        """
-        Id of the query. This id is unique across all queries that have been started in
-        the current process.
-
-        >>> int(sqs.id)
-        1
-        """
-        return self._jsqs.id()
-
-    @property
-    @since(2.1)
-    def timestamp(self):
-        """
-        Timestamp (ms) of when this query was generated.
-
-        >>> int(sqs.timestamp)
-        123
-        """
-        return self._jsqs.timestamp()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current total rate (rows/sec) at which data is being generated by all the sources.
-
-        >>> sqs.inputRate
-        15.5
-        """
-        return self._jsqs.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from all the sources.
-
-        >>> sqs.processingRate
-        23.5
-        """
-        return self._jsqs.processingRate()
-
-    @property
-    @since(2.1)
-    def latency(self):
-        """
-        Current average latency between the data being available in source and the sink
-        writing the corresponding output.
-
-        >>> sqs.latency
-        345.0
-        """
-        if (self._jsqs.latency().nonEmpty()):
-            return self._jsqs.latency().get()
-        else:
-            return None
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sourceStatuses(self):
-        """
-        Current statuses of the sources as a list.
-
-        >>> len(sqs.sourceStatuses)
-        1
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sinkStatus(self):
-        """
-        Current status of the sink.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return SinkStatus(self._jsqs.sinkStatus())
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.triggerDetails
-        {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
-        u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
-        u'isDataPresentInTrigger': u'true'}
-        """
-        return self._jsqs.triggerDetails()
-
-
-class SourceStatus(object):
-    """
-    Status and metrics of a streaming Source.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sourceStatuses[0])
-        Status of source MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offset if known.
-
-        >>> sqs.sourceStatuses[0].offsetDesc
-        u'0'
-        """
-        return self._jss.offsetDesc()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current rate (rows/sec) at which data is being generated by the source.
-
-        >>> sqs.sourceStatuses[0].inputRate
-        15.5
-        """
-        return self._jss.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from the source.
-
-        >>> sqs.sourceStatuses[0].processingRate
-        23.5
-        """
-        return self._jss.processingRate()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.sourceStatuses[0].triggerDetails
-        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
-        u'latency.getBatch.source': u'20'}
-        """
-        return self._jss.triggerDetails()
-
-
-class SinkStatus(object):
-    """
-    Status and metrics of a streaming Sink.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this sink status.
-
-        >>> print(sqs.sinkStatus)
-        Status of sink MySink
-            Committed offsets: [1, -]
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the sink corresponding to this status.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offsets up to which data has been written by the sink.
-
-        >>> sqs.sinkStatus.offsetDesc
-        u'[1, -]'
-        """
-        return self._jss.offsetDesc()
-
-
 class Trigger(object):
     """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -1051,8 +762,6 @@ def _test():
     globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
     globs['df'] = \
         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sqs'] = StreamingQueryStatus(
-        spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())

     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming, globs=globs,
@@ -28,8 +28,8 @@ import org.apache.spark.metrics.source.{Source => CodahaleSource}
 import org.apache.spark.util.Clock

 /**
- * Class that serves some of the metrics through Codahale/DropWizard metrics
- *
+ * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
+ * Codahale/DropWizard metrics
  */
 class StreamMetricsReporter(
     stream: StreamExecution,