From 15297da74bc76800c6c39f858e4a9d67653146ff Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Wed, 2 Nov 2022 22:47:35 +0000
Subject: [PATCH 1/2] emit load job IDs as soon as they come up

---
 .../apache_beam/io/gcp/bigquery_file_loads.py | 39 +++++++++++--------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 5209552dc1e2..febea7675390 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -437,18 +437,18 @@ def process(self, element, schema_mod_job_name_prefix):
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
     schema_update_job_reference = self.bq_wrapper.perform_load_job(
-          destination=table_reference,
-          source_stream=io.BytesIO(),  # file with zero rows
-          job_id=job_name,
-          schema=temp_table_schema,
-          write_disposition='WRITE_APPEND',
-          create_disposition='CREATE_NEVER',
-          additional_load_parameters=additional_parameters,
-          job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-          # JSON format is hardcoded because zero rows load(unlike AVRO) and
-          # a nested schema(unlike CSV, which a default one) is permitted.
+        destination=table_reference,
+        source_stream=io.BytesIO(),  # file with zero rows
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        # JSON format is hardcoded because it permits a zero-row load (unlike
+        # AVRO) and a nested schema (unlike CSV, which permits only a flat one).
-          source_format="NEWLINE_DELIMITED_JSON",
-          load_job_project_id=self._load_job_project_id)
+        source_format="NEWLINE_DELIMITED_JSON",
+        load_job_project_id=self._load_job_project_id)
     self.pending_jobs.append(
         GlobalWindows.windowed_value(
             (destination, schema_update_job_reference)))
@@ -597,6 +597,8 @@ class TriggerLoadJobs(beam.DoFn):
   """
 
   TEMP_TABLES = 'TemporaryTables'
+  ONGOING_JOBS = 'OngoingJobs'
+
 
   def __init__(
       self,
@@ -718,6 +720,7 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
+    yield pvalue.TaggedOutput(TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))
 
@@ -1061,13 +1064,14 @@ def _load_data(
             load_job_project_id=self.load_job_project_id),
         load_job_name_pcv,
         *self.schema_side_inputs).with_outputs(
-            TriggerLoadJobs.TEMP_TABLES, main='main'))
+            TriggerLoadJobs.TEMP_TABLES, TriggerLoadJobs.ONGOING_JOBS, main='main'))
 
-    temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    finished_temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    temp_tables_load_job_ids_pc = trigger_loads_outputs[TriggerLoadJobs.ONGOING_JOBS]
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     schema_mod_job_ids_pc = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             UpdateDestinationSchema(
                 project=self.project,
@@ -1079,7 +1083,7 @@ def _load_data(
             schema_mod_job_name_pcv))
 
     copy_job_outputs = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 project=self.project,
@@ -1120,7 +1124,8 @@ def _load_data(
                 step_name=step_name,
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
-            *self.schema_side_inputs))
+            *self.schema_side_inputs).with_outputs(
+                TriggerLoadJobs.ONGOING_JOBS, main='main'))[TriggerLoadJobs.ONGOING_JOBS]
 
     destination_load_job_ids_pc = (
       (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)

From 9c3ce5debec886612d98ae56f17a8a20d4e45a31 Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Wed, 2 Nov 2022 22:54:43 +0000
Subject: [PATCH 2/2] style fix

---
 .../apache_beam/io/gcp/bigquery_file_loads.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index febea7675390..f438949d428b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -599,7 +599,6 @@ class TriggerLoadJobs(beam.DoFn):
   TEMP_TABLES = 'TemporaryTables'
   ONGOING_JOBS = 'OngoingJobs'
 
-
   def __init__(
       self,
       schema=None,
@@ -720,7 +719,8 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
-    yield pvalue.TaggedOutput(TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
+    yield pvalue.TaggedOutput(
+        TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))
 
@@ -1064,10 +1064,13 @@ def _load_data(
             load_job_project_id=self.load_job_project_id),
         load_job_name_pcv,
         *self.schema_side_inputs).with_outputs(
-            TriggerLoadJobs.TEMP_TABLES, TriggerLoadJobs.ONGOING_JOBS, main='main'))
+            TriggerLoadJobs.TEMP_TABLES,
+            TriggerLoadJobs.ONGOING_JOBS,
+            main='main'))
 
     finished_temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
-    temp_tables_load_job_ids_pc = trigger_loads_outputs[TriggerLoadJobs.ONGOING_JOBS]
+    temp_tables_load_job_ids_pc = trigger_loads_outputs[
+        TriggerLoadJobs.ONGOING_JOBS]
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     schema_mod_job_ids_pc = (
@@ -1125,7 +1128,8 @@ def _load_data(
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
             *self.schema_side_inputs).with_outputs(
-                TriggerLoadJobs.ONGOING_JOBS, main='main'))[TriggerLoadJobs.ONGOING_JOBS]
+                TriggerLoadJobs.ONGOING_JOBS, main='main')
+            )[TriggerLoadJobs.ONGOING_JOBS]
 
     destination_load_job_ids_pc = (
       (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
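
Note: both patches hinge on Beam's multi-output DoFn mechanism: a DoFn yields
pvalue.TaggedOutput(tag, value) for tagged outputs alongside its main output,
and the caller splits them apart with .with_outputs(tag, main='main'). The
sketch below is a minimal, self-contained illustration of that pattern;
TriggerJobs, its elements, and the print sink are hypothetical stand-ins, not
the actual BigQuery load internals.

    import apache_beam as beam
    from apache_beam import pvalue


    class TriggerJobs(beam.DoFn):
      # Toy analogue of TriggerLoadJobs: "starts" a job per element and
      # emits its ID on the tagged output right away, before completion.
      ONGOING_JOBS = 'OngoingJobs'

      def process(self, element):
        job_id = 'job-%s' % element  # hypothetical job reference
        # Emitted as soon as the job is triggered (like the ONGOING_JOBS tag
        # added in patch 1).
        yield pvalue.TaggedOutput(TriggerJobs.ONGOING_JOBS, job_id)
        # The main output keeps its original meaning: the finished job.
        yield job_id


    with beam.Pipeline() as p:
      outputs = (
          p
          | beam.Create(['a', 'b'])
          | beam.ParDo(TriggerJobs()).with_outputs(
              TriggerJobs.ONGOING_JOBS, main='main'))
      finished_pc = outputs['main']  # cf. finished_temp_tables_load_job_ids_pc
      ongoing_pc = outputs[TriggerJobs.ONGOING_JOBS]  # available immediately
      _ = ongoing_pc | beam.Map(print)

This mirrors how patch 1 exposes (destination, job_reference) pairs downstream
as soon as perform_load_job returns, while the untagged 'main' output, renamed
finished_temp_tables_load_job_ids_pc, presumably fires only after those jobs
complete, so the schema-update and copy-job stages keep consuming it.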