Skip to content

Commit

Permalink
Support unpartitioned tables in Kudu
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Jan 11, 2025
1 parent fb075d0 commit 61a5a9e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.base.util.JsonUtils.parseJson;
import static io.trino.plugin.kudu.properties.RangePartitionDefinition.EMPTY_RANGE_PARTITION;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
Expand Down Expand Up @@ -133,7 +134,8 @@ public static PartitionDesign getPartitionDesign(Map<String, Object> tableProper
requireNonNull(tableProperties);

@SuppressWarnings("unchecked")
List<String> hashColumns = (List<String>) tableProperties.get(PARTITION_BY_HASH_COLUMNS);
List<String> hashColumns = (List<String>) tableProperties.getOrDefault(PARTITION_BY_HASH_COLUMNS, ImmutableList.of());

@SuppressWarnings("unchecked")
List<String> hashColumns2 = (List<String>) tableProperties.getOrDefault(PARTITION_BY_HASH_COLUMNS_2, ImmutableList.of());

Expand All @@ -153,11 +155,15 @@ else if (!hashColumns2.isEmpty()) {
}

@SuppressWarnings("unchecked")
List<String> rangeColumns = (List<String>) tableProperties.get(PARTITION_BY_RANGE_COLUMNS);
List<String> rangeColumns = (List<String>) tableProperties.getOrDefault(PARTITION_BY_RANGE_COLUMNS, ImmutableList.of());
if (!rangeColumns.isEmpty()) {
design.setRange(new RangePartitionDefinition(rangeColumns));
}

if (!design.hasPartitions()) {
design.setRange(EMPTY_RANGE_PARTITION);
}

return design;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

public record RangePartitionDefinition(List<String> columns)
{
public static final RangePartitionDefinition EMPTY_RANGE_PARTITION = new RangePartitionDefinition(ImmutableList.of());

public RangePartitionDefinition
{
columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ public static String createKuduTableForWrites(String createTable)
format("WITH (partition_by_hash_columns = ARRAY['%s'], partition_by_hash_buckets = 2)", column);
}

@Test
public void testUnpartitionedTable()
{
String tableName = "test_unpartitioned_table" + randomNameSuffix();
// success create the table without partition keys
assertUpdate("CREATE TABLE " + tableName + " (id int WITH (primary_key=true), val varchar)");

assertQuery("SELECT COUNT(*) FROM " + tableName, "VALUES 0");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'hello'), (2, 'world')", 2);
assertQuery("SELECT COUNT(*) FROM " + tableName, "VALUES 2");

assertUpdate("DELETE FROM " + tableName + " WHERE id = 1", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES (2, 'world')");

assertThat(computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("CREATE TABLE kudu.default.%s (\n" +
" id integer COMMENT '' WITH (primary_key = true),\n" +
" val varchar COMMENT ''\n" +
")\n" +
"WITH (\n" +
" number_of_replicas = 1,\n" +
" range_partitions = '[]'\n" +
")", tableName));

assertUpdate("DROP TABLE " + tableName);
}

@Test
@Override
public void testCreateSchema()
Expand Down Expand Up @@ -300,7 +327,7 @@ public void testAddNotNullColumnToEmptyTable()
{
// TODO: Enable this test
assertThatThrownBy(super::testAddNotNullColumnToEmptyTable)
.hasMessage("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");
.hasMessage("must specify at least one key column");
abort("TODO");
}

Expand Down Expand Up @@ -750,7 +777,7 @@ public void testWrittenStats()
public void testVarcharCastToDateInPredicate()
{
assertThatThrownBy(super::testVarcharCastToDateInPredicate)
.hasStackTraceContaining("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");
.hasStackTraceContaining("must specify at least one key column");

abort("TODO: implement the test for Kudu");
}
Expand Down

0 comments on commit 61a5a9e

Please sign in to comment.