diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 18f22270feac..381ad84f0312 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2832,29 +2832,33 @@ def _blocking_poll(self, timeout=None):
         self._done_timeout = timeout
         super(QueryJob, self)._blocking_poll(timeout=timeout)
 
-    def result(self, timeout=None, retry=DEFAULT_RETRY):
+    def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY):
         """Start the job and wait for it to complete and get the result.
 
-        :type timeout: float
-        :param timeout:
-            How long (in seconds) to wait for job to complete before raising
-            a :class:`concurrent.futures.TimeoutError`.
-
-        :type retry: :class:`google.api_core.retry.Retry`
-        :param retry: (Optional) How to retry the call that retrieves rows.
+        Args:
+            timeout (float):
+                How long (in seconds) to wait for job to complete before
+                raising a :class:`concurrent.futures.TimeoutError`.
+            page_size (int):
+                (Optional) The maximum number of rows in each page of results
+                from this request. Non-positive values are ignored.
+            retry (google.api_core.retry.Retry):
+                (Optional) How to retry the call that retrieves rows.
 
-        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
-        :returns:
-            Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
-            During each page, the iterator will have the ``total_rows``
-            attribute set, which counts the total number of rows **in the
-            result set** (this is distinct from the total number of rows in
-            the current page: ``iterator.page.num_items``).
+        Returns:
+            google.cloud.bigquery.table.RowIterator:
+                Iterator of row data
+                :class:`~google.cloud.bigquery.table.Row`-s. During each
+                page, the iterator will have the ``total_rows`` attribute
+                set, which counts the total number of rows **in the result
+                set** (this is distinct from the total number of rows in the
+                current page: ``iterator.page.num_items``).
 
-        :raises:
-            :class:`~google.cloud.exceptions.GoogleCloudError` if the job
-            failed or :class:`concurrent.futures.TimeoutError` if the job did
-            not complete in the given timeout.
+        Raises:
+            google.cloud.exceptions.GoogleCloudError:
+                If the job failed.
+            concurrent.futures.TimeoutError:
+                If the job did not complete in the given timeout.
         """
         super(QueryJob, self).result(timeout=timeout)
         # Return an iterator instead of returning the job.
@@ -2874,7 +2878,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         dest_table_ref = self.destination
         dest_table = Table(dest_table_ref, schema=schema)
         dest_table._properties["numRows"] = self._query_results.total_rows
-        rows = self._client.list_rows(dest_table, retry=retry)
+        rows = self._client.list_rows(dest_table, page_size=page_size, retry=retry)
         rows._preserve_order = _contains_order_by(self.query)
         return rows
 
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 2b4aa84b8faf..d04bf7c1854b 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -1202,6 +1202,15 @@ def test_query_w_timeout(self):
             # 1 second is much too short for this query.
            query_job.result(timeout=1)
 
+    def test_query_w_page_size(self):
+        page_size = 45
+        query_job = Config.CLIENT.query(
+            "SELECT word FROM `bigquery-public-data.samples.shakespeare`;",
+            job_id_prefix="test_query_w_page_size_",
+        )
+        iterator = query_job.result(page_size=page_size)
+        self.assertEqual(next(iterator.pages).num_items, page_size)
+
     def test_query_statistics(self):
         """
         A system test to exercise some of the extended query statistics.
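
A rough usage sketch of the new parameter (the query, page size, and print
statement below are illustrative only, not part of this change). The
``page_size`` argument is forwarded to ``Client.list_rows``, which caps the
number of rows fetched per page from the query's destination table:

    # Hypothetical usage; assumes default credentials and project.
    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        "SELECT word FROM `bigquery-public-data.samples.shakespeare`"
    )

    # Each page holds at most 100 rows; ``total_rows`` counts the whole
    # result set, while ``page.num_items`` counts only the current page.
    iterator = query_job.result(page_size=100)
    for page in iterator.pages:
        print(page.num_items, "of", iterator.total_rows)
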
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index bb6f03f3efb3..abb2a2c4ec1e 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -4168,6 +4168,62 @@ def test_result_w_timeout(self):
         self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900)
         self.assertEqual(reload_request[1]["method"], "GET")
 
+    def test_result_w_page_size(self):
+        # Arrange
+        query_results_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "totalRows": "4",
+        }
+        job_resource = self._make_resource(started=True, ended=True)
+        q_config = job_resource["configuration"]["query"]
+        q_config["destinationTable"] = {
+            "projectId": self.PROJECT,
+            "datasetId": self.DS_ID,
+            "tableId": self.TABLE_ID,
+        }
+        tabledata_resource = {
+            "totalRows": 4,
+            "pageToken": "some-page-token",
+            "rows": [
+                {"f": [{"v": "row1"}]},
+                {"f": [{"v": "row2"}]},
+                {"f": [{"v": "row3"}]},
+            ],
+        }
+        tabledata_resource_page_2 = {"totalRows": 4, "rows": [{"f": [{"v": "row4"}]}]}
+        conn = _make_connection(
+            query_results_resource, tabledata_resource, tabledata_resource_page_2
+        )
+        client = _make_client(self.PROJECT, connection=conn)
+        job = self._get_target_class().from_api_repr(job_resource, client)
+
+        # Act
+        result = job.result(page_size=3)
+
+        # Assert
+        actual_rows = list(result)
+        self.assertEqual(len(actual_rows), 4)
+
+        tabledata_path = "/projects/%s/datasets/%s/tables/%s/data" % (
+            self.PROJECT,
+            self.DS_ID,
+            self.TABLE_ID,
+        )
+        conn.api_request.assert_has_calls(
+            [
+                mock.call(
+                    method="GET", path=tabledata_path, query_params={"maxResults": 3}
+                ),
+                mock.call(
+                    method="GET",
+                    path=tabledata_path,
+                    query_params={"pageToken": "some-page-token", "maxResults": 3},
+                ),
+            ]
+        )
+
     def test_result_error(self):
         from google.cloud import exceptions
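
The wire behavior this unit test pins down is that ``page_size`` surfaces as
the ``maxResults`` query parameter on each ``tabledata.list`` request, and
that follow-up requests repeat ``maxResults`` alongside the server-issued
``pageToken``. The same behavior can be observed directly through
``Client.list_rows`` (a sketch; the table name is hypothetical):

    # Hypothetical table; assumes default credentials.
    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table("my-project.my_dataset.my_table")

    # Every page except possibly the last holds exactly three rows, because
    # each underlying tabledata.list call is sent with maxResults=3.
    rows = client.list_rows(table, page_size=3)
    for page in rows.pages:
        print(page.num_items)
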