Skip to content

Commit

Permalink
Cherry-pick request apache#17566 from [BEAM-14146] Python Streaming job failing to drain with BigQueryIO write errors (apache#17595)
Browse files Browse the repository at this point in the history

[BEAM-14146] Python Streaming job failing to drain with BigQueryIO write errors

Co-authored-by: Pablo <[email protected]>
  • Loading branch information
y1chi and pabloem authored May 10, 2022
1 parent 8aa77e6 commit 6e2cca1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 17 deletions.
18 changes: 3 additions & 15 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,9 @@ def _insert_load_job(
job_labels=None):

if not source_uris and not source_stream:
raise ValueError(
'Either a non-empty list of fully-qualified source URIs must be '
'provided via the source_uris parameter or an open file object must '
'be provided via the source_stream parameter. Got neither.')
_LOGGER.warning(
'Both source URIs and source stream are not provided. BigQuery load '
'job will not load any data.')

if source_uris and source_stream:
raise ValueError(
Expand Down Expand Up @@ -1002,17 +1001,6 @@ def perform_load_job(
Returns:
bigquery.JobReference with the information about the job that was started.
"""
if not source_uris and not source_stream:
raise ValueError(
'Either a non-empty list of fully-qualified source URIs must be '
'provided via the source_uris parameter or an open file object must '
'be provided via the source_stream parameter. Got neither.')

if source_uris and source_stream:
raise ValueError(
'Only one of source_uris and source_stream may be specified. '
'Got both.')

project_id = (
destination.projectId
if load_job_project_id is None else load_job_project_id)
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ def test_perform_load_job_source_mutual_exclusivity(self):
source_stream=io.BytesIO())

# Neither source_uri nor source_stream specified.
with self.assertRaises(ValueError):
wrapper.perform_load_job(destination='P:D.T', job_id='J')
wrapper.perform_load_job(
destination=parse_table_reference('project:dataset.table'), job_id='J')

def test_perform_load_job_with_source_stream(self):
client = mock.Mock()
Expand Down

0 comments on commit 6e2cca1

Please sign in to comment.