diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 5209552dc1e2..f438949d428b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -437,18 +437,18 @@ def process(self, element, schema_mod_job_name_prefix):
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
     schema_update_job_reference = self.bq_wrapper.perform_load_job(
-      destination=table_reference,
-      source_stream=io.BytesIO(),  # file with zero rows
-      job_id=job_name,
-      schema=temp_table_schema,
-      write_disposition='WRITE_APPEND',
-      create_disposition='CREATE_NEVER',
-      additional_load_parameters=additional_parameters,
-      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-      # JSON format is hardcoded because zero rows load(unlike AVRO) and
-      # a nested schema(unlike CSV, which a default one) is permitted.
-      source_format="NEWLINE_DELIMITED_JSON",
-      load_job_project_id=self._load_job_project_id)
+        destination=table_reference,
+        source_stream=io.BytesIO(),  # file with zero rows
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        # JSON format is hardcoded because zero rows load(unlike AVRO) and
+        # a nested schema(unlike CSV, which a default one) is permitted.
+        source_format="NEWLINE_DELIMITED_JSON",
+        load_job_project_id=self._load_job_project_id)
     self.pending_jobs.append(
         GlobalWindows.windowed_value(
             (destination, schema_update_job_reference)))
@@ -597,6 +597,7 @@ class TriggerLoadJobs(beam.DoFn):
   """
 
   TEMP_TABLES = 'TemporaryTables'
+  ONGOING_JOBS = 'OngoingJobs'
 
   def __init__(
       self,
@@ -718,6 +719,8 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
+    yield pvalue.TaggedOutput(
+        TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))
 
@@ -1061,13 +1064,17 @@ def _load_data(
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
             *self.schema_side_inputs).with_outputs(
-                TriggerLoadJobs.TEMP_TABLES, main='main'))
+                TriggerLoadJobs.TEMP_TABLES,
+                TriggerLoadJobs.ONGOING_JOBS,
+                main='main'))
 
-    temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    finished_temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    temp_tables_load_job_ids_pc = trigger_loads_outputs[
+        TriggerLoadJobs.ONGOING_JOBS]
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     schema_mod_job_ids_pc = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             UpdateDestinationSchema(
                 project=self.project,
@@ -1079,7 +1086,7 @@ def _load_data(
             schema_mod_job_name_pcv))
 
     copy_job_outputs = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 project=self.project,
@@ -1120,7 +1127,9 @@ def _load_data(
                 step_name=step_name,
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
-            *self.schema_side_inputs))
+            *self.schema_side_inputs).with_outputs(
+                TriggerLoadJobs.ONGOING_JOBS, main='main')
+        )[TriggerLoadJobs.ONGOING_JOBS]
 
     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
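
For context, here is a minimal, self-contained sketch of the multi-output DoFn pattern this patch applies: a DoFn yields job references to a pvalue.TaggedOutput as soon as jobs are issued, while the main output continues to carry the values emitted after the DoFn's remaining work, and the downstream pipeline splits the two streams with .with_outputs(). This is not part of the patch; SplitJobs, its tag constant, and the sample data are illustrative assumptions, not names from bigquery_file_loads.py.

# Illustrative sketch only; assumes apache_beam is installed.
import apache_beam as beam
from apache_beam import pvalue


class SplitJobs(beam.DoFn):
  ONGOING = 'OngoingJobs'

  def process(self, element):
    # Emit each (destination, job_reference) pair to the tagged output
    # right away, so consumers can see in-flight jobs without waiting...
    yield pvalue.TaggedOutput(SplitJobs.ONGOING, element)
    # ...and emit to the main output afterwards (in the real DoFn, only
    # once the job has been waited on and is known to have finished).
    yield element


with beam.Pipeline() as p:
  outputs = (
      p
      | beam.Create([('dest1', 'job_ref1'), ('dest2', 'job_ref2')])
      | beam.ParDo(SplitJobs()).with_outputs(SplitJobs.ONGOING, main='main'))
  ongoing_pc = outputs[SplitJobs.ONGOING]  # analogous to ONGOING_JOBS above
  finished_pc = outputs['main']  # analogous to trigger_loads_outputs['main']

This mirrors the renaming in _load_data: the main output becomes finished_temp_tables_load_job_ids_pc (jobs that have completed, safe to feed into schema updates and copy jobs), while the new ONGOING_JOBS tag supplies temp_tables_load_job_ids_pc for steps that only need the job references themselves.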