diff --git a/CHANGES.md b/CHANGES.md index 18b7ed989fb6..35eee824a6c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -85,12 +85,13 @@ * Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)). * Fixed JDBC connection failures (Java) during handshake due to deprecated TLSv1(.1) protocol for the JDK. ([#24623](https://github.com/apache/beam/issues/24623)) +* Fixed Python BigQuery Batch Load write may truncate valid data when deposition sets to WRITE_TRUNCATE and incoming data is large (Python) ([#24623](https://github.com/apache/beam/issues/24535)). ## Known Issues * ([#X](https://github.com/apache/beam/issues/X)). -# [2.44.0] - Unreleased +# [2.44.0] - 2023-01-12 ## Highlights diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 9f889dbd0231..0e06dc94c9aa 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -1093,7 +1093,7 @@ def _load_data( load_job_project_id=self.load_job_project_id), schema_mod_job_name_pcv)) - if self.create_disposition == 'WRITE_TRUNCATE': + if self.write_disposition == 'WRITE_TRUNCATE': # All loads going to the same table must be processed together so that # the truncation happens only once. See BEAM-24535. finished_temp_tables_load_job_ids_list_pc = ( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 0c0e136eae4b..797ea0333ec0 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -715,6 +715,47 @@ def test_multiple_partition_files(self): equal_to([6]), label='CheckCopyJobCount') + @mock.patch( + 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process', + wraps=lambda *x: None) + def test_multiple_partition_files_write_truncate(self, mock_call_process): + destination = 'project1:dataset1.table1' + + job_reference = bigquery_api.JobReference() + job_reference.projectId = 'project1' + job_reference.jobId = 'job_name1' + result_job = mock.Mock() + result_job.jobReference = job_reference + + mock_job = mock.Mock() + mock_job.status.state = 'DONE' + mock_job.status.errorResult = None + mock_job.jobReference = job_reference + + bq_client = mock.Mock() + bq_client.jobs.Get.return_value = mock_job + + bq_client.jobs.Insert.return_value = result_job + bq_client.tables.Delete.return_value = None + + with TestPipeline('DirectRunner') as p: + _ = ( + p + | beam.Create(_ELEMENTS, reshuffle=False) + | bqfl.BigQueryBatchFileLoads( + destination, + custom_gcs_temp_location=self._new_tempdir(), + test_client=bq_client, + validate=False, + temp_file_format=bigquery_tools.FileFormat.JSON, + max_file_size=45, + max_partition_size=80, + max_files_per_partition=2, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + + # TriggerCopyJob only processes once + self.assertEqual(mock_call_process.call_count, 1) + @parameterized.expand([ param(is_streaming=False, with_auto_sharding=False), param(is_streaming=True, with_auto_sharding=False),