[#6361] feat(paimon): Support specifying primary keys when creating Paimon tables via Flink #6362

Merged (14 commits) on Feb 13, 2025
@@ -137,21 +137,10 @@ private List<String> getPrimaryKeysFromIndexes(Index[] indexes) {
if (indexes == null || indexes.length == 0) {
return Collections.emptyList();
}

Preconditions.checkArgument(
indexes.length == 1, "Paimon only supports no more than one Index.");

Index primaryKeyIndex = indexes[0];
Arrays.stream(primaryKeyIndex.fieldNames())
.forEach(
filedName ->
Preconditions.checkArgument(
filedName != null && filedName.length == 1,
"The primary key columns should not be nested."));

return Arrays.stream(primaryKeyIndex.fieldNames())
.map(fieldName -> fieldName[0])
.collect(Collectors.toList());
return Arrays.stream(primaryKeyIndex.fieldNames()).map(e -> e[0]).collect(Collectors.toList());
}

private static Index[] constructIndexesFromPrimaryKeys(Table table) {
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
@@ -40,6 +41,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -75,7 +77,11 @@
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;

/**
* The BaseCatalog that provides a default implementation for all methods in the {@link
@@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
propertiesConverter.toGravitinoTableProperties(table.getOptions());
Transform[] partitions =
partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());

try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);

Index[] indices = getIndices(resolvedTable);
catalog()
.asTableCatalog()
.createTable(
identifier,
columns,
comment,
properties,
partitions,
Distributions.NONE,
new SortOrder[0],
indices);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
@@ -289,6 +308,18 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static Index[] getIndices(ResolvedCatalogBaseTable<?> resolvedTable) {
Optional<UniqueConstraint> primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey();
List<String> primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null);
if (primaryColumns == null) {
return new Index[0];
}
String[][] primaryField =
primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new);
Index primary = Indexes.primary("primary", primaryField);
return new Index[] {primary};
}

/**
* The method only is used to change the comments. To alter columns, use the other alterTable API
* and provide a list of TableChanges.
@@ -521,6 +552,11 @@ protected CatalogBaseTable toFlinkTable(Table table) {
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull())
.withComment(column.comment());
}
Index[] indices = table.index();
if (indices != null && indices.length == 1) {
Contributor commented:
So .... this means the primary key always has a SINGLE column, right?

@hdygxsj (Contributor Author) replied, Jan 26, 2025:
No. This means that only one primary key is supported. The Index object also stores the field information, and there can be multiple fields.
[attached screenshot]
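
To make the reply above concrete, here is a minimal, hypothetical sketch (not part of this PR; the class name and column names aa/bb are assumptions mirroring the test below): a composite primary key maps to a single Gravitino Index whose fieldNames() holds one single-element path per column, which is what the indices[0].fieldNames() mapping in this hunk relies on. Only Indexes.primary and Index.fieldNames() are taken from the Gravitino API already used elsewhere in this diff.

// Hypothetical standalone example, not code from this PR.
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;

public class PrimaryKeyIndexSketch {
  public static void main(String[] args) {
    // PRIMARY KEY (aa, bb): one Index, two field names, each a single-element path.
    String[][] fields = new String[][] {{"aa"}, {"bb"}};
    Index primary = Indexes.primary("primary", fields);

    System.out.println(primary.fieldNames().length); // 2 -> two primary-key columns
    System.out.println(primary.fieldNames()[0][0]);  // aa
    System.out.println(primary.fieldNames()[1][0]);  // bb
  }
}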

builder.primaryKey(
Arrays.stream(indices[0].fieldNames()).map(arr -> arr[0]).collect(Collectors.toList()));
}
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
@@ -51,6 +51,7 @@
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -80,6 +81,10 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {

protected abstract boolean supportDropCascade();

protected boolean supportCreateTableWithPrimaryKey() {
return true;
}

@Test
public void testCreateSchema() {
doWithCatalog(
@@ -280,6 +285,91 @@ public void testCreateSimpleTable() {
supportDropCascade());
}

@Test
@EnabledIf("supportCreateTableWithPrimaryKey")
public void testCreateTableWithPrimaryKey() {
String databaseName = "test_create_table_with_primary_key_db";
String tableName = "test_create_primary_key_table";
String comment = "test comment";
String key = "test key";
String value = "test value";

doWithSchema(
currentCatalog(),
databaseName,
catalog -> {
sql(
"CREATE TABLE %s "
+ "(aa int, "
+ " bb int,"
+ " cc int,"
+ " PRIMARY KEY (aa,bb) NOT ENFORCED"
+ ")"
+ " COMMENT '%s' WITH ("
+ "'%s' = '%s')",
tableName, comment, key, value);
Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertEquals(1, table.index().length);
Index index = table.index()[0];
Assertions.assertEquals("aa", index.fieldNames()[0][0]);
Assertions.assertEquals("bb", index.fieldNames()[1][0]);

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,2,3)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,2,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,3,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(2));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(2,2,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(3));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4),
Row.of(2, 2, 4));
},
true,
supportDropCascade());
}

@Test
@EnabledIf("supportTableOperation")
public void testListTables() {
@@ -71,6 +71,11 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog hiveCatalog;

@Override
protected boolean supportCreateTableWithPrimaryKey() {
return false;
}

@BeforeAll
void hiveStartUp() {
initDefaultHiveCatalog();
@@ -63,6 +63,11 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog icebergCatalog;

@Override
protected boolean supportCreateTableWithPrimaryKey() {
return false;
}

@BeforeAll
public void before() {
Preconditions.checkNotNull(metalake);