Commit d3bcf77

Add page iterator to ReadRowsStream (#7680)
* Add page iterator to ReadRowsStream

This allows readers to read blocks (called pages for compatibility with
BigQuery client library) one at a time from a stream. This enables use
cases such as progress bar support and streaming workers that expect
pandas DataFrames.
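
As a usage sketch (not part of this commit's diff), the new page iterator can be
consumed roughly as follows. The project and table IDs and the `handle_chunk`
worker are placeholders, and the pandas and fastavro extras are assumed to be
installed:

    from google.cloud import bigquery_storage_v1beta1

    client = bigquery_storage_v1beta1.BigQueryStorageClient()

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = "bigquery-public-data"
    table_ref.dataset_id = "usa_names"
    table_ref.table_id = "usa_1910_current"

    session = client.create_read_session(
        table_ref, "projects/your-project-id", requested_streams=1
    )
    stream_position = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )
    reader = client.read_rows(stream_position)

    rows = reader.rows(session)
    for page in rows.pages:
        # Each page reports num_items and remaining, so callers can show
        # progress or hand one block at a time to a worker that expects a
        # pandas DataFrame.
        print("page with", page.num_items, "rows; estimated total:", rows.total_rows)
        handle_chunk(page.to_dataframe())  # handle_chunk: placeholder worker
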
tswast authored Apr 16, 2019
1 parent 0bccf6c commit d3bcf77
Showing 2 changed files with 329 additions and 33 deletions.
208 changes: 182 additions & 26 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -15,7 +15,6 @@
from __future__ import absolute_import

import collections
import itertools
import json

try:
@@ -38,6 +37,7 @@
    google.api_core.exceptions.ServiceUnavailable,
)
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"


class ReadRowsStream(object):
@@ -156,9 +156,7 @@ def rows(self, read_session):
        if fastavro is None:
            raise ImportError(_FASTAVRO_REQUIRED)

        avro_schema, _ = _avro_schema(read_session)
        blocks = (_avro_rows(block, avro_schema) for block in self)
        return itertools.chain.from_iterable(blocks)
        return ReadRowsIterable(self, read_session)

    def to_dataframe(self, read_session, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.
Expand Down Expand Up @@ -192,29 +190,186 @@ def to_dataframe(self, read_session, dtypes=None):
        if fastavro is None:
            raise ImportError(_FASTAVRO_REQUIRED)
        if pandas is None:
            raise ImportError("pandas is required to create a DataFrame")
            raise ImportError(_PANDAS_REQUIRED)

        if dtypes is None:
            dtypes = {}
        return self.rows(read_session).to_dataframe(dtypes=dtypes)


class ReadRowsIterable(object):
"""An iterable of rows from a read session.
Args:
reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
A read rows stream.
read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
A read session. This is required because it contains the schema
used in the stream blocks.
"""

    # This class is modelled after the google.cloud.bigquery.table.RowIterator
    # and aims to be API compatible where possible.

    def __init__(self, reader, read_session):
        self._status = None
        self._reader = reader
        self._read_session = read_session

    @property
    def total_rows(self):
        """int: Number of estimated rows in the current stream.

        May change over time.
        """
        return getattr(self._status, "estimated_row_count", None)

    @property
    def pages(self):
        """A generator of all pages in the stream.

        Returns:
            types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
                A generator of pages.
        """
        # Each page is an iterator of rows. But also has num_items, remaining,
        # and to_dataframe.
        avro_schema, column_names = _avro_schema(self._read_session)
        for block in self._reader:
            self._status = block.status
            yield ReadRowsPage(avro_schema, column_names, block)

    def __iter__(self):
        """Iterator for each row in all pages."""
        for page in self.pages:
            for row in page:
                yield row

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row blocks.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
                provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        avro_schema, column_names = _avro_schema(read_session)
        frames = []
        for block in self:
            dataframe = _to_dataframe_with_dtypes(
                _avro_rows(block, avro_schema), column_names, dtypes
            )
            frames.append(dataframe)
        for page in self.pages:
            frames.append(page.to_dataframe(dtypes=dtypes))
        return pandas.concat(frames)


def _to_dataframe_with_dtypes(rows, column_names, dtypes):
    columns = collections.defaultdict(list)
    for row in rows:
        for column in row:
            columns[column].append(row[column])
    for column in dtypes:
        columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
    return pandas.DataFrame(columns, columns=column_names)


class ReadRowsPage(object):
    """An iterator of rows from a read session block.

    Args:
        avro_schema (fastavro.schema):
            A parsed Avro schema, using :func:`fastavro.schema.parse_schema`.
        column_names (Tuple[str]):
            A read session's column names (in requested order).
        block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
            A block of data from a read rows stream.
    """

    # This class is modeled after google.api_core.page_iterator.Page and aims
    # to provide API compatibility where possible.

    def __init__(self, avro_schema, column_names, block):
        self._avro_schema = avro_schema
        self._column_names = column_names
        self._block = block
        self._iter_rows = None
        self._num_items = None
        self._remaining = None

    def _parse_block(self):
        """Parse metadata and rows from the block only once."""
        if self._iter_rows is not None:
            return

        rows = _avro_rows(self._block, self._avro_schema)
        self._num_items = self._block.avro_rows.row_count
        self._remaining = self._block.avro_rows.row_count
        self._iter_rows = iter(rows)

    @property
    def num_items(self):
        """int: Total items in the page."""
        self._parse_block()
        return self._num_items

    @property
    def remaining(self):
        """int: Remaining items in the page."""
        self._parse_block()
        return self._remaining

    def __iter__(self):
        """A ``ReadRowsPage`` is an iterator."""
        return self

    def next(self):
        """Get the next row in the page."""
        self._parse_block()
        if self._remaining > 0:
            self._remaining -= 1
        return six.next(self._iter_rows)

    # Alias needed for Python 2/3 support.
    __next__ = next

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of rows in the page.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row blocks.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
                provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the page.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        if dtypes is None:
            dtypes = {}

        columns = collections.defaultdict(list)
        for row in self:
            for column in row:
                columns[column].append(row[column])
        for column in dtypes:
            columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
        return pandas.DataFrame(columns, columns=self._column_names)


def _avro_schema(read_session):
@@ -242,12 +397,13 @@ def _avro_rows(block, avro_schema):
"""Parse all rows in a stream block.
Args:
read_session ( \
~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
block ( \
~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
):
The read session associated with this read rows stream. This
contains the schema, which is required to parse the data
blocks.
A block containing Avro bytes to parse into rows.
avro_schema (fastavro.schema):
A parsed Avro schema, used to deserialized the bytes in the
block.
Returns:
Iterable[Mapping]:
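
For reference, a sketch of how the stream-level and page-level DataFrame paths
added here relate (continuing the reader/session setup from the sketch above;
the dtypes mapping and column name are illustrative):

    import pandas

    # One-shot path: ReadRowsStream.to_dataframe delegates to
    # ReadRowsIterable.to_dataframe, which concatenates one frame per page:
    #     frame = reader.to_dataframe(session, dtypes={"name": "category"})
    #
    # Equivalent per-page path, with visibility into progress as the frame
    # is built (a stream is consumed once, so use one approach per reader):
    frames = []
    rows = reader.rows(session)
    for page in rows.pages:
        frames.append(page.to_dataframe(dtypes={"name": "category"}))
        print("rows so far:", sum(len(f) for f in frames))
    frame = pandas.concat(frames)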