From 7ae7638075b967373fa3ae2ed9512e655e275170 Mon Sep 17 00:00:00 2001 From: caican00 Date: Thu, 7 Mar 2024 16:21:57 +0800 Subject: [PATCH 1/3] [#2448] feat(spark-connector): support table setProperty and removeProperty ops for spark-connector --- .../integration/test/spark/SparkIT.java | 29 ++++++++++++++ .../connector/catalog/GravitinoCatalog.java | 40 ++++++++++++++++++- .../connector/TestTransformTableChange.java | 39 ++++++++++++++++++ 3 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java index ee6c795a966..df24bd5953d 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java @@ -260,6 +260,35 @@ void testListTable() { Assertions.assertThrows(NoSuchNamespaceException.class, () -> listTableNames("not_exists_db")); } + @Test + public void testAlterTableSetProperty() { + String tableName = "testSetProperty"; + dropTableIfExists(tableName); + + createSimpleTable(tableName); + SparkTableInfo oldProperties = getTableInfo(tableName); + Assertions.assertFalse(oldProperties.getTableProperties().containsKey("key1")); + + sql(String.format("alter table %s SET TBLPROPERTIES('key1'='value1')", tableName)); + SparkTableInfo newProperties = getTableInfo(tableName); + Assertions.assertEquals(newProperties.getTableProperties().get("key1"), "value1"); + } + + @Test + public void testAlterTableRemoveProperty() { + String tableName = "testRemoveProperty"; + dropTableIfExists(tableName); + + createSimpleTable(tableName); + sql(String.format("alter table %s SET TBLPROPERTIES('key1'='value1')", tableName)); + SparkTableInfo oldProperties = getTableInfo(tableName); + Assertions.assertTrue(oldProperties.getTableProperties().containsKey("key1")); + + sql(String.format("alter table %s UNSET TBLPROPERTIES('key1')", tableName)); + SparkTableInfo newProperties = getTableInfo(tableName); + Assertions.assertFalse(newProperties.getTableProperties().containsKey("key1")); + } + private void checkTableReadWrite(SparkTableInfo table) { String name = table.getTableIdentifier(); String insertValues = diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java index 3bb1fa8c876..25b1d3629a6 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java @@ -18,6 +18,7 @@ import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptorFactory; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTypeConverter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Arrays; import java.util.HashMap; @@ -164,7 +165,21 @@ public Table createTable( @Override public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException { - throw new NotSupportedException("Doesn't support altering table for now"); + com.datastrato.gravitino.rel.TableChange[] gravitinoTableChanges = + Arrays.stream(changes) + .map(GravitinoCatalog::transformTableChange) + .toArray(com.datastrato.gravitino.rel.TableChange[]::new); + try { + com.datastrato.gravitino.rel.Table table = + gravitinoCatalogClient + .asTableCatalog() + .alterTable( + NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()), + gravitinoTableChanges); + return gravitinoAdaptor.createSparkTable(ident, table, sparkCatalog, propertiesConverter); + } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { + throw new NoSuchTableException(ident); + } } @Override @@ -336,4 +351,27 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) { "Only support 3 level namespace," + gravitinoIdentifier.namespace()); return gravitinoIdentifier.namespace().level(2); } + + @VisibleForTesting + public static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) { + if (change instanceof TableChange.SetProperty) { + return transformSetProperty((TableChange.SetProperty) change); + } else if (change instanceof TableChange.RemoveProperty) { + return transformRemoveProperty((TableChange.RemoveProperty) change); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported table change %s", change.getClass().getName())); + } + } + + private static com.datastrato.gravitino.rel.TableChange transformSetProperty( + TableChange.SetProperty sparkChange) { + return com.datastrato.gravitino.rel.TableChange.setProperty( + sparkChange.property(), sparkChange.value()); + } + + private static com.datastrato.gravitino.rel.TableChange transformRemoveProperty( + TableChange.RemoveProperty sparkChange) { + return com.datastrato.gravitino.rel.TableChange.removeProperty(sparkChange.property()); + } } diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java new file mode 100644 index 00000000000..77a52dcd6a6 --- /dev/null +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector; + +import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestTransformTableChange { + + @Test + public void testTransformSetProperty() { + TableChange sparkSetProperty = TableChange.setProperty("key", "value"); + com.datastrato.gravitino.rel.TableChange tableChange = + GravitinoCatalog.transformTableChange(sparkSetProperty); + Assertions.assertTrue( + tableChange instanceof com.datastrato.gravitino.rel.TableChange.SetProperty); + com.datastrato.gravitino.rel.TableChange.SetProperty gravitinoSetProperty = + (com.datastrato.gravitino.rel.TableChange.SetProperty) tableChange; + Assertions.assertEquals("key", gravitinoSetProperty.getProperty()); + Assertions.assertEquals("value", gravitinoSetProperty.getValue()); + } + + @Test + public void testTransformRemoveProperty() { + TableChange sparkRemoveProperty = TableChange.removeProperty("key"); + com.datastrato.gravitino.rel.TableChange tableChange = + GravitinoCatalog.transformTableChange(sparkRemoveProperty); + Assertions.assertTrue( + tableChange instanceof com.datastrato.gravitino.rel.TableChange.RemoveProperty); + com.datastrato.gravitino.rel.TableChange.RemoveProperty gravitinoRemoveProperty = + (com.datastrato.gravitino.rel.TableChange.RemoveProperty) tableChange; + Assertions.assertEquals("key", gravitinoRemoveProperty.getProperty()); + } +} From 22c0f2e672737ff11f9d563a10e46cdf8bde6f12 Mon Sep 17 00:00:00 2001 From: caican00 Date: Thu, 7 Mar 2024 18:59:33 +0800 Subject: [PATCH 2/3] fix --- .../integration/test/spark/SparkIT.java | 35 +++++++------------ .../connector/catalog/GravitinoCatalog.java | 2 +- .../TestTransformTableChange.java | 7 ++-- 3 files changed, 16 insertions(+), 28 deletions(-) rename spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/{ => catalog}/TestTransformTableChange.java (87%) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java index df24bd5953d..918c5b0740b 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java @@ -261,32 +261,21 @@ void testListTable() { } @Test - public void testAlterTableSetProperty() { - String tableName = "testSetProperty"; + void testAlterTableSetAndRemoveProperty() { + String tableName = "test_property"; dropTableIfExists(tableName); createSimpleTable(tableName); - SparkTableInfo oldProperties = getTableInfo(tableName); - Assertions.assertFalse(oldProperties.getTableProperties().containsKey("key1")); - - sql(String.format("alter table %s SET TBLPROPERTIES('key1'='value1')", tableName)); - SparkTableInfo newProperties = getTableInfo(tableName); - Assertions.assertEquals(newProperties.getTableProperties().get("key1"), "value1"); - } - - @Test - public void testAlterTableRemoveProperty() { - String tableName = "testRemoveProperty"; - dropTableIfExists(tableName); - - createSimpleTable(tableName); - sql(String.format("alter table %s SET TBLPROPERTIES('key1'='value1')", tableName)); - SparkTableInfo oldProperties = getTableInfo(tableName); - Assertions.assertTrue(oldProperties.getTableProperties().containsKey("key1")); - - sql(String.format("alter table %s UNSET TBLPROPERTIES('key1')", tableName)); - SparkTableInfo newProperties = getTableInfo(tableName); - Assertions.assertFalse(newProperties.getTableProperties().containsKey("key1")); + sql( + String.format( + "ALTER TABLE %s SET TBLPROPERTIES('key1'='value1', 'key2'='value2')", tableName)); + Map oldProperties = getTableInfo(tableName).getTableProperties(); + Assertions.assertTrue(oldProperties.containsKey("key1") && oldProperties.containsKey("key2")); + + sql(String.format("ALTER TABLE %s UNSET TBLPROPERTIES('key1')", tableName)); + Map newProperties = getTableInfo(tableName).getTableProperties(); + Assertions.assertFalse(newProperties.containsKey("key1")); + Assertions.assertTrue(newProperties.containsKey("key2")); } private void checkTableReadWrite(SparkTableInfo table) { diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java index 25b1d3629a6..d247e57cbe0 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java @@ -353,7 +353,7 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) { } @VisibleForTesting - public static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) { + static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) { if (change instanceof TableChange.SetProperty) { return transformSetProperty((TableChange.SetProperty) change); } else if (change instanceof TableChange.RemoveProperty) { diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java similarity index 87% rename from spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java rename to spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java index 77a52dcd6a6..3b35d0ee459 100644 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestTransformTableChange.java +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java @@ -3,9 +3,8 @@ * This software is licensed under the Apache License version 2. */ -package com.datastrato.gravitino.spark.connector; +package com.datastrato.gravitino.spark.connector.catalog; -import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalog; import org.apache.spark.sql.connector.catalog.TableChange; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -13,7 +12,7 @@ public class TestTransformTableChange { @Test - public void testTransformSetProperty() { + void testTransformSetProperty() { TableChange sparkSetProperty = TableChange.setProperty("key", "value"); com.datastrato.gravitino.rel.TableChange tableChange = GravitinoCatalog.transformTableChange(sparkSetProperty); @@ -26,7 +25,7 @@ public void testTransformSetProperty() { } @Test - public void testTransformRemoveProperty() { + void testTransformRemoveProperty() { TableChange sparkRemoveProperty = TableChange.removeProperty("key"); com.datastrato.gravitino.rel.TableChange tableChange = GravitinoCatalog.transformTableChange(sparkRemoveProperty); From 188bcddb697a068d72e278f856089ccdf3ef9180 Mon Sep 17 00:00:00 2001 From: caican00 Date: Thu, 7 Mar 2024 19:21:50 +0800 Subject: [PATCH 3/3] fix --- .../connector/catalog/GravitinoCatalog.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java index d247e57cbe0..71c4f600969 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java @@ -355,23 +355,15 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) { @VisibleForTesting static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) { if (change instanceof TableChange.SetProperty) { - return transformSetProperty((TableChange.SetProperty) change); + TableChange.SetProperty setProperty = (TableChange.SetProperty) change; + return com.datastrato.gravitino.rel.TableChange.setProperty( + setProperty.property(), setProperty.value()); } else if (change instanceof TableChange.RemoveProperty) { - return transformRemoveProperty((TableChange.RemoveProperty) change); + TableChange.RemoveProperty removeProperty = (TableChange.RemoveProperty) change; + return com.datastrato.gravitino.rel.TableChange.removeProperty(removeProperty.property()); } else { throw new UnsupportedOperationException( String.format("Unsupported table change %s", change.getClass().getName())); } } - - private static com.datastrato.gravitino.rel.TableChange transformSetProperty( - TableChange.SetProperty sparkChange) { - return com.datastrato.gravitino.rel.TableChange.setProperty( - sparkChange.property(), sparkChange.value()); - } - - private static com.datastrato.gravitino.rel.TableChange transformRemoveProperty( - TableChange.RemoveProperty sparkChange) { - return com.datastrato.gravitino.rel.TableChange.removeProperty(sparkChange.property()); - } }