Skip to content

Commit

Permalink
Use PipelineOptions for constructing BigQueryWrapper when estimat…
Browse files Browse the repository at this point in the history
…ing BigQuery table size (#26622) (#26662)

* Add `BigQueryWrapper` factory method with `PipelineOptions` (#26622)

* Pass `PipelineOptions` to `BigQueryWrapper` when estimating table size (#26622)

* Reformat `bigquery_tools.py` (#26622)
  • Loading branch information
dopieralad authored May 31, 2023
1 parent 5bd0c08 commit 018d626
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def display_data(self):
}

def estimate_size(self):
bq = bigquery_tools.BigQueryWrapper()
bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(self.options)
if self.table_reference is not None:
table_ref = self.table_reference
if (isinstance(self.table_reference, vp.ValueProvider) and
Expand Down
23 changes: 16 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,7 @@ class BigQueryWrapper(object):
HISTOGRAM_METRIC_LOGGER = MetricLogger()

def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(PipelineOptions()),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
})
self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
self.gcp_bq_client = client or gcp_bigquery.Client(
client_info=ClientInfo(
user_agent="apache-beam-%s" % apache_beam.__version__))
Expand Down Expand Up @@ -1350,6 +1344,21 @@ def convert_row_to_dict(self, row, schema):
result[field.name] = self._convert_cell_value_to_dict(value, field)
return result

@staticmethod
def from_pipeline_options(pipeline_options: PipelineOptions):
  """Builds a BigQueryWrapper configured from the given pipeline options.

  The wrapper's client is created by ``_bigquery_client`` so that service
  credentials are resolved from ``pipeline_options`` rather than from a
  freshly constructed ``PipelineOptions()``.

  NOTE(review): the constructor shown in this change also evaluates
  ``client or gcp_bigquery.Client(...)`` for ``gcp_bq_client``, so the client
  passed here appears to be reused for that attribute as well -- confirm this
  is intended.

  Args:
    pipeline_options: The PipelineOptions forwarded to ``_bigquery_client``.

  Returns:
    A new BigQueryWrapper instance wrapping the created client.
  """
  options_aware_client = BigQueryWrapper._bigquery_client(pipeline_options)
  return BigQueryWrapper(client=options_aware_client)

@staticmethod
def _bigquery_client(pipeline_options: PipelineOptions):
  """Creates a ``bigquery.BigqueryV2`` client for the given pipeline options.

  Args:
    pipeline_options: The PipelineOptions passed to
      ``auth.get_service_credentials`` to obtain the client's credentials.

  Returns:
    A ``bigquery.BigqueryV2`` client using a new HTTP object, 'utf8' response
    encoding and a "user-agent" header built from ``apache_beam.__version__``.
  """
  # Evaluate the arguments in the same order as the keyword arguments are
  # passed below: HTTP object first, then credentials, then headers.
  http_client = get_new_http()
  service_credentials = auth.get_service_credentials(pipeline_options)
  extra_headers = {"user-agent": "apache-beam-%s" % apache_beam.__version__}
  return bigquery.BigqueryV2(
      http=http_client,
      credentials=service_credentials,
      response_encoding='utf8',
      additional_http_headers=extra_headers)


class RowAsDictJsonCoder(coders.Coder):
"""A coder for a table row (represented as a dict) to/from a JSON string.
Expand Down

0 comments on commit 018d626

Please sign in to comment.