Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add job_id, location, project, and query_id properties on RowIterator #1733

Merged
merged 3 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3843,6 +3843,8 @@ def list_rows(
# tables can be fetched without a column filter.
selected_fields=selected_fields,
total_rows=getattr(table, "num_rows", None),
project=table.project,
location=table.location,
)
return row_iterator

Expand All @@ -3859,6 +3861,7 @@ def _list_rows_from_query_results(
page_size: Optional[int] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
query_id: Optional[str] = None,
) -> RowIterator:
"""List the rows of a completed query.
See
Expand Down Expand Up @@ -3898,6 +3901,9 @@ def _list_rows_from_query_results(
would otherwise be a successful response.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
query_id (Optional[str]):
[Preview] ID of a completed query. This ID is auto-generated
and not guaranteed to be populated.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
Expand Down Expand Up @@ -3928,6 +3934,10 @@ def _list_rows_from_query_results(
table=destination,
extra_params=params,
total_rows=total_rows,
project=project,
location=location,
job_id=job_id,
query_id=query_id,
)
return row_iterator

Expand Down
24 changes: 22 additions & 2 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,15 @@ def query(self):
self._properties, ["configuration", "query", "query"]
)

@property
def query_id(self) -> Optional[str]:
"""[Preview] ID of a completed query.

This ID is auto-generated and not guaranteed to be populated.
"""
query_results = self._query_results
return query_results.query_id if query_results is not None else None
Linchin marked this conversation as resolved.
Show resolved Hide resolved

@property
def query_parameters(self):
"""See
Expand Down Expand Up @@ -1525,7 +1534,12 @@ def result( # type: ignore # (complaints about the overloaded signature)
provided and the job is not retryable.
"""
if self.dry_run:
return _EmptyRowIterator()
return _EmptyRowIterator(
project=self.project,
location=self.location,
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)
try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
Expand Down Expand Up @@ -1594,7 +1608,12 @@ def do_get_result():
# indicate success and avoid calling tabledata.list on a table which
# can't be read (such as a view table).
if self._query_results.total_rows is None:
return _EmptyRowIterator()
return _EmptyRowIterator(
location=self.location,
project=self.project,
job_id=self.job_id,
query_id=self.query_id,
)

rows = self._client._list_rows_from_query_results(
self.job_id,
Expand All @@ -1608,6 +1627,7 @@ def do_get_result():
start_index=start_index,
retry=retry,
timeout=timeout,
query_id=self.query_id,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,14 @@ def job_id(self):
"""
return self._properties.get("jobReference", {}).get("jobId")

@property
def query_id(self) -> Optional[str]:
"""[Preview] ID of a completed query.

This ID is auto-generated and not guaranteed to be populated.
"""
return self._properties.get("queryId")

@property
def page_token(self):
"""Token for fetching next bach of results.
Expand Down
49 changes: 46 additions & 3 deletions google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,10 @@ def __init__(
selected_fields=None,
total_rows=None,
first_page_response=None,
location: Optional[str] = None,
job_id: Optional[str] = None,
query_id: Optional[str] = None,
project: Optional[str] = None,
):
super(RowIterator, self).__init__(
client,
Expand All @@ -1575,12 +1579,51 @@ def __init__(
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
self._project = client.project if client is not None else None
self._schema = schema
self._selected_fields = selected_fields
self._table = table
self._total_rows = total_rows
self._first_page_response = first_page_response
self._location = location
self._job_id = job_id
self._query_id = query_id
self._project = project

@property
def _bqstorage_project(self) -> Optional[str]:
"""GCP Project ID where BQ Storage API will bill to (if applicable)."""
tswast marked this conversation as resolved.
Show resolved Hide resolved
client = self.client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what's the difference between self.client.project and self._project. When the client isn't of storage type, does self.client.project still reflect the storage project id?

Copy link
Contributor Author

@tswast tswast Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference in other contexts is usually described as "billing project" versus "data project". I've renamed this private property to reflect that. This is easiest to understand in the context of public datasets.

I have permission to query tables in the bigquery-public-data project and even download the data in bulk with the BigQuery Storage Read API, but I don't have permission to run queries in that project or start a BigQuery Storage Read API session from that project. Instead, I have to start the query or bq storage read session in my own project and reference the tables in the other project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow I see! so self._project is the project for the data/table, and self.client.project a.k.a. billing project is the user's own project. So I guess this billing project doesn't just apply to storage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, though for queries we're using jobs.getQueryResults, so the project will always be the billing project since that's where temp results for queries are stored.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tabledata.list is free, but technically we should be overriding the x-goog-user-project header so that the quota is charged to the billing project. (I don't think we're actually doing that, though)

return client.project if client is not None else None

@property
def job_id(self) -> Optional[str]:
"""ID of the query job (if applicable).

To get the job metadata, call
``job = client.get_job(rows.job_id, location=rows.location)``.
"""
return self._job_id

@property
def location(self) -> Optional[str]:
"""Location where the query executed (if applicable).

See: https://cloud.google.com/bigquery/docs/locations
"""
return self._location

@property
def project(self) -> Optional[str]:
"""GCP Project ID where these rows are read from."""
return self._project

@property
def query_id(self) -> Optional[str]:
"""[Preview] ID of a completed query.

This ID is auto-generated and not guaranteed to be populated.
"""
return self._query_id

def _is_completely_cached(self):
"""Check if all results are completely cached.
Expand Down Expand Up @@ -1723,7 +1766,7 @@ def to_arrow_iterable(

bqstorage_download = functools.partial(
_pandas_helpers.download_arrow_bqstorage,
self._project,
self._bqstorage_project,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we now have two possible project values, does this mean a RowIterator can correspond to different projects for storage APIs compared to other APIs?

self._table,
bqstorage_client,
preserve_order=self._preserve_order,
Expand Down Expand Up @@ -1903,7 +1946,7 @@ def to_dataframe_iterable(
column_names = [field.name for field in self._schema]
bqstorage_download = functools.partial(
_pandas_helpers.download_dataframe_bqstorage,
self._project,
self._bqstorage_project,
self._table,
bqstorage_client,
column_names,
Expand Down
18 changes: 17 additions & 1 deletion tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ def test_result(self):
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
"queryId": "abc-def",
}
job_resource = self._make_resource(started=True, location="EU")
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
Expand Down Expand Up @@ -980,6 +981,10 @@ def test_result(self):
rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(result.job_id, self.JOB_ID)
self.assertEqual(result.location, "EU")
self.assertEqual(result.project, self.PROJECT)
self.assertEqual(result.query_id, "abc-def")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 1)
Expand Down Expand Up @@ -1023,6 +1028,12 @@ def test_result_dry_run(self):
calls = conn.api_request.mock_calls
self.assertIsInstance(result, _EmptyRowIterator)
self.assertEqual(calls, [])
self.assertEqual(result.location, "EU")
self.assertEqual(result.project, self.PROJECT)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
self.assertIsNone(result.job_id)
self.assertIsNone(result.query_id)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
Expand Down Expand Up @@ -1180,16 +1191,21 @@ def test_result_w_empty_schema(self):
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": []},
"queryId": "xyz-abc",
}
connection = make_connection(query_resource, query_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
resource = self._make_resource(ended=True, location="asia-northeast1")
job = self._get_target_class().from_api_repr(resource, client)

result = job.result()

self.assertIsInstance(result, _EmptyRowIterator)
self.assertEqual(list(result), [])
self.assertEqual(result.project, self.PROJECT)
self.assertEqual(result.job_id, self.JOB_ID)
self.assertEqual(result.location, "asia-northeast1")
self.assertEqual(result.query_id, "xyz-abc")

def test_result_invokes_begins(self):
begun_resource = self._make_resource()
Expand Down
12 changes: 8 additions & 4 deletions tests/unit/job/test_query_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
[name_array, age_array], schema=arrow_schema
)
connection = make_connection(query_resource)
client = _make_client(connection=connection)
client = _make_client(connection=connection, project="bqstorage-billing-project")
job = target_class.from_api_repr(resource, client)
session = bigquery_storage.types.ReadSession()
session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
Expand Down Expand Up @@ -597,7 +597,9 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
# The billing project can differ from the data project. Make sure we
# are charging to the billing project, not the data project.
parent="projects/bqstorage-billing-project",
read_session=expected_session,
max_stream_count=0, # Use default number of streams for best performance.
)
Expand All @@ -618,7 +620,7 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
"schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
}
connection = make_connection(query_resource)
client = _make_client(connection=connection)
client = _make_client(connection=connection, project="bqstorage-billing-project")
job = target_class.from_api_repr(resource, client)
bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
session = bigquery_storage.types.ReadSession()
Expand Down Expand Up @@ -646,7 +648,9 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
data_format=bigquery_storage.DataFormat.ARROW,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
# The billing project can differ from the data project. Make sure we
# are charging to the billing project, not the data project.
parent="projects/bqstorage-billing-project",
read_session=expected_session,
max_stream_count=0,
)
Expand Down
11 changes: 10 additions & 1 deletion tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6401,11 +6401,16 @@ def test_list_rows(self):
age = SchemaField("age", "INTEGER", mode="NULLABLE")
joined = SchemaField("joined", "TIMESTAMP", mode="NULLABLE")
table = Table(self.TABLE_REF, schema=[full_name, age, joined])
table._properties["location"] = "us-central1"
table._properties["numRows"] = 7

iterator = client.list_rows(table, timeout=7.5)

# Check that initial total_rows is populated from the table.
# Check that initial RowIterator is populated from the table metadata.
self.assertIsNone(iterator.job_id)
self.assertEqual(iterator.location, "us-central1")
self.assertEqual(iterator.project, table.project)
self.assertIsNone(iterator.query_id)
self.assertEqual(iterator.total_rows, 7)
page = next(iterator.pages)
rows = list(page)
Expand Down Expand Up @@ -6521,6 +6526,10 @@ def test_list_rows_empty_table(self):
selected_fields=[],
)

self.assertIsNone(rows.job_id)
self.assertIsNone(rows.location)
self.assertEqual(rows.project, self.TABLE_REF.project)
self.assertIsNone(rows.query_id)
# When a table reference / string and selected_fields is provided,
# total_rows can't be populated until iteration starts.
self.assertIsNone(rows.total_rows)
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,16 @@ def test_page_token_present(self):
query = self._make_one(resource)
self.assertEqual(query.page_token, "TOKEN")

def test_query_id_missing(self):
query = self._make_one(self._make_resource())
self.assertIsNone(query.query_id)

def test_query_id_present(self):
resource = self._make_resource()
resource["queryId"] = "test-query-id"
query = self._make_one(resource)
self.assertEqual(query.query_id, "test-query-id")

def test_total_rows_present_integer(self):
resource = self._make_resource()
resource["totalRows"] = 42
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,38 @@ def test_constructor_with_dict_schema(self):
]
self.assertEqual(iterator.schema, expected_schema)

def test_job_id_missing(self):
rows = self._make_one()
self.assertIsNone(rows.job_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tswast

Just curious about testing philosophy.
For this collection of tests that are pretty much the same (i.e. all the tests that .assertIsNone()) is there a benefit in your mind to having each test broken out separately versus providing a parameterized list of inputs and expected outputs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I matched the existing pattern in this test. I do think we could probably combine a lot of these into a single parametrized test.

I worry slightly about the required getattr(property_name) being too complex for new contributors to understand. In general tests should avoid anything even as complex as an if statement. https://testing.googleblog.com/2014/07/testing-on-toilet-dont-put-logic-in.html In this case, getattr is a grey area. I wouldn't necessarily count it as "logic"


def test_job_id_present(self):
rows = self._make_one(job_id="abc-123")
self.assertEqual(rows.job_id, "abc-123")

def test_location_missing(self):
rows = self._make_one()
self.assertIsNone(rows.location)

def test_location_present(self):
rows = self._make_one(location="asia-northeast1")
self.assertEqual(rows.location, "asia-northeast1")

def test_project_missing(self):
rows = self._make_one()
self.assertIsNone(rows.project)

def test_project_present(self):
rows = self._make_one(project="test-project")
self.assertEqual(rows.project, "test-project")

def test_query_id_missing(self):
rows = self._make_one()
self.assertIsNone(rows.query_id)

def test_query_id_present(self):
rows = self._make_one(query_id="xyz-987")
self.assertEqual(rows.query_id, "xyz-987")

def test_iterate(self):
from google.cloud.bigquery.schema import SchemaField

Expand Down