From 26f9e017dbe7621abe1877988f509dd00885bb7d Mon Sep 17 00:00:00 2001 From: jp0317 Date: Wed, 20 Dec 2023 06:28:35 +0000 Subject: [PATCH] avoid unnecessary table update --- .../hudi/gcp/bigquery/BigQuerySyncTool.java | 2 +- .../bigquery/HoodieBigQuerySyncClient.java | 12 ++++--- .../TestHoodieBigQuerySyncClient.java | 35 +++++++++++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java index 28c071e52315a..cc1eef340e911 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java @@ -115,7 +115,7 @@ public void syncHoodieTable() { private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) { if (bqSyncClient.tableExists(tableName)) { - LOG.info(tableName + " already exists"); + LOG.info(tableName + " already exists. Skip table creation."); return true; } return false; diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java index af56194214df3..5a23a4079ae24 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java @@ -47,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -182,16 +183,19 @@ public void updateTableSchema(String tableName, Schema schema, List part Table existingTable = bigquery.getTable(TableId.of(projectId, datasetName, tableName)); ExternalTableDefinition definition = existingTable.getDefinition(); Schema remoteTableSchema = definition.getSchema(); - // Add the partition fields into the schema to avoid conflicts while updating - List updatedTableFields = remoteTableSchema.getFields().stream() + List finalTableFields = new ArrayList<>(schema.getFields()); + // Add the partition fields into the schema to avoid conflicts while updating. And ensure the partition fields are at the end to + // avoid unnecessary updates. + List bqPartitionFields = remoteTableSchema.getFields().stream() .filter(field -> partitionFields.contains(field.getName())) .collect(Collectors.toList()); - updatedTableFields.addAll(schema.getFields()); - Schema finalSchema = Schema.of(updatedTableFields); + finalTableFields.addAll(bqPartitionFields); + Schema finalSchema = Schema.of(finalTableFields); boolean sameSchema = definition.getSchema() != null && definition.getSchema().equals(finalSchema); boolean samePartitionFilter = partitionFields.isEmpty() || (requirePartitionFilter == (definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && definition.getHivePartitioningOptions().getRequirePartitionFilter())); if (sameSchema && samePartitionFilter) { + LOG.info("No table update is needed."); return; // No need to update schema. } ExternalTableDefinition.Builder builder = definition.toBuilder(); diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java index 37b2800b563dd..a3cae4c985a15 100644 --- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java +++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java @@ -25,13 +25,16 @@ import org.apache.hudi.sync.common.HoodieSyncConfig; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.ExternalTableDefinition; import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.HivePartitioningOptions; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobStatus; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -39,12 +42,17 @@ import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; +import java.util.ArrayList; import java.nio.file.Path; +import java.util.List; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; public class TestHoodieBigQuerySyncClient { private static final String PROJECT_ID = "test_project"; @@ -125,4 +133,31 @@ void createTableWithManifestFile_nonPartitioned() throws Exception { String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", " + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI)); } + + @Test + void skipUpdatingSchema_partitioned() throws Exception { + BigQuerySyncConfig config = new BigQuerySyncConfig(properties); + client = new HoodieBigQuerySyncClient(config, mockBigQuery); + Table mockTable = mock(Table.class); + ExternalTableDefinition mockTableDefinition = mock(ExternalTableDefinition.class); + // The table schema has no change: it contains a "field" and a "partition_field". + Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING)); + List partitionFields = new ArrayList(); + partitionFields.add("partition_field"); + List bqFields = new ArrayList(); + // The "partition_field" always follows "field". + bqFields.add(Field.of("field", StandardSQLTypeName.STRING)); + bqFields.add(Field.of("partition_field", StandardSQLTypeName.STRING)); + Schema bqSchema = Schema.of(bqFields); + HivePartitioningOptions hivePartitioningOptions = HivePartitioningOptions.newBuilder().setRequirePartitionFilter(true).build(); + + when(mockBigQuery.getTable(any())).thenReturn(mockTable); + when(mockTable.getDefinition()).thenReturn(mockTableDefinition); + when(mockTableDefinition.getSchema()).thenReturn(bqSchema); + when(mockTableDefinition.getHivePartitioningOptions()).thenReturn(hivePartitioningOptions); + + client.updateTableSchema(TEST_TABLE, schema, partitionFields); + // Expect no update. + verify(mockBigQuery, never()).update(mockTable); + } }