Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PipelineOptions for constructing BigQueryWrapper when estimating BigQuery table size (#26622) #26662

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def display_data(self):
}

def estimate_size(self):
bq = bigquery_tools.BigQueryWrapper()
bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(self.options)
if self.table_reference is not None:
table_ref = self.table_reference
if (isinstance(self.table_reference, vp.ValueProvider) and
Expand Down
23 changes: 16 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,7 @@ class BigQueryWrapper(object):
HISTOGRAM_METRIC_LOGGER = MetricLogger()

def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(PipelineOptions()),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
})
self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
self.gcp_bq_client = client or gcp_bigquery.Client(
client_info=ClientInfo(
user_agent="apache-beam-%s" % apache_beam.__version__))
Expand Down Expand Up @@ -1350,6 +1344,21 @@ def convert_row_to_dict(self, row, schema):
result[field.name] = self._convert_cell_value_to_dict(value, field)
return result

@staticmethod
def from_pipeline_options(pipeline_options: PipelineOptions):
return BigQueryWrapper(
client=BigQueryWrapper._bigquery_client(pipeline_options))

@staticmethod
def _bigquery_client(pipeline_options: PipelineOptions):
return bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(pipeline_options),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
})


class RowAsDictJsonCoder(coders.Coder):
"""A coder for a table row (represented as a dict) to/from a JSON string.
Expand Down