From ff6f6203d470e6b5c534bf2a5cb547e76e6b4d2b Mon Sep 17 00:00:00 2001 From: XiaoZ <57973980+xiaozcy@users.noreply.github.com> Date: Tue, 14 May 2024 13:35:30 +0800 Subject: [PATCH] [#3347] improvement(jdbc-doris): support creating Doris table with partition (#3350) ### What changes were proposed in this pull request? support creating Doris table with partition ### Why are the changes needed? Fix: #3347 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? IT --------- Co-authored-by: zhanghan18 --- .../doris/operation/DorisTableOperations.java | 71 ++++++++++++++++--- .../test/DorisTableOperationsIT.java | 70 ++++++++++++++++++ 2 files changed, 130 insertions(+), 11 deletions(-) diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java index 27dd6912865..3a1100e576b 100644 --- a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java @@ -19,10 +19,11 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Strategy; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.indexes.Indexes; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.sql.Connection; import java.sql.PreparedStatement; @@ -35,8 +36,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; @@ -73,7 +76,7 @@ protected String generateCreateTableSql( Map properties, Transform[] partitioning, Distribution distribution, - com.datastrato.gravitino.rel.indexes.Index[] indexes) { + Index[] indexes) { validateIncrementCol(columns); validateDistribution(distribution, columns); @@ -107,6 +110,9 @@ protected String generateCreateTableSql( sqlBuilder.append(" COMMENT \"").append(comment).append("\""); } + // Add Partition Info + appendPartitionSql(partitioning, columns, sqlBuilder); + // Add distribution info if (distribution.strategy() == Strategy.HASH) { sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH("); @@ -126,12 +132,6 @@ protected String generateCreateTableSql( // Add table properties sqlBuilder.append(NEW_LINE).append(DorisUtils.generatePropertiesSql(properties)); - // Add Partition Info - if (partitioning != null && partitioning.length > 0) { - // TODO: Add partitioning support - throw new UnsupportedOperationException("Currently we do not support Partitioning in Doris"); - } - // Return the generated SQL statement String result = sqlBuilder.toString(); @@ -169,9 +169,7 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[] } } - @VisibleForTesting - static void appendIndexesSql( - com.datastrato.gravitino.rel.indexes.Index[] indexes, StringBuilder sqlBuilder) { + private static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { if (indexes.length == 0) { return; @@ -194,6 +192,57 @@ static void appendIndexesSql( sqlBuilder.append(",").append(NEW_LINE).append(indexSql); } + private static void appendPartitionSql( + Transform[] partitioning, JdbcColumn[] columns, StringBuilder sqlBuilder) { + if (ArrayUtils.isEmpty(partitioning)) { + return; + } + Preconditions.checkArgument( + partitioning.length == 1, "Composite partition type is not supported"); + + StringBuilder partitionSqlBuilder = new StringBuilder(); + Set columnNames = + Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet()); + + if (partitioning[0] instanceof Transforms.RangeTransform) { + partitionSqlBuilder.append(NEW_LINE).append(" PARTITION BY RANGE("); + // TODO support multi-column range partitioning in doris + Transforms.RangeTransform rangePartition = (Transforms.RangeTransform) partitioning[0]; + + Preconditions.checkArgument( + rangePartition.fieldName().length == 1, "Doris partition does not support nested field"); + Preconditions.checkArgument( + columnNames.contains(rangePartition.fieldName()[0]), + "The partition field must be one of the columns"); + + String partitionColumn = BACK_QUOTE + rangePartition.fieldName()[0] + BACK_QUOTE; + // TODO we currently do not support pre-assign partition when creating range partitioning + partitionSqlBuilder.append(partitionColumn).append(") () "); + } else if (partitioning[0] instanceof Transforms.ListTransform) { + Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0]; + partitionSqlBuilder.append(" PARTITION BY LIST("); + + ImmutableList.Builder partitionColumnsBuilder = ImmutableList.builder(); + String[][] filedNames = listPartition.fieldNames(); + for (String[] filedName : filedNames) { + Preconditions.checkArgument( + filedName.length == 1, "Doris partition does not support nested field"); + Preconditions.checkArgument( + columnNames.contains(filedName[0]), "The partition field must be one of the columns"); + + partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE); + } + String partitionColumns = + partitionColumnsBuilder.build().stream().collect(Collectors.joining(",")); + // TODO we currently do not support pre-assign partition when creating list partitioning table + partitionSqlBuilder.append(partitionColumns).append(") () "); + } else { + throw new IllegalArgumentException("Unsupported partition type of Doris"); + } + + sqlBuilder.append(partitionSqlBuilder); + } + @Override protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException { return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT")); diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java index a2855cb392a..1b6967f5c76 100644 --- a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java @@ -12,6 +12,8 @@ import com.datastrato.gravitino.rel.expressions.NamedReference; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.rel.types.Type; @@ -411,4 +413,72 @@ public void testCreateNotSupportTypeTable() { "Couldn't convert Gravitino type %s to Doris type", type.simpleString()))); } } + + @Test + public void testCreateTableWithPartition() { + String tableComment = "partition_table_comment"; + JdbcColumn col1 = + JdbcColumn.builder() + .withName("col_1") + .withType(Types.IntegerType.get()) + .withNullable(false) + .build(); + JdbcColumn col2 = + JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build(); + JdbcColumn col3 = + JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build(); + JdbcColumn col4 = + JdbcColumn.builder() + .withName("col_4") + .withType(Types.DateType.get()) + .withNullable(false) + .build(); + List columns = Arrays.asList(col1, col2, col3, col4); + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table with range partition + String rangePartitionTableName = GravitinoITUtils.genRandomName("range_partition_table"); + Transform[] rangePartition = new Transform[] {Transforms.range(new String[] {col4.name()})}; + TABLE_OPERATIONS.create( + databaseName, + rangePartitionTableName, + columns.toArray(new JdbcColumn[] {}), + tableComment, + createProperties(), + rangePartition, + distribution, + indexes); + JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, rangePartitionTableName); + assertionsTableInfo( + rangePartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + rangePartitionTable); + + // create table with list partition + String listPartitionTableName = GravitinoITUtils.genRandomName("list_partition_table"); + Transform[] listPartition = + new Transform[] {Transforms.list(new String[] {col1.name()}, new String[] {col4.name()})}; + TABLE_OPERATIONS.create( + databaseName, + listPartitionTableName, + columns.toArray(new JdbcColumn[] {}), + tableComment, + createProperties(), + listPartition, + distribution, + indexes); + JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, listPartitionTableName); + assertionsTableInfo( + listPartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + listPartitionTable); + } }