Merge pull request #23954: Emit job ids via side output in TriggerFileLoads process to keep beam.Flatten() happy for Spark and Flink runners
chamikaramj committed Nov 3, 2022
1 parent a4e7812 commit 6e49e08
Showing 1 changed file with 26 additions and 17 deletions.
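
Before the diff, here is a minimal, self-contained sketch of the pattern the change adopts: the DoFn yields each (destination, job_reference) pair through a tagged side output of process(), so the job ids surface as an ordinary PCollection instead of only accumulating in the DoFn's internal pending_jobs list. The DoFn name TriggerJobsSketch, the element shapes, and the fake job ids below are illustrative assumptions, not the bigquery_file_loads implementation; only the ONGOING_JOBS tag name comes from the commit.

    import apache_beam as beam
    from apache_beam import pvalue


    class TriggerJobsSketch(beam.DoFn):
      """Illustrative DoFn: emits a job id per element via a tagged output."""

      ONGOING_JOBS = 'OngoingJobs'  # same tag name the commit introduces

      def process(self, element):
        destination, _rows = element
        # Hypothetical stand-in for triggering a BigQuery load job.
        job_id = 'load-job-for-%s' % destination
        # Emit the job id from process() as a tagged side output, so it
        # becomes a regular PCollection carrying the input's windowing.
        yield pvalue.TaggedOutput(
            TriggerJobsSketch.ONGOING_JOBS, (destination, job_id))
        yield element  # the main output is unchanged


    with beam.Pipeline() as p:
      outputs = (
          p
          | beam.Create([('table_a', ['row1']), ('table_b', ['row2'])])
          | beam.ParDo(TriggerJobsSketch()).with_outputs(
              TriggerJobsSketch.ONGOING_JOBS, main='main'))
      job_ids_pc = outputs[TriggerJobsSketch.ONGOING_JOBS]
      _ = job_ids_pc | beam.Map(print)

Because the tagged output is produced inside process(), it inherits the windowing of the main input, which is presumably what keeps the downstream beam.Flatten() acceptable to the Spark and Flink runners, per the commit title.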
43 changes: 26 additions & 17 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -437,18 +437,18 @@ def process(self, element, schema_mod_job_name_prefix):
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
     schema_update_job_reference = self.bq_wrapper.perform_load_job(
-      destination=table_reference,
-      source_stream=io.BytesIO(),  # file with zero rows
-      job_id=job_name,
-      schema=temp_table_schema,
-      write_disposition='WRITE_APPEND',
-      create_disposition='CREATE_NEVER',
-      additional_load_parameters=additional_parameters,
-      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-      # JSON format is hardcoded because zero rows load(unlike AVRO) and
-      # a nested schema(unlike CSV, which a default one) is permitted.
-      source_format="NEWLINE_DELIMITED_JSON",
-      load_job_project_id=self._load_job_project_id)
+        destination=table_reference,
+        source_stream=io.BytesIO(),  # file with zero rows
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        # JSON format is hardcoded because zero rows load(unlike AVRO) and
+        # a nested schema(unlike CSV, which a default one) is permitted.
+        source_format="NEWLINE_DELIMITED_JSON",
+        load_job_project_id=self._load_job_project_id)
     self.pending_jobs.append(
         GlobalWindows.windowed_value(
             (destination, schema_update_job_reference)))
@@ -597,6 +597,7 @@ class TriggerLoadJobs(beam.DoFn):
   """

   TEMP_TABLES = 'TemporaryTables'
+  ONGOING_JOBS = 'OngoingJobs'

   def __init__(
       self,
@@ -718,6 +719,8 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
+    yield pvalue.TaggedOutput(
+        TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))

@@ -1054,13 +1057,17 @@ def _load_data(
             load_job_project_id=self.load_job_project_id),
         load_job_name_pcv,
         *self.schema_side_inputs).with_outputs(
-            TriggerLoadJobs.TEMP_TABLES, main='main'))
+            TriggerLoadJobs.TEMP_TABLES,
+            TriggerLoadJobs.ONGOING_JOBS,
+            main='main'))

-    temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    finished_temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    temp_tables_load_job_ids_pc = trigger_loads_outputs[
+        TriggerLoadJobs.ONGOING_JOBS]
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]

     schema_mod_job_ids_pc = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             UpdateDestinationSchema(
                 project=self.project,
@@ -1072,7 +1079,7 @@ def _load_data(
                 schema_mod_job_name_pcv))

     copy_job_outputs = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 project=self.project,
@@ -1113,7 +1120,9 @@ def _load_data(
                 step_name=step_name,
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
-            *self.schema_side_inputs))
+            *self.schema_side_inputs).with_outputs(
+                TriggerLoadJobs.ONGOING_JOBS, main='main')
+        )[TriggerLoadJobs.ONGOING_JOBS]

     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
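For the consumption side, here is a hedged sketch of how two such ONGOING_JOBS outputs can be flattened, mirroring the final (temp_tables_load_job_ids_pc, destination_load_job_ids_pc) | beam.Flatten() step above. EmitJobIds, its labels, and its elements are hypothetical stand-ins for TriggerLoadJobs, not the actual pipeline code.

    import apache_beam as beam
    from apache_beam import pvalue


    class EmitJobIds(beam.DoFn):
      """Hypothetical DoFn standing in for TriggerLoadJobs."""

      ONGOING_JOBS = 'OngoingJobs'

      def process(self, element):
        # Emit a fake (destination, job_id) pair on the tagged output.
        yield pvalue.TaggedOutput(
            self.ONGOING_JOBS, (element, 'job-' + element))
        yield element


    with beam.Pipeline() as p:
      temp_ids = (
          p
          | 'TempTables' >> beam.Create(['temp_table_1'])
          | 'TempLoads' >> beam.ParDo(EmitJobIds()).with_outputs(
              EmitJobIds.ONGOING_JOBS, main='main'))[EmitJobIds.ONGOING_JOBS]
      dest_ids = (
          p
          | 'Destinations' >> beam.Create(['destination_1'])
          | 'DirectLoads' >> beam.ParDo(EmitJobIds()).with_outputs(
              EmitJobIds.ONGOING_JOBS, main='main'))[EmitJobIds.ONGOING_JOBS]
      # Both Flatten inputs are ordinary process() outputs with the same
      # windowing, so the flatten is well-formed on any runner.
      all_job_ids = (temp_ids, dest_ids) | beam.Flatten()
      _ = all_job_ids | beam.Map(print)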
