diff --git a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java index b7c89865cf0..092738ccd49 100644 --- a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java +++ b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java @@ -11,6 +11,9 @@ import com.datastrato.gravitino.catalog.FilesetDispatcher; import com.datastrato.gravitino.catalog.FilesetNormalizeDispatcher; import com.datastrato.gravitino.catalog.FilesetOperationDispatcher; +import com.datastrato.gravitino.catalog.PartitionDispatcher; +import com.datastrato.gravitino.catalog.PartitionNormalizeDispatcher; +import com.datastrato.gravitino.catalog.PartitionOperationDispatcher; import com.datastrato.gravitino.catalog.SchemaDispatcher; import com.datastrato.gravitino.catalog.SchemaNormalizeDispatcher; import com.datastrato.gravitino.catalog.SchemaOperationDispatcher; @@ -59,6 +62,8 @@ public class GravitinoEnv { private TableDispatcher tableDispatcher; + private PartitionDispatcher partitionDispatcher; + private FilesetDispatcher filesetDispatcher; private TopicDispatcher topicDispatcher; @@ -169,6 +174,11 @@ public void initialize(Config config) { new TableNormalizeDispatcher(tableOperationDispatcher); this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher); + PartitionOperationDispatcher partitionOperationDispatcher = + new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator); + // todo: support PartitionEventDispatcher + this.partitionDispatcher = new PartitionNormalizeDispatcher(partitionOperationDispatcher); + FilesetOperationDispatcher filesetOperationDispatcher = new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator); FilesetNormalizeDispatcher filesetNormalizeDispatcher = @@ -244,6 +254,10 @@ public TableDispatcher tableDispatcher() { return tableDispatcher; } + public PartitionDispatcher partitionDispatcher() { + return partitionDispatcher; + } + /** * Get the FilesetDispatcher associated with the Gravitino environment. * diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CapabilityHelpers.java b/core/src/main/java/com/datastrato/gravitino/catalog/CapabilityHelpers.java index 285fc1a5e05..8d108c8f49d 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CapabilityHelpers.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CapabilityHelpers.java @@ -389,7 +389,7 @@ private static String applyCapabilitiesOnName( return standardizeName; } - private static String applyCaseSensitiveOnName( + public static String applyCaseSensitiveOnName( Capability.Scope scope, String name, Capability capabilities) { return capabilities.caseSensitiveOnName(scope).supported() ? name : name.toLowerCase(); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java index b2527e07657..71c33f3dd85 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java @@ -37,7 +37,9 @@ import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.rel.SupportsPartitions; import com.datastrato.gravitino.rel.SupportsSchemas; +import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.utils.IsolatedClassLoader; @@ -135,6 +137,19 @@ public R doWithTopicOps(ThrowableFunction fn) throws Except }); } + public R doWithPartitionOps( + NameIdentifier tableIdent, ThrowableFunction fn) throws Exception { + return classLoader.withClassLoader( + cl -> { + Preconditions.checkArgument( + asTables() != null, "Catalog does not support table operations"); + Table table = asTables().loadTable(tableIdent); + Preconditions.checkArgument( + table.supportPartitions() != null, "Table does not support partition operations"); + return fn.apply(table.supportPartitions()); + }); + } + public R doWithPropertiesMeta(ThrowableFunction fn) throws Exception { return classLoader.withClassLoader(cl -> fn.apply(catalog.ops())); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java index 276b66dfa80..20bcdc5c809 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java @@ -19,6 +19,7 @@ import com.datastrato.gravitino.file.FilesetChange; import com.datastrato.gravitino.messaging.TopicChange; import com.datastrato.gravitino.rel.SchemaChange; +import com.datastrato.gravitino.rel.SupportsPartitions; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.utils.ThrowableFunction; @@ -60,6 +61,24 @@ public OperationDispatcher( this.idGenerator = idGenerator; } + R doWithTable( + NameIdentifier tableIdent, ThrowableFunction fn, Class ex) + throws E { + try { + NameIdentifier catalogIdent = getCatalogIdentifier(tableIdent); + CatalogManager.CatalogWrapper c = catalogManager.loadCatalogAndWrap(catalogIdent); + return c.doWithPartitionOps(tableIdent, fn); + } catch (Throwable throwable) { + if (ex.isInstance(throwable)) { + throw ex.cast(throwable); + } + if (RuntimeException.class.isAssignableFrom(throwable.getClass())) { + throw (RuntimeException) throwable; + } + throw new RuntimeException(throwable); + } + } + R doWithCatalog( NameIdentifier ident, ThrowableFunction fn, Class ex) throws E { diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/PartitionDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionDispatcher.java new file mode 100644 index 00000000000..13047de9346 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionDispatcher.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.NoSuchPartitionException; +import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException; +import com.datastrato.gravitino.rel.SupportsPartitions; +import com.datastrato.gravitino.rel.partitions.Partition; + +/** + * {@code PartitionDispatcher} interface is a wrapper around the {@link SupportsPartitions} + * interface, adding {@link NameIdentifier} of table to the method parameters for find out the + * catalog class loader. + */ +public interface PartitionDispatcher { + + /** + * List the names of all partitions in the table. + * + * @param tableIdent The identifier of the table. + * @return The names of all partitions in the table. + */ + String[] listPartitionNames(NameIdentifier tableIdent); + + /** + * List all partitions in the table. + * + * @param tableIdent The identifier of the table. + * @return The list of partitions. + */ + Partition[] listPartitions(NameIdentifier tableIdent); + + /** + * Get a partition by name from the table. + * + * @param tableIdent The identifier of the table. + * @param partitionName The name of the partition. + * @return The partition. + * @throws NoSuchPartitionException + */ + Partition getPartition(NameIdentifier tableIdent, String partitionName) + throws NoSuchPartitionException; + + /** + * Check if a partition exists in the table. + * + * @param tableIdent The identifier of the table. + * @param partitionName The name of the partition. + * @return True if the partition exists, false otherwise. + */ + default boolean partitionExists(NameIdentifier tableIdent, String partitionName) { + try { + getPartition(tableIdent, partitionName); + return true; + } catch (NoSuchPartitionException e) { + return false; + } + } + + /** + * Add a partition to the table. + * + * @param tableIdent The identifier of the table. + * @param partition The partition to add. + * @return The added partition. + * @throws PartitionAlreadyExistsException If the partition already exists. + */ + Partition addPartition(NameIdentifier tableIdent, Partition partition) + throws PartitionAlreadyExistsException; + + /** + * Drop a partition from the table by name. + * + * @param tableIdent The identifier of the table. + * @param partitionName The name of the partition. + * @return True if the partition was dropped, false if the partition does not exist. + */ + boolean dropPartition(NameIdentifier tableIdent, String partitionName); + + /** + * Purge a partition from the table by name. + * + * @param tableIdent The identifier of the table. + * @param partitionName The name of the partition. + * @return True if the partition was purged, false if the partition does not exist. + * @throws UnsupportedOperationException If partition purging is not supported. + */ + default boolean purgePartition(NameIdentifier tableIdent, String partitionName) + throws UnsupportedOperationException { + throw new UnsupportedOperationException("Partition purging is not supported"); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/PartitionNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionNormalizeDispatcher.java new file mode 100644 index 00000000000..4d45aa7a0b2 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionNormalizeDispatcher.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import static com.datastrato.gravitino.catalog.CapabilityHelpers.applyCaseSensitiveOnName; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.connector.capability.Capability; +import com.datastrato.gravitino.exceptions.NoSuchPartitionException; +import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException; +import com.datastrato.gravitino.rel.partitions.IdentityPartition; +import com.datastrato.gravitino.rel.partitions.ListPartition; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.rel.partitions.Partitions; +import com.datastrato.gravitino.rel.partitions.RangePartition; +import java.util.Arrays; + +public class PartitionNormalizeDispatcher implements PartitionDispatcher { + + private final PartitionOperationDispatcher dispatcher; + + public PartitionNormalizeDispatcher(PartitionOperationDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + @Override + public String[] listPartitionNames(NameIdentifier tableIdent) { + String[] partitionNames = + dispatcher.listPartitionNames( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher)); + return applyCaseSensitive(tableIdent, partitionNames); + } + + @Override + public Partition[] listPartitions(NameIdentifier tableIdent) { + Partition[] partitions = + dispatcher.listPartitions( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher)); + return applyCaseSensitive(tableIdent, partitions); + } + + @Override + public Partition getPartition(NameIdentifier tableIdent, String partitionName) + throws NoSuchPartitionException { + return dispatcher.getPartition( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher), + applyCaseSensitiveOnName( + Capability.Scope.PARTITION, + partitionName, + dispatcher.getCatalogCapability(tableIdent))); + } + + @Override + public Partition addPartition(NameIdentifier tableIdent, Partition partition) + throws PartitionAlreadyExistsException { + return dispatcher.addPartition( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher), + applyCaseSensitive(tableIdent, partition)); + } + + @Override + public boolean dropPartition(NameIdentifier tableIdent, String partitionName) { + return dispatcher.dropPartition( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher), + applyCaseSensitiveOnName( + Capability.Scope.PARTITION, + partitionName, + dispatcher.getCatalogCapability(tableIdent))); + } + + @Override + public boolean purgePartition(NameIdentifier tableIdent, String partitionName) + throws UnsupportedOperationException { + return dispatcher.purgePartition( + CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher), + applyCaseSensitiveOnName( + Capability.Scope.PARTITION, + partitionName, + dispatcher.getCatalogCapability(tableIdent))); + } + + private String[] applyCaseSensitive(NameIdentifier tableIdent, String[] partitionNames) { + Capability capabilities = dispatcher.getCatalogCapability(tableIdent); + return Arrays.stream(partitionNames) + .map( + partitionName -> + applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName, capabilities)) + .toArray(String[]::new); + } + + private Partition[] applyCaseSensitive(NameIdentifier tableIdent, Partition[] partitions) { + boolean caseSensitive = + dispatcher + .getCatalogCapability(tableIdent) + .caseSensitiveOnName(Capability.Scope.PARTITION) + .supported(); + return Arrays.stream(partitions) + .map(partition -> applyCaseSensitive(partition, caseSensitive)) + .toArray(Partition[]::new); + } + + private Partition applyCaseSensitive(NameIdentifier tableIdent, Partition partition) { + boolean caseSensitive = + dispatcher + .getCatalogCapability(tableIdent) + .caseSensitiveOnName(Capability.Scope.PARTITION) + .supported(); + return applyCaseSensitive(partition, caseSensitive); + } + + private Partition applyCaseSensitive(Partition partition, boolean caseSensitive) { + String newName = caseSensitive ? partition.name() : partition.name().toLowerCase(); + if (partition instanceof IdentityPartition) { + IdentityPartition identityPartition = (IdentityPartition) partition; + return Partitions.identity( + newName, + identityPartition.fieldNames(), + identityPartition.values(), + identityPartition.properties()); + + } else if (partition instanceof ListPartition) { + ListPartition listPartition = (ListPartition) partition; + return Partitions.list(newName, listPartition.lists(), listPartition.properties()); + + } else if (partition instanceof RangePartition) { + RangePartition rangePartition = (RangePartition) partition; + return Partitions.range( + newName, rangePartition.upper(), rangePartition.lower(), rangePartition.properties()); + + } else { + throw new IllegalArgumentException("Unknown partition type: " + partition.getClass()); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/PartitionOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionOperationDispatcher.java new file mode 100644 index 00000000000..720376218b1 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/PartitionOperationDispatcher.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.NoSuchPartitionException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException; +import com.datastrato.gravitino.rel.SupportsPartitions; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.storage.IdGenerator; + +public class PartitionOperationDispatcher extends OperationDispatcher + implements PartitionDispatcher { + + /** + * Creates a new PartitionOperationDispatcher. + * + * @param catalogManager The CatalogManager instance to be used for partition operations. + * @param store The EntityStore instance to be used for partition operations. + * @param idGenerator The IdGenerator instance to be used for partition operations. + */ + public PartitionOperationDispatcher( + CatalogManager catalogManager, EntityStore store, IdGenerator idGenerator) { + super(catalogManager, store, idGenerator); + } + + @Override + public String[] listPartitionNames(NameIdentifier tableIdent) { + return doWithTable( + tableIdent, SupportsPartitions::listPartitionNames, NoSuchTableException.class); + } + + @Override + public Partition[] listPartitions(NameIdentifier tableIdent) { + return doWithTable(tableIdent, SupportsPartitions::listPartitions, NoSuchTableException.class); + } + + @Override + public Partition getPartition(NameIdentifier tableIdent, String partitionName) + throws NoSuchPartitionException { + return doWithTable( + tableIdent, p -> p.getPartition(partitionName), NoSuchPartitionException.class); + } + + @Override + public Partition addPartition(NameIdentifier tableIdent, Partition partition) + throws PartitionAlreadyExistsException { + return doWithTable( + tableIdent, p -> p.addPartition(partition), PartitionAlreadyExistsException.class); + } + + @Override + public boolean dropPartition(NameIdentifier tableIdent, String partitionName) { + return doWithTable( + tableIdent, p -> p.dropPartition(partitionName), NoSuchPartitionException.class); + } + + @Override + public boolean purgePartition(NameIdentifier tableIdent, String partitionName) + throws UnsupportedOperationException { + return doWithTable( + tableIdent, p -> p.purgePartition(partitionName), NoSuchPartitionException.class); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/TestTable.java b/core/src/test/java/com/datastrato/gravitino/TestTable.java index 7b01c050591..af90d2a3a88 100644 --- a/core/src/test/java/com/datastrato/gravitino/TestTable.java +++ b/core/src/test/java/com/datastrato/gravitino/TestTable.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.connector.BaseTable; import com.datastrato.gravitino.connector.TableOperations; +import com.datastrato.gravitino.rel.SupportsPartitions; import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @@ -13,7 +14,12 @@ public class TestTable extends BaseTable { @Override protected TableOperations newOps() { - throw new UnsupportedOperationException("TestTable does not support TableOperations."); + return new TestTableOperations(); + } + + @Override + public SupportsPartitions supportPartitions() throws UnsupportedOperationException { + return (SupportsPartitions) ops(); } public static class Builder extends BaseTable.BaseTableBuilder { @@ -33,6 +39,7 @@ protected TestTable internalBuild() { table.sortOrders = sortOrders; table.partitioning = partitioning; table.indexes = indexes; + table.proxyPlugin = proxyPlugin; return table; } } diff --git a/core/src/test/java/com/datastrato/gravitino/TestTableOperations.java b/core/src/test/java/com/datastrato/gravitino/TestTableOperations.java new file mode 100644 index 00000000000..320f171f32b --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/TestTableOperations.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino; + +import com.datastrato.gravitino.connector.TableOperations; +import com.datastrato.gravitino.exceptions.NoSuchPartitionException; +import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException; +import com.datastrato.gravitino.rel.SupportsPartitions; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; + +public class TestTableOperations implements TableOperations, SupportsPartitions { + + private static final Map partitions = Maps.newHashMap(); + + @Override + public String[] listPartitionNames() { + return partitions.keySet().toArray(new String[0]); + } + + @Override + public Partition[] listPartitions() { + return partitions.values().toArray(new Partition[0]); + } + + @Override + public Partition getPartition(String partitionName) throws NoSuchPartitionException { + if (!partitions.containsKey(partitionName)) { + throw new NoSuchPartitionException("Partition not found: %s", partitionName); + } + return partitions.get(partitionName); + } + + @Override + public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException { + if (partitions.containsKey(partition.name())) { + throw new PartitionAlreadyExistsException("Partition already exists: %s", partition.name()); + } + partitions.put(partition.name(), partition); + return partition; + } + + @Override + public boolean dropPartition(String partitionName) { + if (!partitions.containsKey(partitionName)) { + return false; + } + partitions.remove(partitionName); + return true; + } + + @Override + public void close() throws IOException { + partitions.clear(); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionNormalizeDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionNormalizeDispatcher.java new file mode 100644 index 00000000000..746afb2d2cd --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionNormalizeDispatcher.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.expressions.literals.Literal; +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.rel.partitions.Partitions; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher { + + private static PartitionNormalizeDispatcher partitionNormalizeDispatcher; + private static final String SCHEMA = "test_partition_normalize_schema"; + private static final NameIdentifier TABLE = + NameIdentifier.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE"); + + @BeforeAll + public static void initialize() { + TestPartitionOperationDispatcher.prepareTable(); + partitionNormalizeDispatcher = + new PartitionNormalizeDispatcher( + TestPartitionOperationDispatcher.partitionOperationDispatcher); + NameIdentifier schemaIdent = NameIdentifier.ofSchema(metalake, catalog, SCHEMA); + TestPartitionOperationDispatcher.schemaOperationDispatcher.createSchema( + schemaIdent, "comment", null); + NameIdentifier tableIdent = + NameIdentifier.ofTable(metalake, catalog, SCHEMA, "test_partition_normalize_table"); + TestPartitionOperationDispatcher.tableOperationDispatcher.createTable( + tableIdent, + new Column[] { + Column.of("col1", Types.StringType.get()), Column.of("col2", Types.StringType.get()) + }, + "comment", + null); + } + + @Test + public void testNameCaseInsensitive() { + Partition partition = + Partitions.identity( + "pNAME", + new String[][] {{"col1"}}, + new Literal[] {Literals.stringLiteral("v1")}, + Maps.newHashMap()); + // test case-insensitive in adding + Partition addedPartition = partitionNormalizeDispatcher.addPartition(TABLE, partition); + Assertions.assertEquals(partition.name().toLowerCase(), addedPartition.name()); + + // test case-insensitive in getting + Partition gotPartition = partitionNormalizeDispatcher.getPartition(TABLE, partition.name()); + Assertions.assertEquals(partition.name().toLowerCase(), gotPartition.name()); + + // test case-insensitive in listing names + String[] partitionNames = partitionNormalizeDispatcher.listPartitionNames(TABLE); + Assertions.assertEquals(partition.name().toLowerCase(), partitionNames[0]); + + // test case-insensitive in listing partitions + Partition[] listedPartitions = partitionNormalizeDispatcher.listPartitions(TABLE); + Assertions.assertEquals(partition.name().toLowerCase(), listedPartitions[0].name()); + + // test case-insensitive in existence check + Assertions.assertTrue(partitionNormalizeDispatcher.partitionExists(TABLE, partition.name())); + + // test case-insensitive in dropping + Assertions.assertTrue( + partitionNormalizeDispatcher.dropPartition(TABLE, partition.name().toUpperCase())); + Assertions.assertFalse(partitionNormalizeDispatcher.partitionExists(TABLE, partition.name())); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionOperationDispatcher.java new file mode 100644 index 00000000000..926eb15e7ce --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestPartitionOperationDispatcher.java @@ -0,0 +1,135 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.expressions.literals.Literal; +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.rel.partitions.Partitions; +import com.datastrato.gravitino.rel.types.Types; +import com.datastrato.gravitino.utils.IsolatedClassLoader; +import com.google.common.collect.Maps; +import java.util.Arrays; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TestPartitionOperationDispatcher extends TestOperationDispatcher { + static SchemaOperationDispatcher schemaOperationDispatcher; + static TableOperationDispatcher tableOperationDispatcher; + static PartitionOperationDispatcher partitionOperationDispatcher; + + private static final String SCHEMA = "test_partition_schema"; + private static final String TABLE = "test_partition_table"; + private static final NameIdentifier TABLE_IDENT = + NameIdentifier.ofTable(metalake, catalog, SCHEMA, TABLE); + private static final Partition PARTITION = + Partitions.identity( + "p1", + new String[][] {{"col1"}}, + new Literal[] {Literals.stringLiteral("v1")}, + Maps.newHashMap()); + + @BeforeAll + public static void initialize() { + prepareTable(); + partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION); + + // Assert that the custom class loader is used + ClassLoader classLoader = + partitionOperationDispatcher.doWithTable( + TABLE_IDENT, + s -> Thread.currentThread().getContextClassLoader(), + RuntimeException.class); + Assertions.assertInstanceOf( + IsolatedClassLoader.CUSTOM_CLASS_LOADER_CLASS, + classLoader, + "Custom class loader is not used"); + } + + protected static void prepareTable() { + schemaOperationDispatcher = + new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); + tableOperationDispatcher = + new TableOperationDispatcher(catalogManager, entityStore, idGenerator); + partitionOperationDispatcher = + new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator); + + NameIdentifier schemaIdent = NameIdentifier.ofSchema(metalake, catalog, SCHEMA); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + Column[] columns = + new Column[] { + Column.of("col1", Types.StringType.get()), Column.of("col2", Types.StringType.get()) + }; + tableOperationDispatcher.createTable(TABLE_IDENT, columns, "comment", null); + } + + @Test + public void testListPartitionNames() { + String[] partitionNames = partitionOperationDispatcher.listPartitionNames(TABLE_IDENT); + Assertions.assertTrue(Arrays.asList(partitionNames).contains(PARTITION.name())); + } + + @Test + public void testListPartitions() { + Partition[] partitions = partitionOperationDispatcher.listPartitions(TABLE_IDENT); + Assertions.assertTrue(Arrays.asList(partitions).contains(PARTITION)); + } + + @Test + public void testGetPartition() { + Partition p = partitionOperationDispatcher.getPartition(TABLE_IDENT, PARTITION.name()); + Assertions.assertEquals(PARTITION, p); + } + + @Test + public void testPartitionExists() { + Assertions.assertTrue( + partitionOperationDispatcher.partitionExists(TABLE_IDENT, PARTITION.name())); + } + + @Test + public void testAddPartition() { + Partition newPartition = + Partitions.identity( + "p2", + new String[][] {{"col1"}}, + new Literal[] {Literals.stringLiteral("v2")}, + Maps.newHashMap()); + partitionOperationDispatcher.addPartition(TABLE_IDENT, newPartition); + Assertions.assertTrue( + partitionOperationDispatcher.partitionExists(TABLE_IDENT, newPartition.name())); + } + + @Test + public void testDropPartition() { + Partition testDrop = + Partitions.identity( + "p3", + new String[][] {{"col1"}}, + new Literal[] {Literals.stringLiteral("v2")}, + Maps.newHashMap()); + partitionOperationDispatcher.addPartition(TABLE_IDENT, testDrop); + Assertions.assertTrue( + partitionOperationDispatcher.partitionExists(TABLE_IDENT, testDrop.name())); + + Assertions.assertTrue( + partitionOperationDispatcher.partitionExists(TABLE_IDENT, testDrop.name())); + + boolean dropped = partitionOperationDispatcher.dropPartition(TABLE_IDENT, testDrop.name()); + Assertions.assertTrue(dropped); + Assertions.assertFalse( + partitionOperationDispatcher.partitionExists(TABLE_IDENT, testDrop.name())); + } + + @Test + public void testPurgePartition() { + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> partitionOperationDispatcher.purgePartition(TABLE_IDENT, PARTITION.name())); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicNormalizeDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicNormalizeDispatcher.java index ec81f9ff036..fa75929443e 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicNormalizeDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicNormalizeDispatcher.java @@ -45,7 +45,7 @@ public void testNameCaseInsensitive() { // test case-insensitive in listing NameIdentifier[] idents = topicNormalizeDispatcher.listTopics(topicNs); - Assertions.assertEquals(1, idents.length); + Assertions.assertEquals(topicIdent.name().toLowerCase(), idents[0].name()); // test case-insensitive in altering Topic alteredTopic = diff --git a/server/src/main/java/com/datastrato/gravitino/server/GravitinoServer.java b/server/src/main/java/com/datastrato/gravitino/server/GravitinoServer.java index d54f3c8fdf3..ec2aef1a265 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/com/datastrato/gravitino/server/GravitinoServer.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.catalog.CatalogDispatcher; import com.datastrato.gravitino.catalog.FilesetDispatcher; +import com.datastrato.gravitino.catalog.PartitionDispatcher; import com.datastrato.gravitino.catalog.SchemaDispatcher; import com.datastrato.gravitino.catalog.TableDispatcher; import com.datastrato.gravitino.catalog.TopicDispatcher; @@ -82,6 +83,7 @@ protected void configure() { bind(gravitinoEnv.schemaDispatcher()).to(SchemaDispatcher.class).ranked(1); bind(gravitinoEnv.tableDispatcher()).to(TableDispatcher.class).ranked(1); + bind(gravitinoEnv.partitionDispatcher()).to(PartitionDispatcher.class).ranked(1); bind(gravitinoEnv.filesetDispatcher()).to(FilesetDispatcher.class).ranked(1); bind(gravitinoEnv.topicDispatcher()).to(TopicDispatcher.class).ranked(1); } diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/PartitionOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/PartitionOperations.java index 0b8e17ad6fc..3d30d6c3b36 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/PartitionOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/PartitionOperations.java @@ -10,7 +10,7 @@ import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import com.datastrato.gravitino.NameIdentifier; -import com.datastrato.gravitino.catalog.TableDispatcher; +import com.datastrato.gravitino.catalog.PartitionDispatcher; import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO; import com.datastrato.gravitino.dto.requests.AddPartitionsRequest; import com.datastrato.gravitino.dto.responses.DropResponse; @@ -21,7 +21,6 @@ import com.datastrato.gravitino.lock.LockType; import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.metrics.MetricNames; -import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.rel.partitions.Partition; import com.datastrato.gravitino.server.web.Utils; import com.google.common.base.Preconditions; @@ -44,11 +43,11 @@ public class PartitionOperations { private static final Logger LOG = LoggerFactory.getLogger(PartitionOperations.class); - private final TableDispatcher dispatcher; + private final PartitionDispatcher dispatcher; @Context private HttpServletRequest httpRequest; @Inject - public PartitionOperations(TableDispatcher dispatcher) { + public PartitionOperations(PartitionDispatcher dispatcher) { this.dispatcher = dispatcher; } @@ -71,12 +70,11 @@ public Response listPartitionNames( tableIdent, LockType.READ, () -> { - Table loadTable = dispatcher.loadTable(tableIdent); if (verbose) { - Partition[] partitions = loadTable.supportPartitions().listPartitions(); + Partition[] partitions = dispatcher.listPartitions(tableIdent); return Utils.ok(new PartitionListResponse(toDTOs(partitions))); } else { - String[] partitionNames = loadTable.supportPartitions().listPartitionNames(); + String[] partitionNames = dispatcher.listPartitionNames(tableIdent); return Utils.ok(new PartitionNameListResponse((partitionNames))); } }); @@ -106,8 +104,7 @@ public Response getPartition( tableIdent, LockType.READ, () -> { - Table loadTable = dispatcher.loadTable(tableIdent); - Partition p = loadTable.supportPartitions().getPartition(partition); + Partition p = dispatcher.getPartition(tableIdent, partition); return Utils.ok(new PartitionResponse(DTOConverters.toDTO(p))); }); }); @@ -138,11 +135,8 @@ public Response addPartitions( tableIdent, LockType.WRITE, () -> { - Table loadTable = dispatcher.loadTable(tableIdent); Partition p = - loadTable - .supportPartitions() - .addPartition(fromDTO(request.getPartitions()[0])); + dispatcher.addPartition(tableIdent, fromDTO(request.getPartitions()[0])); return Utils.ok( new PartitionListResponse(new PartitionDTO[] {DTOConverters.toDTO(p)})); }); @@ -173,11 +167,10 @@ public Response dropPartition( tableIdent, LockType.WRITE, () -> { - Table loadTable = dispatcher.loadTable(tableIdent); boolean dropped = purge - ? loadTable.supportPartitions().purgePartition(partition) - : loadTable.supportPartitions().dropPartition(partition); + ? dispatcher.purgePartition(tableIdent, partition) + : dispatcher.dropPartition(tableIdent, partition); if (!dropped) { LOG.warn( "Failed to drop partition {} under table {} under schema {}", diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestPartitionOperations.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestPartitionOperations.java index a6f7b178b70..9de31fb91f5 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestPartitionOperations.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestPartitionOperations.java @@ -7,8 +7,6 @@ import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; -import static com.datastrato.gravitino.server.web.rest.TestTableOperations.mockColumn; -import static com.datastrato.gravitino.server.web.rest.TestTableOperations.mockTable; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -16,8 +14,8 @@ import com.datastrato.gravitino.Config; import com.datastrato.gravitino.GravitinoEnv; -import com.datastrato.gravitino.catalog.TableDispatcher; -import com.datastrato.gravitino.catalog.TableOperationDispatcher; +import com.datastrato.gravitino.catalog.PartitionDispatcher; +import com.datastrato.gravitino.catalog.PartitionOperationDispatcher; import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO; import com.datastrato.gravitino.dto.requests.AddPartitionsRequest; import com.datastrato.gravitino.dto.responses.DropResponse; @@ -30,21 +28,13 @@ import com.datastrato.gravitino.exceptions.NoSuchPartitionException; import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException; import com.datastrato.gravitino.lock.LockManager; -import com.datastrato.gravitino.rel.Column; -import com.datastrato.gravitino.rel.SupportsPartitions; -import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.rel.expressions.literals.Literal; import com.datastrato.gravitino.rel.expressions.literals.Literals; -import com.datastrato.gravitino.rel.expressions.transforms.Transform; -import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.rel.partitions.Partition; import com.datastrato.gravitino.rel.partitions.Partitions; -import com.datastrato.gravitino.rel.types.Types; import com.datastrato.gravitino.rest.RESTUtils; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.io.IOException; -import java.util.Map; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.client.Entity; import javax.ws.rs.core.Application; @@ -76,8 +66,7 @@ public class TestPartitionOperations extends JerseyTest { new String[][] {colName}, new Literal[] {Literals.stringLiteral("v2")}, Maps.newHashMap()); - private static final Map partitions = - ImmutableMap.of(partitionNames[0], partition1, partitionNames[1], partition2); + private static final Partition[] partitions = new Partition[] {partition1, partition2}; private static class MockServletRequestFactory extends ServletRequestFactoryBase { @Override @@ -88,7 +77,7 @@ public HttpServletRequest get() { } } - private TableOperationDispatcher dispatcher = mock(TableOperationDispatcher.class); + private PartitionOperationDispatcher dispatcher = mock(PartitionOperationDispatcher.class); private final String metalake = "metalake1"; private final String catalog = "catalog1"; private final String schema = "schema1"; @@ -118,7 +107,7 @@ protected Application configure() { new AbstractBinder() { @Override protected void configure() { - bind(dispatcher).to(TableDispatcher.class).ranked(2); + bind(dispatcher).to(PartitionDispatcher.class).ranked(2); bindFactory(MockServletRequestFactory.class).to(HttpServletRequest.class); } }); @@ -132,70 +121,9 @@ private String partitionPath(String metalake, String catalog, String schema, Str metalake, catalog, schema, table); } - private Table mockPartitionedTable() { - Column[] columns = - new Column[] { - mockColumn("col1", Types.StringType.get()), mockColumn("col2", Types.ByteType.get()) - }; - String comment = "mock comment"; - Map properties = ImmutableMap.of("k1", "v1"); - Transform[] transforms = new Transform[] {Transforms.identity("col1")}; - return mockPartitionedTable(table, columns, comment, properties, transforms, partitionNames); - } - - @SuppressWarnings("FormatStringAnnotation") - private Table mockPartitionedTable( - String tableName, - Column[] columns, - String comment, - Map properties, - Transform[] transforms, - String[] partitionNames) { - Table mockedTable = mockTable(tableName, columns, comment, properties, transforms); - when(mockedTable.supportPartitions()) - .thenReturn( - new SupportsPartitions() { - @Override - public String[] listPartitionNames() { - return partitionNames; - } - - @Override - public Partition[] listPartitions() { - return partitions.values().toArray(new Partition[0]); - } - - @Override - public Partition getPartition(String partitionName) throws NoSuchPartitionException { - Partition partition = partitions.get(partitionName); - if (partition == null) { - throw new NoSuchPartitionException(partitionName); - } - return partition; - } - - @Override - public Partition addPartition(Partition partition) - throws PartitionAlreadyExistsException { - if (partitions.containsKey(partition.name())) { - throw new PartitionAlreadyExistsException(partition.name()); - } else { - return partition; - } - } - - @Override - public boolean dropPartition(String partitionName) { - return partitions.containsKey(partitionName); - } - }); - when(dispatcher.loadTable(any())).thenReturn(mockedTable); - return mockedTable; - } - @Test public void testListPartitionNames() { - Table mockedTable = mockPartitionedTable(); + when(dispatcher.listPartitionNames(any())).thenReturn(partitionNames); Response resp = target(partitionPath(metalake, catalog, schema, table)) @@ -215,7 +143,7 @@ public void testListPartitionNames() { Assertions.assertEquals(partitionNames[1], names[1]); // Test throws exception - doThrow(new RuntimeException("test exception")).when(mockedTable).supportPartitions(); + doThrow(new RuntimeException("test exception")).when(dispatcher).listPartitionNames(any()); Response resp2 = target(partitionPath(metalake, catalog, schema, table)) .request(MediaType.APPLICATION_JSON_TYPE) @@ -233,7 +161,7 @@ public void testListPartitionNames() { @Test public void testListPartitions() { - Table mockedTable = mockPartitionedTable(); + when(dispatcher.listPartitions(any())).thenReturn(partitions); Response resp = target(partitionPath(metalake, catalog, schema, table)) @@ -254,7 +182,7 @@ public void testListPartitions() { Assertions.assertEquals(DTOConverters.toDTO(partition2), partitions[1]); // Test throws exception - doThrow(new RuntimeException("test exception")).when(mockedTable).supportPartitions(); + doThrow(new RuntimeException("test exception")).when(dispatcher).listPartitions(any()); Response resp2 = target(partitionPath(metalake, catalog, schema, table)) .queryParam("details", "true") @@ -273,7 +201,7 @@ public void testListPartitions() { @Test public void testGetPartition() { - mockPartitionedTable(); + when(dispatcher.getPartition(any(), any())).thenReturn(partition1); Response resp = target(partitionPath(metalake, catalog, schema, table) + partitionNames[0]) @@ -291,6 +219,8 @@ public void testGetPartition() { Assertions.assertEquals(DTOConverters.toDTO(partition1), partition); // Test throws exception + doThrow(new NoSuchPartitionException("p3")).when(dispatcher).getPartition(any(), any()); + Response resp2 = target(partitionPath(metalake, catalog, schema, table) + "p3") .request(MediaType.APPLICATION_JSON_TYPE) @@ -306,17 +236,10 @@ public void testGetPartition() { @Test public void testAddPartition() { - mockPartitionedTable(); - - Partition newPartition = - Partitions.identity( - "p3", - new String[][] {colName}, - new Literal[] {Literals.stringLiteral("v3")}, - Maps.newHashMap()); + when(dispatcher.addPartition(any(), any())).thenReturn(partition1); AddPartitionsRequest req = - new AddPartitionsRequest(new PartitionDTO[] {DTOConverters.toDTO(newPartition)}); + new AddPartitionsRequest(new PartitionDTO[] {DTOConverters.toDTO(partition1)}); Response resp = target(partitionPath(metalake, catalog, schema, table)) .request(MediaType.APPLICATION_JSON_TYPE) @@ -331,9 +254,13 @@ public void testAddPartition() { Partition[] partition = partitionResp.getPartitions(); Assertions.assertEquals(1, partition.length); - Assertions.assertEquals(DTOConverters.toDTO(newPartition), partition[0]); + Assertions.assertEquals(DTOConverters.toDTO(partition1), partition[0]); // Test throws exception + doThrow(new PartitionAlreadyExistsException("mock error")) + .when(dispatcher) + .addPartition(any(), any()); + req = new AddPartitionsRequest(new PartitionDTO[] {DTOConverters.toDTO(partition1)}); Response resp2 = target(partitionPath(metalake, catalog, schema, table)) @@ -347,14 +274,12 @@ public void testAddPartition() { Assertions.assertEquals(ErrorConstants.ALREADY_EXISTS_CODE, errorResp2.getCode()); Assertions.assertEquals( PartitionAlreadyExistsException.class.getSimpleName(), errorResp2.getType()); - Assertions.assertTrue(errorResp2.getMessage().contains(partition1.name())); + Assertions.assertTrue(errorResp2.getMessage().contains("mock error")); } @Test public void testDropPartition() { - mockPartitionedTable(); - - // drop exist partition with ifExists=ture + when(dispatcher.dropPartition(any(), any())).thenReturn(true); Response resp = target(partitionPath(metalake, catalog, schema, table) + "p1") .queryParam("purge", "false") @@ -370,6 +295,7 @@ public void testDropPartition() { Assertions.assertTrue(dropResponse.dropped()); // Test drop no-exist partition and return false + when(dispatcher.dropPartition(any(), any())).thenReturn(false); Response resp1 = target(partitionPath(metalake, catalog, schema, table) + "p5") .queryParam("purge", "false") @@ -380,6 +306,7 @@ public void testDropPartition() { Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp1.getStatus()); Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType()); + doThrow(new RuntimeException("test exception")).when(dispatcher).dropPartition(any(), any()); DropResponse noExistDropResponse = resp1.readEntity(DropResponse.class); Assertions.assertEquals(0, noExistDropResponse.getCode()); Assertions.assertFalse(noExistDropResponse.dropped());