From d3bcf77cad8f16b118886ae0bb65c788ef843c5a Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 16 Apr 2019 13:35:52 -0700
Subject: [PATCH] Add page iterator to ReadRowsStream (#7680)

* Add page iterator to ReadRowsStream

This allows readers to read blocks (called pages for compatibility with
the BigQuery client library) one at a time from a stream. This enables
use cases such as progress bar support and streaming workers that
expect pandas DataFrames.
---
 .../cloud/bigquery_storage_v1beta1/reader.py | 208 +++++++++++++++---
 bigquery_storage/tests/unit/test_reader.py   | 154 ++++++++++++-
 2 files changed, 329 insertions(+), 33 deletions(-)

diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
index 64748b0bc7cf..8c81619500c9 100644
--- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
+++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -15,7 +15,6 @@
 from __future__ import absolute_import
 
 import collections
-import itertools
 import json
 
 try:
@@ -38,6 +37,7 @@
     google.api_core.exceptions.ServiceUnavailable,
 )
 _FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
+_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
 
 
 class ReadRowsStream(object):
@@ -156,9 +156,7 @@ def rows(self, read_session):
         if fastavro is None:
             raise ImportError(_FASTAVRO_REQUIRED)
 
-        avro_schema, _ = _avro_schema(read_session)
-        blocks = (_avro_rows(block, avro_schema) for block in self)
-        return itertools.chain.from_iterable(blocks)
+        return ReadRowsIterable(self, read_session)
 
     def to_dataframe(self, read_session, dtypes=None):
         """Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -192,29 +190,186 @@ def to_dataframe(self, read_session, dtypes=None):
         if fastavro is None:
             raise ImportError(_FASTAVRO_REQUIRED)
         if pandas is None:
-            raise ImportError("pandas is required to create a DataFrame")
+            raise ImportError(_PANDAS_REQUIRED)
 
-        if dtypes is None:
-            dtypes = {}
+        return self.rows(read_session).to_dataframe(dtypes=dtypes)
+
+
+class ReadRowsIterable(object):
+    """An iterable of rows from a read session.
+
+    Args:
+        reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
+            A read rows stream.
+        read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
+            A read session. This is required because it contains the schema
+            used in the stream blocks.
+    """
+
+    # This class is modeled after the google.cloud.bigquery.table.RowIterator
+    # and aims to be API compatible where possible.
+
+    def __init__(self, reader, read_session):
+        self._status = None
+        self._reader = reader
+        self._read_session = read_session
+
+    @property
+    def total_rows(self):
+        """int: Number of estimated rows in the current stream.
+
+        May change over time.
+        """
+        return getattr(self._status, "estimated_row_count", None)
+
+    @property
+    def pages(self):
+        """A generator of all pages in the stream.
+
+        Returns:
+            types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
+                A generator of pages.
+        """
+        # Each page is an iterator of rows. It also has num_items, remaining,
+        # and to_dataframe.
+        avro_schema, column_names = _avro_schema(self._read_session)
+        for block in self._reader:
+            self._status = block.status
+            yield ReadRowsPage(avro_schema, column_names, block)
+
+    def __iter__(self):
+        """Iterator for each row in all pages."""
+        for page in self.pages:
+            for row in page:
+                yield row
+
+    def to_dataframe(self, dtypes=None):
+        """Create a :class:`pandas.DataFrame` of all rows in the stream.
+
+        This method requires the pandas library to create a data frame and the
+        fastavro library to parse row blocks.
+
+        .. warning::
+            DATETIME columns are not supported. They are currently parsed as
+            strings in the fastavro library.
+
+        Args:
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series for
+                the column specified. Otherwise, the default pandas behavior
+                is used.
+
+        Returns:
+            pandas.DataFrame:
+                A data frame of all rows in the stream.
+        """
+        if pandas is None:
+            raise ImportError(_PANDAS_REQUIRED)
 
-        avro_schema, column_names = _avro_schema(read_session)
         frames = []
-        for block in self:
-            dataframe = _to_dataframe_with_dtypes(
-                _avro_rows(block, avro_schema), column_names, dtypes
-            )
-            frames.append(dataframe)
+        for page in self.pages:
+            frames.append(page.to_dataframe(dtypes=dtypes))
         return pandas.concat(frames)
 
 
-def _to_dataframe_with_dtypes(rows, column_names, dtypes):
-    columns = collections.defaultdict(list)
-    for row in rows:
-        for column in row:
-            columns[column].append(row[column])
-    for column in dtypes:
-        columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
-    return pandas.DataFrame(columns, columns=column_names)
+class ReadRowsPage(object):
+    """An iterator of rows from a read session block.
+
+    Args:
+        avro_schema (fastavro.schema):
+            A parsed Avro schema, using :func:`fastavro.schema.parse_schema`.
+        column_names (Tuple[str]):
+            A read session's column names (in requested order).
+        block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
+            A block of data from a read rows stream.
+    """
+
+    # This class is modeled after google.api_core.page_iterator.Page and aims
+    # to provide API compatibility where possible.
+
+    def __init__(self, avro_schema, column_names, block):
+        self._avro_schema = avro_schema
+        self._column_names = column_names
+        self._block = block
+        self._iter_rows = None
+        self._num_items = None
+        self._remaining = None
+
+    def _parse_block(self):
+        """Parse metadata and rows from the block only once."""
+        if self._iter_rows is not None:
+            return
+
+        rows = _avro_rows(self._block, self._avro_schema)
+        self._num_items = self._block.avro_rows.row_count
+        self._remaining = self._block.avro_rows.row_count
+        self._iter_rows = iter(rows)
+
+    @property
+    def num_items(self):
+        """int: Total items in the page."""
+        self._parse_block()
+        return self._num_items
+
+    @property
+    def remaining(self):
+        """int: Remaining items in the page."""
+        self._parse_block()
+        return self._remaining
+
+    def __iter__(self):
+        """A ``ReadRowsPage`` is an iterator."""
+        return self
+
+    def next(self):
+        """Get the next row in the page."""
+        self._parse_block()
+        if self._remaining > 0:
+            self._remaining -= 1
+        return six.next(self._iter_rows)
+
+    # Alias needed for Python 2/3 support.
+    __next__ = next
+
+    def to_dataframe(self, dtypes=None):
+        """Create a :class:`pandas.DataFrame` of rows in the page.
+
+        This method requires the pandas library to create a data frame and the
+        fastavro library to parse row blocks.
+
+        .. warning::
+            DATETIME columns are not supported. They are currently parsed as
+            strings in the fastavro library.
+
+        Args:
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series for
+                the column specified. Otherwise, the default pandas behavior
+                is used.
+
+        Returns:
+            pandas.DataFrame:
+                A data frame of all rows in the page.
+        """
+        if pandas is None:
+            raise ImportError(_PANDAS_REQUIRED)
+
+        if dtypes is None:
+            dtypes = {}
+
+        columns = collections.defaultdict(list)
+        for row in self:
+            for column in row:
+                columns[column].append(row[column])
+        for column in dtypes:
+            columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
+        return pandas.DataFrame(columns, columns=self._column_names)
 
 
 def _avro_schema(read_session):
@@ -242,12 +397,13 @@ def _avro_rows(block, avro_schema):
     """Parse all rows in a stream block.
 
     Args:
-        read_session ( \
-            ~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
+        block ( \
+            ~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
         ):
-            The read session associated with this read rows stream. This
-            contains the schema, which is required to parse the data
-            blocks.
+            A block containing Avro bytes to parse into rows.
+        avro_schema (fastavro.schema):
+            A parsed Avro schema, used to deserialize the bytes in the
+            block.
 
     Returns:
         Iterable[Mapping]:
diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py
index b191a034fff0..0c96a40554cd 100644
--- a/bigquery_storage/tests/unit/test_reader.py
+++ b/bigquery_storage/tests/unit/test_reader.py
@@ -186,8 +186,9 @@ def test_rows_w_empty_stream(class_under_test, mock_client):
         [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {}
     )
 
-    got = tuple(reader.rows(read_session))
-    assert got == ()
+    got = reader.rows(read_session)
+    assert got.total_rows is None
+    assert tuple(got) == ()
 
 
 def test_rows_w_scalars(class_under_test, mock_client):
@@ -217,6 +218,10 @@ def test_rows_w_reconnect(class_under_test, mock_client):
     )
     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
+
+    for block in avro_blocks_2:
+        block.status.estimated_row_count = 7
+
     mock_client.read_rows.return_value = avro_blocks_2
     stream_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}
     )
@@ -228,7 +233,7 @@
         stream_position,
         {"metadata": {"test-key": "test-value"}},
     )
-    got = tuple(reader.rows(read_session))
+    got = reader.rows(read_session)
 
     expected = tuple(
         itertools.chain(
         )
     )
 
-    assert got == expected
+    assert tuple(got) == expected
+    assert got.total_rows == 7
     mock_client.read_rows.assert_called_once_with(
         bigquery_storage_v1beta1.types.StreamPosition(
             stream={"name": "test"}, offset=4
         ),
         metadata={"test-key": "test-value"},
     )
@@ -246,18 +252,88 @@
 
 
+def test_rows_w_reconnect_by_page(class_under_test, mock_client):
+    bq_columns = [{"name": "int_col", "type": "int64"}]
+    avro_schema = _bq_to_avro_schema(bq_columns)
+    read_session = _generate_read_session(avro_schema)
+    bq_blocks_1 = [
+        [{"int_col": 123}, {"int_col": 234}],
+        [{"int_col": 345}, {"int_col": 456}],
+    ]
+    avro_blocks_1 = _bq_to_avro_blocks(bq_blocks_1, avro_schema)
+    bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
+    avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
+
+    avro_blocks_1[0].status.estimated_row_count = 8
+    avro_blocks_1[1].status.estimated_row_count = 6
+    avro_blocks_2[0].status.estimated_row_count = 9
+    avro_blocks_2[1].status.estimated_row_count = 7
+
+    mock_client.read_rows.return_value = avro_blocks_2
+    stream_position = bigquery_storage_v1beta1.types.StreamPosition(
+        stream={"name": "test"}
+    )
+
+    reader = class_under_test(
+        _avro_blocks_w_deadline(avro_blocks_1),
+        mock_client,
+        stream_position,
+        {"metadata": {"test-key": "test-value"}},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    assert got.total_rows is None
+
+    page_1 = next(pages)
+    assert got.total_rows == 8
+    assert page_1.num_items == 2
+    assert page_1.remaining == 2
+    assert tuple(page_1) == tuple(bq_blocks_1[0])
+    assert page_1.num_items == 2
+    assert page_1.remaining == 0
+
+    page_2 = next(pages)
+    assert got.total_rows == 6
+    assert next(page_2) == bq_blocks_1[1][0]
+    assert page_2.num_items == 2
+    assert page_2.remaining == 1
+    assert next(page_2) == bq_blocks_1[1][1]
+
+    page_3 = next(pages)
+    assert tuple(page_3) == tuple(bq_blocks_2[0])
+    assert page_3.num_items == 2
+    assert page_3.remaining == 0
+    assert got.total_rows == 9
+
+    page_4 = next(pages)
+    assert got.total_rows == 7
+    assert tuple(page_4) == tuple(bq_blocks_2[1])
+    assert page_4.num_items == 1
+    assert page_4.remaining == 0
+
+
 def test_to_dataframe_no_pandas_raises_import_error(
     mut, class_under_test, mock_client, monkeypatch
 ):
     monkeypatch.setattr(mut, "pandas", None)
+    avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS)
+    read_session = _generate_read_session(avro_schema)
+    avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema)
+
     reader = class_under_test(
-        [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {}
+        avro_blocks, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {}
     )
-    read_session = bigquery_storage_v1beta1.types.ReadSession()
 
     with pytest.raises(ImportError):
         reader.to_dataframe(read_session)
 
+    with pytest.raises(ImportError):
+        reader.rows(read_session).to_dataframe()
+
+    with pytest.raises(ImportError):
+        next(reader.rows(read_session).pages).to_dataframe()
+
 
 def test_to_dataframe_no_fastavro_raises_import_error(
     mut, class_under_test, mock_client, monkeypatch
@@ -305,7 +381,6 @@ def test_to_dataframe_w_scalars(class_under_test):
 
 
 def test_to_dataframe_w_dtypes(class_under_test):
-    # TODOTODOTODOTODO
     avro_schema = _bq_to_avro_schema(
         [
             {"name": "bigfloat", "type": "float64"},
@@ -337,6 +412,71 @@ def test_to_dataframe_w_dtypes(class_under_test):
     )
 
 
+def test_to_dataframe_by_page(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    avro_schema = _bq_to_avro_schema(bq_columns)
+    read_session = _generate_read_session(avro_schema)
+    block_1 = [{"int_col": 123, "bool_col": True}, {"int_col": 234, "bool_col": False}]
+    block_2 = [{"int_col": 345, "bool_col": True}, {"int_col": 456, "bool_col": False}]
+    block_3 = [{"int_col": 567, "bool_col": True}, {"int_col": 789, "bool_col": False}]
+    block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [block_1, block_2]
+    bq_blocks_2 = [block_3, block_4]
+    avro_blocks_1 = _bq_to_avro_blocks(bq_blocks_1, avro_schema)
+    avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
+
+    mock_client.read_rows.return_value = avro_blocks_2
+    stream_position = bigquery_storage_v1beta1.types.StreamPosition(
+        stream={"name": "test"}
+    )
+
+    reader = class_under_test(
+        _avro_blocks_w_deadline(avro_blocks_1),
+        mock_client,
+        stream_position,
+        {"metadata": {"test-key": "test-value"}},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+
 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41
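
Example usage (not part of the patch): a minimal sketch of how the pages iterator added above can be consumed for the progress-reporting and per-page DataFrame use cases mentioned in the commit message. It assumes `reader` is the ReadRowsStream returned by the v1beta1 BigQueryStorageClient's read_rows() call and `read_session` comes from create_read_session(); the helper function names are illustrative only.

# Usage sketch only; assumes a `reader` (ReadRowsStream) and `read_session`
# obtained from the BigQuery Storage API client. Helper names are hypothetical.
import pandas


def iter_page_dataframes(reader, read_session, dtypes=None):
    """Yield one pandas.DataFrame per page, reporting rough progress."""
    rows = reader.rows(read_session)
    processed = 0
    for page in rows.pages:
        processed += page.num_items
        # total_rows is an estimate taken from the most recent block's status
        # and may change as more blocks arrive.
        print("processed ~{} of ~{} rows".format(processed, rows.total_rows))
        yield page.to_dataframe(dtypes=dtypes)


def combined_dataframe(reader, read_session, dtypes=None):
    """Concatenate the per-page frames, similar in spirit to to_dataframe()."""
    frames = list(iter_page_dataframes(reader, read_session, dtypes=dtypes))
    return pandas.concat(frames)

Because each page wraps a single ReadRowsResponse block, this pattern keeps at most one block's rows in memory at a time while still letting the caller report progress between blocks.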