From 3a1725e121f99519e839477dd0a52df041d205bf Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Wed, 28 Jun 2023 15:55:21 +0530 Subject: [PATCH 1/9] Fixing Issue - Provided project_id parameter not getting used to submit bigquery job --- airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index 7ec62db9bfbd0..e46df569aa274 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -183,7 +183,7 @@ def _submit_job( return hook.insert_job( configuration=configuration, - project_id=configuration["extract"]["sourceTable"]["projectId"], + project_id=self.project_id or configuration["extract"]["sourceTable"]["projectId"], location=self.location, job_id=job_id, timeout=self.result_timeout, From 002987e31226be4fd87a9194cb288abf3daee907 Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Thu, 29 Jun 2023 20:57:54 +0530 Subject: [PATCH 2/9] updating test --- .../google/cloud/transfers/test_bigquery_to_gcs.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py index b1d0563f58eb4..a2b19fdc49e6b 100644 --- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py @@ -31,6 +31,7 @@ TEST_DATASET = "test-dataset" TEST_TABLE_ID = "test-table-id" PROJECT_ID = "test-project-id" +JOB_PROJECT_ID = "job-project-id" class TestBigQueryToGCSOperator: @@ -66,7 +67,7 @@ def test_execute(self, mock_hook): mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID) mock_hook.return_value.generate_job_id.return_value = real_job_id mock_hook.return_value.insert_job.return_value = MagicMock(job_id="real_job_id", error_result=False) - mock_hook.return_value.project_id = PROJECT_ID + mock_hook.return_value.project_id = JOB_PROJECT_ID operator = BigQueryToGCSOperator( task_id=TASK_ID, @@ -77,13 +78,14 @@ def test_execute(self, mock_hook): field_delimiter=field_delimiter, print_header=print_header, labels=labels, + project_id=JOB_PROJECT_ID ) operator.execute(context=mock.MagicMock()) mock_hook.return_value.insert_job.assert_called_once_with( job_id="123456_hash", configuration=expected_configuration, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, location=None, timeout=None, retry=DEFAULT_RETRY, @@ -122,10 +124,10 @@ def test_execute_deferrable_mode(self, mock_hook): mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID) mock_hook.return_value.generate_job_id.return_value = real_job_id mock_hook.return_value.insert_job.return_value = MagicMock(job_id="real_job_id", error_result=False) - mock_hook.return_value.project_id = PROJECT_ID + mock_hook.return_value.project_id = JOB_PROJECT_ID operator = BigQueryToGCSOperator( - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, task_id=TASK_ID, source_project_dataset_table=source_project_dataset_table, destination_cloud_storage_uris=destination_cloud_storage_uris, @@ -146,7 +148,7 @@ def test_execute_deferrable_mode(self, mock_hook): mock_hook.return_value.insert_job.assert_called_once_with( configuration=expected_configuration, job_id="123456_hash", - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, location=None, timeout=None, retry=DEFAULT_RETRY, From fbc5d108b2170aeafb0919e46eb16a3e813ede1d Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 10:26:34 +0530 Subject: [PATCH 3/9] changing fallback job project_id from table's (storage) project to hook's project --- airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index e46df569aa274..f8de2f6d6fcd9 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -183,7 +183,7 @@ def _submit_job( return hook.insert_job( configuration=configuration, - project_id=self.project_id or configuration["extract"]["sourceTable"]["projectId"], + project_id=self.project_id or hook.project_id, location=self.location, job_id=job_id, timeout=self.result_timeout, From 2361ab002bb79bad4f7cfef2b2193b1c7eb42a6d Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 17:09:56 +0530 Subject: [PATCH 4/9] fix code formatting issue in test_bigquery_to_gcs.py --- tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py index a2b19fdc49e6b..b7bf8bef62d8c 100644 --- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py @@ -78,7 +78,7 @@ def test_execute(self, mock_hook): field_delimiter=field_delimiter, print_header=print_header, labels=labels, - project_id=JOB_PROJECT_ID + project_id=JOB_PROJECT_ID, ) operator.execute(context=mock.MagicMock()) From a829b9bb627053b5f8b6e68053afdadf297d2426 Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 17:11:02 +0530 Subject: [PATCH 5/9] fix project_id parameter ignore/override issue in gcs_to_bigquery --- .../google/cloud/transfers/gcs_to_bigquery.py | 29 +++--- .../cloud/transfers/test_gcs_to_bigquery.py | 92 +++++++++++++------ 2 files changed, 74 insertions(+), 47 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 88b6d09323708..d13a2466aad74 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -301,7 +301,7 @@ def _submit_job( # Submit a new job without waiting for it to complete. return hook.insert_job( configuration=self.configuration, - project_id=self.project_id, + project_id=self.project_id or hook.project_id, location=self.location, job_id=job_id, timeout=self.result_timeout, @@ -359,7 +359,7 @@ def execute(self, context: Context): if self.external_table: self.log.info("Creating a new BigQuery table for storing data...") - table_obj_api_repr = self._create_empty_table() + table_obj_api_repr = self._create_external_table() BigQueryTableLink.persist( context=context, @@ -381,7 +381,7 @@ def execute(self, context: Context): except Conflict: # If the job already exists retrieve it job = self.hook.get_job( - project_id=self.hook.project_id, + project_id=self.project_id or self.hook.project_id, location=self.location, job_id=job_id, ) @@ -414,12 +414,12 @@ def execute(self, context: Context): persist_kwargs = { "context": context, "task_instance": self, - "project_id": self.hook.project_id, "table_id": table, } if not isinstance(table, str): persist_kwargs["table_id"] = table["tableId"] persist_kwargs["dataset_id"] = table["datasetId"] + persist_kwargs["project_id"] = table["projectId"] BigQueryTableLink.persist(**persist_kwargs) self.job_id = job.job_id @@ -430,7 +430,7 @@ def execute(self, context: Context): trigger=BigQueryInsertJobTrigger( conn_id=self.gcp_conn_id, job_id=self.job_id, - project_id=self.hook.project_id, + project_id=self.project_id or self.hook.project_id, ), method_name="execute_complete", ) @@ -475,7 +475,7 @@ def _find_max_value_in_column(self): } } try: - job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id) + job_id = hook.insert_job(configuration=self.configuration, project_id=self.project_id or hook.project_id) rows = list(hook.get_job(job_id=job_id, location=self.location).result()) except BadRequest as e: if "Unrecognized name:" in e.message: @@ -498,12 +498,7 @@ def _find_max_value_in_column(self): else: raise RuntimeError(f"The {select_command} returned no rows!") - def _create_empty_table(self): - self.project_id, dataset_id, table_id = self.hook.split_tablename( - table_input=self.destination_project_dataset_table, - default_project_id=self.project_id or self.hook.project_id, - ) - + def _create_external_table(self): external_config_api_repr = { "autodetect": self.autodetect, "sourceFormat": self.source_format, @@ -549,7 +544,7 @@ def _create_empty_table(self): # build table definition table = Table( - table_ref=TableReference.from_string(self.destination_project_dataset_table, self.project_id) + table_ref=TableReference.from_string(self.destination_project_dataset_table, self.hook.project_id) ) table.external_data_configuration = external_config if self.labels: @@ -567,7 +562,7 @@ def _create_empty_table(self): self.log.info("Creating external table: %s", self.destination_project_dataset_table) self.hook.create_empty_table( table_resource=table_obj_api_repr, - project_id=self.project_id, + project_id=table.project, location=self.location, exists_ok=True, ) @@ -575,9 +570,9 @@ def _create_empty_table(self): return table_obj_api_repr def _use_existing_table(self): - self.project_id, destination_dataset, destination_table = self.hook.split_tablename( + destination_project_id, destination_dataset, destination_table = self.hook.split_tablename( table_input=self.destination_project_dataset_table, - default_project_id=self.project_id or self.hook.project_id, + default_project_id=self.hook.project_id, var_name="destination_project_dataset_table", ) @@ -597,7 +592,7 @@ def _use_existing_table(self): "autodetect": self.autodetect, "createDisposition": self.create_disposition, "destinationTable": { - "projectId": self.project_id, + "projectId": destination_project_id, "datasetId": destination_dataset, "tableId": destination_table, }, diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 80958f281f630..1a0f38183754d 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -40,6 +40,7 @@ PROJECT_ID = "test-project" DATASET = "dataset" TABLE = "table" +JOB_PROJECT_ID = "job-project-id" WRITE_DISPOSITION = "WRITE_TRUNCATE" MAX_ID_KEY = "id" TEST_DATASET_LOCATION = "US" @@ -85,6 +86,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, + project_id=JOB_PROJECT_ID, ) result = operator.execute(context=MagicMock()) @@ -93,7 +95,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -123,7 +125,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): "schemaUpdateOptions": [], } }, - project_id=hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ) @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) @@ -145,6 +147,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) result = operator.execute(context=MagicMock()) @@ -172,7 +175,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -184,7 +187,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook "schemaUpdateOptions": [], } }, - project_id=hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ), ] @@ -208,6 +211,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -233,7 +237,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -245,7 +249,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): "schemaUpdateOptions": [], } }, - project_id=hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ), ] @@ -269,13 +273,14 @@ def test_labels_external_table_should_execute_successfully(self, hook): write_disposition=WRITE_DISPOSITION, external_table=True, labels=LABELS, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": LABELS, @@ -316,6 +321,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, external_table=False, labels=LABELS, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -342,7 +348,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -368,13 +374,14 @@ def test_description_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, description=DESCRIPTION, external_table=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -416,6 +423,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho write_disposition=WRITE_DISPOSITION, external_table=False, description=DESCRIPTION, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -441,7 +449,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho fieldDelimiter=",", ), }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -467,6 +475,7 @@ def test_source_objs_as_list_external_table_should_execute_successfully(self, ho write_disposition=WRITE_DISPOSITION, destination_project_dataset_table=TEST_EXPLICIT_DEST, external_table=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -474,7 +483,7 @@ def test_source_objs_as_list_external_table_should_execute_successfully(self, ho hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -515,6 +524,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully( write_disposition=WRITE_DISPOSITION, destination_project_dataset_table=TEST_EXPLICIT_DEST, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -545,7 +555,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully( job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -569,6 +579,7 @@ def test_source_objs_as_string_external_table_should_execute_successfully(self, write_disposition=WRITE_DISPOSITION, destination_project_dataset_table=TEST_EXPLICIT_DEST, external_table=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -576,7 +587,7 @@ def test_source_objs_as_string_external_table_should_execute_successfully(self, hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -615,6 +626,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -645,7 +657,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -672,6 +684,7 @@ def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gc write_disposition=WRITE_DISPOSITION, destination_project_dataset_table=TEST_EXPLICIT_DEST, external_table=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -679,7 +692,7 @@ def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gc bq_hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -723,6 +736,7 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -746,7 +760,7 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ "encoding": "UTF-8", } }, - project_id=bq_hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -774,6 +788,7 @@ def test_autodetect_none_external_table_should_execute_successfully(self, hook): destination_project_dataset_table=TEST_EXPLICIT_DEST, external_table=True, autodetect=None, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -781,7 +796,7 @@ def test_autodetect_none_external_table_should_execute_successfully(self, hook): hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -820,6 +835,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self write_disposition=WRITE_DISPOSITION, autodetect=None, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -842,7 +858,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self "encoding": "UTF-8", } }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -870,6 +886,7 @@ def test_execute_should_throw_ex_when_no_bucket_specified(self, hook): max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -890,6 +907,7 @@ def test_execute_should_throw_ex_when_no_source_objects_specified(self, hook): max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -912,6 +930,7 @@ def test_execute_should_throw_ex_when_no_destination_project_dataset_table_speci max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=False, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -942,6 +961,7 @@ def test_source_format_check_should_throw_ex_when_incorrect_source_type( external_table=False, autodetect=False, source_format="incorrect", + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -973,6 +993,7 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull max_id_key=MAX_ID_KEY, external_table=True, autodetect=True, + project_id=JOB_PROJECT_ID, ) result = operator.execute(context=MagicMock()) @@ -981,7 +1002,7 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull bq_hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -1010,7 +1031,7 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull "schemaUpdateOptions": [], } }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ) @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) @@ -1041,6 +1062,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc max_id_key=MAX_ID_KEY, external_table=False, autodetect=True, + project_id=JOB_PROJECT_ID, ) result = operator.execute(context=MagicMock()) @@ -1067,7 +1089,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc job_id=pytest.real_job_id, location=None, nowait=True, - project_id=bq_hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -1079,7 +1101,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc "schemaUpdateOptions": [], } }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ), ] @@ -1104,6 +1126,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, schema_fields=SCHEMA_FIELDS_INT, external_table=False, autodetect=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -1129,7 +1152,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=JOB_PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -1156,13 +1179,14 @@ def test_schema_fields_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS_INT, external_table=True, autodetect=True, + project_id=JOB_PROJECT_ID, ) operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=JOB_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -1208,6 +1232,7 @@ def test_execute_without_external_table_async_should_execute_successfully(self, external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(TaskDeferred) as exc: @@ -1233,6 +1258,7 @@ def test_execute_without_external_table_async_should_throw_ex_when_event_status_ external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) operator.execute_complete( context=None, event={"status": "error", "message": "test failure message"} @@ -1253,6 +1279,7 @@ def test_execute_logging_without_external_table_async_should_execute_successfull external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with mock.patch.object(operator.log, "info") as mock_log_info: operator.execute_complete( @@ -1286,6 +1313,7 @@ def test_execute_without_external_table_generate_job_id_async_should_execute_suc external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(TaskDeferred): @@ -1326,6 +1354,7 @@ def test_execute_without_external_table_reattach_async_should_execute_successful external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(TaskDeferred): @@ -1334,7 +1363,7 @@ def test_execute_without_external_table_reattach_async_should_execute_successful hook.return_value.get_job.assert_called_once_with( location=TEST_DATASET_LOCATION, job_id=pytest.real_job_id, - project_id=hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ) job._begin.assert_called_once_with() @@ -1365,6 +1394,7 @@ def test_execute_without_external_table_force_rerun_async_should_execute_success external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(AirflowException) as exc: @@ -1381,7 +1411,7 @@ def test_execute_without_external_table_force_rerun_async_should_execute_success hook.return_value.get_job.assert_called_once_with( location=TEST_DATASET_LOCATION, job_id=pytest.real_job_id, - project_id=hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ) @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) @@ -1406,6 +1436,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully( external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(TaskDeferred): @@ -1437,7 +1468,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully( schema={"fields": SCHEMA_FIELDS}, ), }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -1452,7 +1483,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully( "schemaUpdateOptions": [], } }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ), ] @@ -1482,6 +1513,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu external_table=False, autodetect=True, deferrable=True, + project_id=JOB_PROJECT_ID, ) with pytest.raises(TaskDeferred): @@ -1512,7 +1544,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu encoding="UTF-8", ), }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -1527,7 +1559,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu "schemaUpdateOptions": [], } }, - project_id=bq_hook.return_value.project_id, + project_id=JOB_PROJECT_ID, ), ] From 0abf530897b91d315968539b683f5e407b0e344e Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 17:32:16 +0530 Subject: [PATCH 6/9] prefer project_id param in case of deferred runs --- airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index f8de2f6d6fcd9..9d1249d775ad2 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -254,7 +254,7 @@ def execute(self, context: Context): trigger=BigQueryInsertJobTrigger( conn_id=self.gcp_conn_id, job_id=job_id, - project_id=self.hook.project_id, + project_id=self.project_id or self.hook.project_id, ), method_name="execute_complete", ) From 2ee064ad8073ce7d0df5b54010f88a0bd7b4b70e Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 17:39:20 +0530 Subject: [PATCH 7/9] removing project_id param as fallback for bq table storage. Now hook's project_id is the only fallback if table project is not specified. --- airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index 9d1249d775ad2..7a0f9248df27a 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -145,7 +145,7 @@ def _handle_job_error(job: BigQueryJob | UnknownJob) -> None: def _prepare_configuration(self): source_project, source_dataset, source_table = self.hook.split_tablename( table_input=self.source_project_dataset_table, - default_project_id=self.project_id or self.hook.project_id, + default_project_id=self.hook.project_id, var_name="source_project_dataset_table", ) From 072f1c94d099636d1fe49eae5b2fc8dc6c9586a1 Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Tue, 4 Jul 2023 19:03:47 +0530 Subject: [PATCH 8/9] fixing static check failure --- airflow/providers/google/cloud/transfers/gcs_to_bigquery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index d13a2466aad74..cba176b56e00b 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -475,7 +475,9 @@ def _find_max_value_in_column(self): } } try: - job_id = hook.insert_job(configuration=self.configuration, project_id=self.project_id or hook.project_id) + job_id = hook.insert_job( + configuration=self.configuration, project_id=self.project_id or hook.project_id + ) rows = list(hook.get_job(job_id=job_id, location=self.location).result()) except BadRequest as e: if "Unrecognized name:" in e.message: From d4c1ec0c4e485c509876c63bc2aec1cfabab555e Mon Sep 17 00:00:00 2001 From: avinashpandeshwar Date: Thu, 6 Jul 2023 09:40:47 +0530 Subject: [PATCH 9/9] fixed invalid project reference in _create_external_table function --- airflow/providers/google/cloud/transfers/gcs_to_bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 8b68b1a9453d6..46c5adc95b88a 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -564,7 +564,7 @@ def _create_external_table(self): self.log.info("Creating external table: %s", self.destination_project_dataset_table) self.hook.create_empty_table( table_resource=table_obj_api_repr, - project_id=table.project, + project_id=self.project_id or self.hook.project_id, location=self.location, exists_ok=True, )