Add page iterator to ReadRowsStream #7680

Merged: 3 commits, Apr 16, 2019
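This change makes ReadRowsStream.rows() return a ReadRowsIterable whose pages property yields one ReadRowsPage per server block, mirroring google.cloud.bigquery.table.RowIterator and google.api_core.page_iterator.Page. A minimal usage sketch follows; the client, session, and position objects are assumed to come from the library's quickstart and are not part of this diff, and process() is a hypothetical placeholder:

    # Sketch only: `client`, `session`, and `position` are assumed to be a
    # BigQueryStorageClient, a created ReadSession, and a StreamPosition,
    # as in the v1beta1 quickstart. `process` is a hypothetical callback.
    reader = client.read_rows(position)
    rows = reader.rows(session)      # now a ReadRowsIterable, not a flat chain

    print(rows.total_rows)           # estimated row count; may change over time

    for page in rows.pages:          # one ReadRowsPage per ReadRowsResponse block
        print(page.num_items, page.remaining)
        for row in page:             # each page iterates row mappings
            process(row)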
208 changes: 182 additions & 26 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -15,7 +15,6 @@
 from __future__ import absolute_import

 import collections
-import itertools
 import json

 try:
@@ -38,6 +37,7 @@
     google.api_core.exceptions.ServiceUnavailable,
 )
 _FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
+_PANDAS_REQUIRED = "pandas is required to create a DataFrame"


 class ReadRowsStream(object):
@@ -156,9 +156,7 @@ def rows(self, read_session):
         if fastavro is None:
             raise ImportError(_FASTAVRO_REQUIRED)

-        avro_schema, _ = _avro_schema(read_session)
-        blocks = (_avro_rows(block, avro_schema) for block in self)
-        return itertools.chain.from_iterable(blocks)
+        return ReadRowsIterable(self, read_session)

     def to_dataframe(self, read_session, dtypes=None):
         """Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -192,29 +190,186 @@ def to_dataframe(self, read_session, dtypes=None):
         if fastavro is None:
             raise ImportError(_FASTAVRO_REQUIRED)
         if pandas is None:
-            raise ImportError("pandas is required to create a DataFrame")
+            raise ImportError(_PANDAS_REQUIRED)

         if dtypes is None:
             dtypes = {}
+        return self.rows(read_session).to_dataframe(dtypes=dtypes)


+class ReadRowsIterable(object):
+    """An iterable of rows from a read session.
+
+    Args:
+        reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
+            A read rows stream.
+        read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
+            A read session. This is required because it contains the schema
+            used in the stream blocks.
+    """
+
+    # This class is modeled after the google.cloud.bigquery.table.RowIterator
+    # and aims to be API compatible where possible.
+
+    def __init__(self, reader, read_session):
+        self._status = None
+        self._reader = reader
+        self._read_session = read_session
+
+    @property
+    def total_rows(self):
+        """int: Estimated number of rows in the current stream.
+
+        May change over time.
+        """
+        return getattr(self._status, "estimated_row_count", None)
+
+    @property
+    def pages(self):
+        """A generator of all pages in the stream.
+
+        Returns:
+            types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
+                A generator of pages.
+        """
+        # Each page is an iterator of rows, but it also exposes num_items,
+        # remaining, and to_dataframe.
+        avro_schema, column_names = _avro_schema(self._read_session)
+        for block in self._reader:
+            self._status = block.status
+            yield ReadRowsPage(avro_schema, column_names, block)
+
+    def __iter__(self):
+        """Iterator for each row in all pages."""
+        for page in self.pages:
+            for row in page:
+                yield row
+
+    def to_dataframe(self, dtypes=None):
+        """Create a :class:`pandas.DataFrame` of all rows in the stream.
+
+        This method requires the pandas library to create a data frame and the
+        fastavro library to parse row blocks.
+
+        .. warning::
+            DATETIME columns are not supported. They are currently parsed as
+            strings in the fastavro library.
+
+        Args:
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
+
+        Returns:
+            pandas.DataFrame:
+                A data frame of all rows in the stream.
+        """
+        if pandas is None:
+            raise ImportError(_PANDAS_REQUIRED)

-        avro_schema, column_names = _avro_schema(read_session)
         frames = []
-        for block in self:
-            dataframe = _to_dataframe_with_dtypes(
-                _avro_rows(block, avro_schema), column_names, dtypes
-            )
-            frames.append(dataframe)
+        for page in self.pages:
+            frames.append(page.to_dataframe(dtypes=dtypes))
         return pandas.concat(frames)


-def _to_dataframe_with_dtypes(rows, column_names, dtypes):
-    columns = collections.defaultdict(list)
-    for row in rows:
-        for column in row:
-            columns[column].append(row[column])
-    for column in dtypes:
-        columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
-    return pandas.DataFrame(columns, columns=column_names)
+class ReadRowsPage(object):
+    """An iterator of rows from a read session block.
+
+    Args:
+        avro_schema (fastavro.schema):
+            A parsed Avro schema, using :func:`fastavro.schema.parse_schema`.
+        column_names (Tuple[str]):
+            A read session's column names (in requested order).
+        block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
+            A block of data from a read rows stream.
+    """
+
+    # This class is modeled after google.api_core.page_iterator.Page and aims
+    # to provide API compatibility where possible.
+
+    def __init__(self, avro_schema, column_names, block):
+        self._avro_schema = avro_schema
+        self._column_names = column_names
+        self._block = block
+        self._iter_rows = None
+        self._num_items = None
+        self._remaining = None
+
+    def _parse_block(self):
+        """Parse metadata and rows from the block only once."""
+        if self._iter_rows is not None:
+            return
+
+        rows = _avro_rows(self._block, self._avro_schema)
+        self._num_items = self._block.avro_rows.row_count
+        self._remaining = self._block.avro_rows.row_count
+        self._iter_rows = iter(rows)
+
+    @property
+    def num_items(self):
+        """int: Total items in the page."""
+        self._parse_block()
+        return self._num_items
+
+    @property
+    def remaining(self):
+        """int: Remaining items in the page."""
+        self._parse_block()
+        return self._remaining
+
+    def __iter__(self):
+        """A ``ReadRowsPage`` is an iterator."""
+        return self
+
+    def next(self):
+        """Get the next row in the page."""
+        self._parse_block()
+        if self._remaining > 0:
+            self._remaining -= 1
+        return six.next(self._iter_rows)
+
+    # Alias needed for Python 2/3 support.
+    __next__ = next
+
+    def to_dataframe(self, dtypes=None):
+        """Create a :class:`pandas.DataFrame` of rows in the page.
+
+        This method requires the pandas library to create a data frame and the
+        fastavro library to parse row blocks.
+
+        .. warning::
+            DATETIME columns are not supported. They are currently parsed as
+            strings in the fastavro library.
+
+        Args:
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
+
+        Returns:
+            pandas.DataFrame:
+                A data frame of all rows in the page.
+        """
+        if pandas is None:
+            raise ImportError(_PANDAS_REQUIRED)
+
+        if dtypes is None:
+            dtypes = {}
+
+        columns = collections.defaultdict(list)
+        for row in self:
+            for column in row:
+                columns[column].append(row[column])
+        for column in dtypes:
+            columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
+        return pandas.DataFrame(columns, columns=self._column_names)


 def _avro_schema(read_session):
@@ -242,12 +397,13 @@ def _avro_rows(block, avro_schema):
     """Parse all rows in a stream block.

     Args:
-        read_session ( \
-            ~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
+        block ( \
+            ~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
         ):
-            The read session associated with this read rows stream. This
-            contains the schema, which is required to parse the data
-            blocks.
+            A block containing Avro bytes to parse into rows.
+        avro_schema (fastavro.schema):
+            A parsed Avro schema, used to deserialize the bytes in the
+            block.

     Returns:
         Iterable[Mapping]:
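For reference, a sketch of the two DataFrame paths this diff enables. As in the sketch above, client, session, and position are assumed placeholders from the quickstart, and the dtype mapping (with its hypothetical column name) is illustrative:

    import pandas

    reader = client.read_rows(position)
    rows = reader.rows(session)

    # One DataFrame for the whole stream, optionally overriding column dtypes.
    df = rows.to_dataframe(dtypes={"some_column": "float64"})

    # Equivalent result built one block at a time, bounding peak memory per page.
    df = pandas.concat(page.to_dataframe() for page in rows.pages)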