Skip to content

Commit

Permalink
[HUDI-7242] Avoid unnecessary bigquery table update when using sync t…
Browse files Browse the repository at this point in the history
…ool (#10374)

Co-authored-by: jp0317 <[email protected]>
  • Loading branch information
jp0317 and jp0317 authored Dec 22, 2023
1 parent 55fe68e commit f0356de
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void syncHoodieTable() {

private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
if (bqSyncClient.tableExists(tableName)) {
LOG.info(tableName + " already exists");
LOG.info(tableName + " already exists. Skip table creation.");
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -182,16 +183,19 @@ public void updateTableSchema(String tableName, Schema schema, List<String> part
Table existingTable = bigquery.getTable(TableId.of(projectId, datasetName, tableName));
ExternalTableDefinition definition = existingTable.getDefinition();
Schema remoteTableSchema = definition.getSchema();
// Add the partition fields into the schema to avoid conflicts while updating
List<Field> updatedTableFields = remoteTableSchema.getFields().stream()
List<Field> finalTableFields = new ArrayList<>(schema.getFields());
// Add the partition fields into the schema to avoid conflicts while updating. And ensure the partition fields are at the end to
// avoid unnecessary updates.
List<Field> bqPartitionFields = remoteTableSchema.getFields().stream()
.filter(field -> partitionFields.contains(field.getName()))
.collect(Collectors.toList());
updatedTableFields.addAll(schema.getFields());
Schema finalSchema = Schema.of(updatedTableFields);
finalTableFields.addAll(bqPartitionFields);
Schema finalSchema = Schema.of(finalTableFields);
boolean sameSchema = definition.getSchema() != null && definition.getSchema().equals(finalSchema);
boolean samePartitionFilter = partitionFields.isEmpty()
|| (requirePartitionFilter == (definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && definition.getHivePartitioningOptions().getRequirePartitionFilter()));
if (sameSchema && samePartitionFilter) {
LOG.info("No table update is needed.");
return; // No need to update schema.
}
ExternalTableDefinition.Builder builder = definition.toBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,34 @@
import org.apache.hudi.sync.common.HoodieSyncConfig;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.HivePartitioningOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

public class TestHoodieBigQuerySyncClient {
private static final String PROJECT_ID = "test_project";
Expand Down Expand Up @@ -125,4 +133,31 @@ void createTableWithManifestFile_nonPartitioned() throws Exception {
String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+ "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
}

@Test
void skipUpdatingSchema_partitioned() throws Exception {
BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
client = new HoodieBigQuerySyncClient(config, mockBigQuery);
Table mockTable = mock(Table.class);
ExternalTableDefinition mockTableDefinition = mock(ExternalTableDefinition.class);
// The table schema has no change: it contains a "field" and a "partition_field".
Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
List<String> partitionFields = new ArrayList<String>();
partitionFields.add("partition_field");
List<Field> bqFields = new ArrayList<Field>();
// The "partition_field" always follows "field".
bqFields.add(Field.of("field", StandardSQLTypeName.STRING));
bqFields.add(Field.of("partition_field", StandardSQLTypeName.STRING));
Schema bqSchema = Schema.of(bqFields);
HivePartitioningOptions hivePartitioningOptions = HivePartitioningOptions.newBuilder().setRequirePartitionFilter(true).build();

when(mockBigQuery.getTable(any())).thenReturn(mockTable);
when(mockTable.getDefinition()).thenReturn(mockTableDefinition);
when(mockTableDefinition.getSchema()).thenReturn(bqSchema);
when(mockTableDefinition.getHivePartitioningOptions()).thenReturn(hivePartitioningOptions);

client.updateTableSchema(TEST_TABLE, schema, partitionFields);
// Expect no update.
verify(mockBigQuery, never()).update(mockTable);
}
}

0 comments on commit f0356de

Please sign in to comment.