Skip to content

Commit

Permalink
[#6361] feat(paimon):Support specifying primary keys during create pa…
Browse files Browse the repository at this point in the history
…imon table by flink (#6362)

### What changes were proposed in this pull request?

Support specifying primary keys when creating a Paimon table via Flink.

### Why are the changes needed?


Fix: #6361 

### Does this PR introduce _any_ user-facing change?

None

### How was this patch tested?

Added a `testCreateTableWithPrimaryKey` case in
`org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT`.
  • Loading branch information
hdygxsj authored Feb 13, 2025
1 parent 4348089 commit 650232c
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
Expand All @@ -40,6 +41,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand Down Expand Up @@ -75,7 +77,11 @@
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;

/**
* The BaseCatalog that provides a default implementation for all methods in the {@link
Expand Down Expand Up @@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
propertiesConverter.toGravitinoTableProperties(table.getOptions());
Transform[] partitions =
partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());

try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);

Index[] indices = getGrivatinoIndices(resolvedTable);
catalog()
.asTableCatalog()
.createTable(
identifier,
columns,
comment,
properties,
partitions,
Distributions.NONE,
new SortOrder[0],
indices);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
Expand All @@ -289,6 +308,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static Index[] getGrivatinoIndices(ResolvedCatalogBaseTable<?> resolvedTable) {
Optional<UniqueConstraint> primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey();
List<String> primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null);
if (primaryColumns == null) {
return new Index[0];
}
String[][] primaryField =
primaryColumns.stream()
.map(primaryColumn -> new String[] {primaryColumn})
.toArray(String[][]::new);
Index primary = Indexes.primary("primary", primaryField);
return new Index[] {primary};
}

/**
* The method only is used to change the comments. To alter columns, use the other alterTable API
* and provide a list of TableChanges.
Expand Down Expand Up @@ -521,12 +554,38 @@ protected CatalogBaseTable toFlinkTable(Table table) {
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull())
.withComment(column.comment());
}
Optional<List<String>> flinkPrimaryKey = getFlinkPrimaryKey(table);
flinkPrimaryKey.ifPresent(builder::primaryKey);
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

private static Optional<List<String>> getFlinkPrimaryKey(Table table) {
List<Index> primaryKeyList =
Arrays.stream(table.index())
.filter(index -> index.type() == Index.IndexType.PRIMARY_KEY)
.collect(Collectors.toList());
if (primaryKeyList.isEmpty()) {
return Optional.empty();
}
Preconditions.checkArgument(
primaryKeyList.size() == 1, "More than one primary key is not supported.");
List<String> primaryKeyFieldList =
Arrays.stream(primaryKeyList.get(0).fieldNames())
.map(
fieldNames -> {
Preconditions.checkArgument(
fieldNames.length == 1, "The primary key columns should not be nested.");
return fieldNames[0];
})
.collect(Collectors.toList());
Preconditions.checkArgument(
!primaryKeyFieldList.isEmpty(), "The primary key must contain at least one field.");
return Optional.of(primaryKeyFieldList);
}

private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
return Column.of(
column.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -80,6 +81,10 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {

protected abstract boolean supportDropCascade();

  /**
   * Whether the catalog under test supports creating tables with primary keys. Defaults to {@code
   * true}; catalog-specific subclasses override this to {@code false} so that the {@code
   * testCreateTableWithPrimaryKey} case (gated via {@code @EnabledIf}) is skipped.
   */
  protected boolean supportsPrimaryKey() {
    return true;
  }

@Test
public void testCreateSchema() {
doWithCatalog(
Expand Down Expand Up @@ -280,6 +285,91 @@ public void testCreateSimpleTable() {
supportDropCascade());
}

  /**
   * Verifies that a composite PRIMARY KEY declared in Flink DDL is propagated to Gravitino as a
   * PRIMARY_KEY index, and that subsequent inserts with a duplicate key replace the existing row
   * (count stays the same, value updated) while inserts with new keys append rows.
   */
  @Test
  @EnabledIf("supportsPrimaryKey")
  public void testCreateTableWithPrimaryKey() {
    String databaseName = "test_create_table_with_primary_key_db";
    String tableName = "test_create_primary_key_table";
    String comment = "test comment";
    String key = "test key";
    String value = "test value";

    doWithSchema(
        currentCatalog(),
        databaseName,
        catalog -> {
          // Composite primary key (aa, bb), declared NOT ENFORCED in the Flink DDL.
          sql(
              "CREATE TABLE %s "
                  + "(aa int, "
                  + " bb int,"
                  + " cc int,"
                  + " PRIMARY KEY (aa,bb) NOT ENFORCED"
                  + ")"
                  + " COMMENT '%s' WITH ("
                  + "'%s' = '%s')",
              tableName, comment, key, value);
          // The Gravitino table should expose exactly one index covering (aa, bb).
          Table table =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
          Assertions.assertEquals(1, table.index().length);
          Index index = table.index()[0];
          Assertions.assertEquals("aa", index.fieldNames()[0][0]);
          Assertions.assertEquals("bb", index.fieldNames()[1][0]);

          // First insert: table holds one row (1,2,3).
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,2,3)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3));

          // Duplicate key (1,2): expect an upsert — still one row, cc updated from 3 to 4.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,2,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4));

          // New key (1,3): a second row is appended.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,3,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(2));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1, 2, 4),
              Row.of(1, 3, 4));

          // New key (2,2): a third row is appended.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(2,2,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(3));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1, 2, 4),
              Row.of(1, 3, 4),
              Row.of(2, 2, 4));
        },
        true,
        supportDropCascade());
  }

@Test
@EnabledIf("supportTableOperation")
public void testListTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog hiveCatalog;

  /** Disables the primary-key creation test for the Hive catalog (gated via {@code @EnabledIf}). */
  @Override
  protected boolean supportsPrimaryKey() {
    return false;
  }

@BeforeAll
void hiveStartUp() {
initDefaultHiveCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog icebergCatalog;

  /**
   * Disables the primary-key creation test for the Iceberg catalog (gated via {@code @EnabledIf}).
   */
  @Override
  protected boolean supportsPrimaryKey() {
    return false;
  }

@BeforeAll
public void before() {
Preconditions.checkNotNull(metalake);
Expand Down

0 comments on commit 650232c

Please sign in to comment.