From 650232cf28e20c5a8d7d76c9cb30e4e904ac3334 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:21:40 +0800 Subject: [PATCH] [#6361] feat(paimon):Support specifying primary keys during create paimon table by flink (#6362) ### What changes were proposed in this pull request? Support specifying primary keys during create paimon table by flink ### Why are the changes needed? Fix: #6361 ### Does this PR introduce _any_ user-facing change? None ### How was this patch tested? Add testCreateTableWithPrimaryKey case in org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT --- .../flink/connector/catalog/BaseCatalog.java | 61 ++++++++++++- .../integration/test/FlinkCommonIT.java | 90 +++++++++++++++++++ .../test/hive/FlinkHiveCatalogIT.java | 5 ++ .../test/iceberg/FlinkIcebergCatalogIT.java | 5 ++ 4 files changed, 160 insertions(+), 1 deletion(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index fd8e118ee49..e9320c786cd 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.compress.utils.Lists; @@ -40,6 +41,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -75,7 +77,11 @@ import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; /** * The BaseCatalog that provides a default implementation for all methods in the {@link @@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig propertiesConverter.toGravitinoTableProperties(table.getOptions()); Transform[] partitions = partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys()); + try { - catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions); + + Index[] indices = getGrivatinoIndices(resolvedTable); + catalog() + .asTableCatalog() + .createTable( + identifier, + columns, + comment, + properties, + partitions, + Distributions.NONE, + new SortOrder[0], + indices); } catch (NoSuchSchemaException e) { throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e); } catch (TableAlreadyExistsException e) { @@ -289,6 +308,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private static Index[] getGrivatinoIndices(ResolvedCatalogBaseTable resolvedTable) { + Optional primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey(); + List primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null); + if (primaryColumns == null) { + return new Index[0]; + } + String[][] primaryField = + primaryColumns.stream() + .map(primaryColumn -> new String[] {primaryColumn}) + .toArray(String[][]::new); + Index primary = Indexes.primary("primary", primaryField); + return new Index[] {primary}; + } + /** * The method only is used to change the comments. To alter columns, use the other alterTable API * and provide a list of TableChanges. @@ -521,12 +554,38 @@ protected CatalogBaseTable toFlinkTable(Table table) { .column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull()) .withComment(column.comment()); } + Optional> flinkPrimaryKey = getFlinkPrimaryKey(table); + flinkPrimaryKey.ifPresent(builder::primaryKey); Map flinkTableProperties = propertiesConverter.toFlinkTableProperties(table.properties()); List partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties); } + private static Optional> getFlinkPrimaryKey(Table table) { + List primaryKeyList = + Arrays.stream(table.index()) + .filter(index -> index.type() == Index.IndexType.PRIMARY_KEY) + .collect(Collectors.toList()); + if (primaryKeyList.isEmpty()) { + return Optional.empty(); + } + Preconditions.checkArgument( + primaryKeyList.size() == 1, "More than one primary key is not supported."); + List primaryKeyFieldList = + Arrays.stream(primaryKeyList.get(0).fieldNames()) + .map( + fieldNames -> { + Preconditions.checkArgument( + fieldNames.length == 1, "The primary key columns should not be nested."); + return fieldNames[0]; + }) + .collect(Collectors.toList()); + Preconditions.checkArgument( + !primaryKeyFieldList.isEmpty(), "The primary key must contain at least one field."); + return Optional.of(primaryKeyFieldList); + } + private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) { return Column.of( column.getName(), diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index b45e5f46ec2..8ff6f8db7a2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -51,6 +51,7 @@ import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.indexes.Index; import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -80,6 +81,10 @@ protected boolean supportGetSchemaWithoutCommentAndOption() { protected abstract boolean supportDropCascade(); + protected boolean supportsPrimaryKey() { + return true; + } + @Test public void testCreateSchema() { doWithCatalog( @@ -280,6 +285,91 @@ public void testCreateSimpleTable() { supportDropCascade()); } + @Test + @EnabledIf("supportsPrimaryKey") + public void testCreateTableWithPrimaryKey() { + String databaseName = "test_create_table_with_primary_key_db"; + String tableName = "test_create_primary_key_table"; + String comment = "test comment"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + sql( + "CREATE TABLE %s " + + "(aa int, " + + " bb int," + + " cc int," + + " PRIMARY KEY (aa,bb) NOT ENFORCED" + + ")" + + " COMMENT '%s' WITH (" + + "'%s' = '%s')", + tableName, comment, key, value); + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertEquals(1, table.index().length); + Index index = table.index()[0]; + Assertions.assertEquals("aa", index.fieldNames()[0][0]); + Assertions.assertEquals("bb", index.fieldNames()[1][0]); + + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(1,2,3)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3)); + + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(1,2,4)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4)); + + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(1,3,4)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(2)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, 2, 4), + Row.of(1, 3, 4)); + + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(2,2,4)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(3)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, 2, 4), + Row.of(1, 3, 4), + Row.of(2, 2, 4)); + }, + true, + supportDropCascade()); + } + @Test @EnabledIf("supportTableOperation") public void testListTables() { diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 7792068e249..3add18211f1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -71,6 +71,11 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; + @Override + protected boolean supportsPrimaryKey() { + return false; + } + @BeforeAll void hiveStartUp() { initDefaultHiveCatalog(); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index cedf0e8d591..f8a3cdf2e17 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -63,6 +63,11 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog icebergCatalog; + @Override + protected boolean supportsPrimaryKey() { + return false; + } + @BeforeAll public void before() { Preconditions.checkNotNull(metalake);