diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index bec07775f7a..642c256dc8c 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -314,6 +314,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty */ @Override public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels()); + if (!schemaExists(schemaIdent)) { + throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace); + } + try { ListTablesResponse listTablesResponse = icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace)); diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index ffdc62e653b..d7253f5112a 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -30,7 +30,6 @@ dependencies { testImplementation(project(":server")) testImplementation(project(":server-common")) testImplementation(project(":spark-connector")) { - exclude("org.apache.iceberg") exclude("org.apache.hadoop", "hadoop-client-api") exclude("org.apache.hadoop", "hadoop-client-runtime") } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index 731836370fe..04f1c0dbc05 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; import com.google.common.collect.ImmutableMap; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -15,6 +16,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -22,14 +24,17 @@ import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIf; -import org.junit.platform.commons.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class SparkCommonIT extends SparkEnvIT { + private static final Logger LOG = LoggerFactory.getLogger(SparkCommonIT.class); // To generate test data for write&read table. 
protected static final Map typeConstant = @@ -61,14 +66,36 @@ private static String getInsertWithPartitionSql( // Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS] protected abstract boolean supportsSparkSQLClusteredBy(); - // Use a custom database not the original default database because SparkIT couldn't read&write - // data to tables in default database. The main reason is default database location is + protected abstract boolean supportsPartition(); + + // Use a custom database not the original default database because SparkCommonIT couldn't + // read&write data to tables in default database. The main reason is default database location is // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address // not real HDFS address. The location of tables created under default database is like // hdfs://localhost:9000/xxx which couldn't read write data from SparkCommonIT. Will use default // database after spark connector support Alter database xx set location command. @BeforeAll - void initDefaultDatabase() { + void initDefaultDatabase() throws IOException { + // In embedded mode, Derby acts as the backend database for the Hive metastore + // and creates a directory named metastore_db to store its metadata; + // it supports only one connection at a time. + // Previously, only SparkHiveCatalogIT accessed Derby, so no conflict occurred. + // Now SparkIcebergCatalogIT runs in the same process. + // The exception `ERROR XSDB6: Another instance of Derby may have already + // booted the database {GRAVITINO_HOME}/integration-test/metastore_db` occurs when + // SparkIcebergCatalogIT is initialized after SparkHiveCatalogIT has executed, + // because the lock file left in the metastore_db directory is not cleaned up and + // prevents a new connection from being created, + // so the directory is cleaned up here to ensure that a new connection can be created.
+ File hiveLocalMetaStorePath = new File("metastore_db"); + try { + if (hiveLocalMetaStorePath.exists()) { + FileUtils.deleteDirectory(hiveLocalMetaStorePath); + } + } catch (IOException e) { + LOG.error(String.format("Failed to delete directory %s.", hiveLocalMetaStorePath), e); + throw e; + } sql("USE " + getCatalogName()); createDatabaseIfNotExists(getDefaultDatabase()); } @@ -79,6 +106,26 @@ void init() { sql("USE " + getDefaultDatabase()); } + @AfterAll + void cleanUp() { + sql("USE " + getCatalogName()); + getDatabases() + .forEach(database -> sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", database))); + } + + @Test + void testListTables() { + String tableName = "t_list"; + dropTableIfExists(tableName); + Set<String> tableNames = listTableNames(); + Assertions.assertFalse(tableNames.contains(tableName)); + createSimpleTable(tableName); + tableNames = listTableNames(); + Assertions.assertTrue(tableNames.contains(tableName)); + Assertions.assertThrowsExactly( + NoSuchNamespaceException.class, () -> sql("SHOW TABLES IN nonexistent_schema")); + } + @Test void testLoadCatalogs() { Set<String> catalogs = getCatalogs(); @@ -89,20 +136,20 @@ void testLoadCatalogs() { void testCreateAndLoadSchema() { String testDatabaseName = "t_create1"; dropDatabaseIfExists(testDatabaseName); - sql("CREATE DATABASE " + testDatabaseName); + sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName); Assertions.assertFalse(databaseMeta.containsKey("Comment")); Assertions.assertTrue(databaseMeta.containsKey("Location")); Assertions.assertEquals("datastrato", databaseMeta.get("Owner")); String properties = databaseMeta.get("Properties"); - Assertions.assertTrue(StringUtils.isBlank(properties)); + Assertions.assertTrue(properties.contains("(ID,001)")); testDatabaseName = "t_create2"; dropDatabaseIfExists(testDatabaseName); String testDatabaseLocation = "/tmp/" + testDatabaseName; sql( String.format( - "CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=001);", + "CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=002);", testDatabaseName, testDatabaseLocation)); databaseMeta = getDatabaseMetadata(testDatabaseName); String comment = databaseMeta.get("Comment"); @@ -111,19 +158,22 @@ void testCreateAndLoadSchema() { // underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2 Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation)); properties = databaseMeta.get("Properties"); - Assertions.assertEquals("((ID,001))", properties); + Assertions.assertTrue(properties.contains("(ID,002)")); } @Test void testAlterSchema() { String testDatabaseName = "t_alter"; dropDatabaseIfExists(testDatabaseName); - sql("CREATE DATABASE " + testDatabaseName); + sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); Assertions.assertTrue( - StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties"))); + getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)")); - sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='001')", testDatabaseName)); - Assertions.assertEquals("((ID,001))", getDatabaseMetadata(testDatabaseName).get("Properties")); + sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName)); + Assertions.assertFalse( + getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)")); + Assertions.assertTrue( +
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,002)")); // Hive metastore doesn't support alter database location, therefore this test method // doesn't verify ALTER DATABASE database_name SET LOCATION 'new_location'. @@ -334,9 +384,9 @@ void testAlterTableUpdateColumnType() { checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName)); sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName)); - sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 string", tableName)); + sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 bigint", tableName)); ArrayList updateColumns = new ArrayList<>(simpleTableColumns); - updateColumns.add(SparkColumnInfo.of("col1", DataTypes.StringType, null)); + updateColumns.add(SparkColumnInfo.of("col1", DataTypes.LongType, null)); checkTableColumns(tableName, updateColumns, getTableInfo(tableName)); } @@ -354,7 +404,7 @@ void testAlterTableRenameColumn() { sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName)); sql( String.format( - "ALTER TABLE %S RENAME COLUMN %S TO %S", tableName, oldColumnName, newColumnName)); + "ALTER TABLE %s RENAME COLUMN %s TO %s", tableName, oldColumnName, newColumnName)); ArrayList renameColumns = new ArrayList<>(simpleTableColumns); renameColumns.add(SparkColumnInfo.of(newColumnName, DataTypes.IntegerType, null)); checkTableColumns(tableName, renameColumns, getTableInfo(tableName)); @@ -373,7 +423,7 @@ void testUpdateColumnPosition() { sql( String.format( - "CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '') USING PARQUET", + "CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '')", tableName)); checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName)); @@ -456,12 +506,13 @@ void testComplexType() { } @Test + @EnabledIf("supportsPartition") void testCreateDatasourceFormatPartitionTable() { String tableName = "datasource_partition_table"; dropTableIfExists(tableName); String createTableSQL = getCreateSimpleTableString(tableName); - createTableSQL = createTableSQL + "USING PARQUET PARTITIONED BY (name, age)"; + createTableSQL = createTableSQL + " USING PARQUET PARTITIONED BY (name, age)"; sql(createTableSQL); SparkTableInfo tableInfo = getTableInfo(tableName); SparkTableInfoChecker checker = @@ -558,6 +609,7 @@ void testInsertTableAsSelect() { } @Test + @EnabledIf("supportsPartition") void testInsertDatasourceFormatPartitionTableAsSelect() { String tableName = "insert_select_partition_table"; String newTableName = "new_" + tableName; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java index 373fbdca4a6..096402e6121 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.integration.test.container.HiveContainer; import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT; import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin; import com.google.common.collect.Maps; import java.io.IOException; @@ -37,6 +38,7 @@ public abstract class SparkEnvIT extends 
SparkUtilIT { private SparkSession sparkSession; private String hiveMetastoreUri = "thrift://127.0.0.1:9083"; private String gravitinoUri = "http://127.0.0.1:8090"; + private String warehouse; protected abstract String getCatalogName(); @@ -79,8 +81,18 @@ private void initMetalakeAndCatalogs() { client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap()); GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName)); Map properties = Maps.newHashMap(); - properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri); - + switch (getProvider()) { + case "hive": + properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri); + break; + case "lakehouse-iceberg": + properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive"); + properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse); + properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri); + break; + default: + throw new IllegalArgumentException("Unsupported provider: " + getProvider()); + } metalake.createCatalog( NameIdentifier.of(metalakeName, getCatalogName()), Catalog.Type.RELATIONAL, @@ -102,6 +114,11 @@ private void initHiveEnv() { "thrift://%s:%d", containerSuite.getHiveContainer().getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT); + warehouse = + String.format( + "hdfs://%s:%d/user/hive/warehouse", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT); } private void initHdfsFileSystem() { @@ -129,12 +146,7 @@ private void initSparkEnv() { .config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri) .config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName) .config("hive.exec.dynamic.partition.mode", "nonstrict") - .config( - "spark.sql.warehouse.dir", - String.format( - "hdfs://%s:%d/user/hive/warehouse", - containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HDFS_DEFAULTFS_PORT)) + .config("spark.sql.warehouse.dir", warehouse) .enableHiveSupport() .getOrCreate(); } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index bc513eafa79..91bea87a2aa 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -40,6 +40,11 @@ protected boolean supportsSparkSQLClusteredBy() { return true; } + @Override + protected boolean supportsPartition() { + return true; + } + @Test public void testCreateHiveFormatPartitionTable() { String tableName = "hive_partition_table"; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java new file mode 100644 index 00000000000..53cd78db2d1 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.integration.test.spark.iceberg; + +import com.datastrato.gravitino.integration.test.spark.SparkCommonIT; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; + +@Tag("gravitino-docker-it") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SparkIcebergCatalogIT extends SparkCommonIT { + + @Override + protected String getCatalogName() { + return "iceberg"; + } + + @Override + protected String getProvider() { + return "lakehouse-iceberg"; + } + + @Override + protected boolean supportsSparkSQLClusteredBy() { + return false; + } + + @Override + protected boolean supportsPartition() { + return false; + } +} diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoCatalogAdaptorFactory.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoCatalogAdaptorFactory.java index 23a13ceeec2..0599f5cad1b 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoCatalogAdaptorFactory.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoCatalogAdaptorFactory.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector; import com.datastrato.gravitino.spark.connector.hive.HiveAdaptor; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergAdaptor; import java.util.Locale; /** @@ -17,6 +18,8 @@ public static GravitinoCatalogAdaptor createGravitinoAdaptor(String provider) { switch (provider.toLowerCase(Locale.ROOT)) { case "hive": return new HiveAdaptor(); + case "lakehouse-iceberg": + return new IcebergAdaptor(); default: throw new RuntimeException(String.format("Provider:%s is not supported yet", provider)); } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java index c933cb6d2d7..5b610201817 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java @@ -58,6 +58,7 @@ public void close() { Preconditions.checkState(!isClosed, "Gravitino Catalog is already closed"); isClosed = true; gravitinoClient.close(); + gravitinoCatalogManager = null; } public Catalog getGravitinoCatalogInfo(String name) { diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergAdaptor.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergAdaptor.java new file mode 100644 index 00000000000..cf73dfb0427 --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergAdaptor.java @@ -0,0 +1,145 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.spark.connector.iceberg; + +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptor; +import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** IcebergAdaptor provides specific operations for Iceberg Catalog to adapt to GravitinoCatalog. */ +public class IcebergAdaptor implements GravitinoCatalogAdaptor { + + private void initHiveProperties( + String catalogBackend, + Map gravitinoProperties, + HashMap icebergProperties) { + String metastoreUri = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI); + Preconditions.checkArgument( + StringUtils.isNotBlank(metastoreUri), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI + + " from Iceberg Catalog properties"); + String hiveWarehouse = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE); + Preconditions.checkArgument( + StringUtils.isNotBlank(hiveWarehouse), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE + + " from Iceberg Catalog properties"); + icebergProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE, + catalogBackend.toLowerCase(Locale.ENGLISH)); + icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, metastoreUri); + icebergProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse); + } + + private void initJdbcProperties( + String catalogBackend, + Map gravitinoProperties, + HashMap icebergProperties) { + String jdbcUri = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI); + Preconditions.checkArgument( + StringUtils.isNotBlank(jdbcUri), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI + + " from Iceberg Catalog properties"); + String jdbcWarehouse = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE); + Preconditions.checkArgument( + StringUtils.isNotBlank(jdbcWarehouse), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE + + " from Iceberg Catalog properties"); + String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER); + Preconditions.checkArgument( + StringUtils.isNotBlank(jdbcUser), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_JDBC_USER + + " from Iceberg Catalog properties"); + String jdbcPassword = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD); + Preconditions.checkArgument( + StringUtils.isNotBlank(jdbcPassword), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD + + " from Iceberg Catalog properties"); + String jdbcDriver = + gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER); + Preconditions.checkArgument( + StringUtils.isNotBlank(jdbcDriver), + "Couldn't get " + + IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER + + " from Iceberg Catalog properties"); + 
icebergProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE, + catalogBackend.toLowerCase(Locale.ROOT)); + icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, jdbcUri); + icebergProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse); + icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_USER, jdbcUser); + icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_PASSWORD, jdbcPassword); + icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER, jdbcDriver); + } + + @Override + public PropertiesConverter getPropertiesConverter() { + return new IcebergPropertiesConverter(); + } + + @Override + public SparkBaseTable createSparkTable( + Identifier identifier, + Table gravitinoTable, + TableCatalog sparkCatalog, + PropertiesConverter propertiesConverter) { + return new SparkIcebergTable(identifier, gravitinoTable, sparkCatalog, propertiesConverter); + } + + @Override + public TableCatalog createAndInitSparkCatalog( + String name, CaseInsensitiveStringMap options, Map properties) { + Preconditions.checkArgument( + properties != null, "Iceberg Catalog properties should not be null"); + + String catalogBackend = + properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty."); + + HashMap all = new HashMap<>(options); + + switch (catalogBackend.toLowerCase(Locale.ENGLISH)) { + case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE: + initHiveProperties(catalogBackend, properties, all); + break; + case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC: + initJdbcProperties(catalogBackend, properties, all); + break; + default: + // SparkCatalog does not support Memory type catalog + throw new IllegalArgumentException( + "Unsupported Iceberg Catalog backend: " + catalogBackend); + } + + TableCatalog icebergCatalog = new SparkCatalog(); + icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all)); + + return icebergCatalog; + } +} diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java new file mode 100644 index 00000000000..d69964785ab --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.spark.connector.iceberg; + +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; +import com.google.common.annotations.VisibleForTesting; + +public class IcebergPropertiesConstants { + + @VisibleForTesting + public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND = + IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME; + + @VisibleForTesting + public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = + IcebergCatalogPropertiesMetadata.WAREHOUSE; + + @VisibleForTesting + public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergCatalogPropertiesMetadata.URI; + + public static final String GRAVITINO_JDBC_USER = + IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_USER; + public static final String GRAVITINO_ICEBERG_JDBC_USER = + IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_USER; + public static final String GRAVITINO_JDBC_PASSWORD = + IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_PASSWORD; + public static final String GRAVITINO_ICEBERG_JDBC_PASSWORD = + IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD; + public static final String GRAVITINO_ICEBERG_JDBC_DRIVER = + IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_DRIVER; + + public static final String GRAVITINO_ICEBERG_CATALOG_TYPE = "type"; + public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; + public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc"; + + private IcebergPropertiesConstants() {} +} diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java new file mode 100644 index 00000000000..f96107c814d --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.iceberg; + +import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import java.util.HashMap; +import java.util.Map; + +/** Transform Iceberg catalog properties between Spark and Gravitino. */ +public class IcebergPropertiesConverter implements PropertiesConverter { + @Override + public Map toGravitinoTableProperties(Map properties) { + return new HashMap<>(properties); + } + + @Override + public Map toSparkTableProperties(Map properties) { + return new HashMap<>(properties); + } +} diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java new file mode 100644 index 00000000000..aabdf149efa --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.spark.connector.iceberg; + +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; + +public class SparkIcebergTable extends SparkBaseTable { + + public SparkIcebergTable( + Identifier identifier, + Table gravitinoTable, + TableCatalog sparkIcebergCatalog, + PropertiesConverter propertiesConverter) { + super(identifier, gravitinoTable, sparkIcebergCatalog, propertiesConverter); + } +}
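For context, the end-to-end wiring this patch enables looks roughly like the sketch below: an Iceberg catalog is registered in Gravitino with a Hive metastore backend, and a Spark session is pointed at Gravitino so the connector exposes that catalog to Spark SQL. This is a minimal sketch rather than part of the patch; the metalake name ("test"), the endpoint URIs, the `spark.plugins` registration, and the helper class name are illustrative assumptions, while the property and config constants come from the connector code above.

import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.SparkSession;

public class IcebergConnectorUsageSketch {
  public static void main(String[] args) {
    // Catalog properties for a Gravitino "lakehouse-iceberg" catalog backed by a Hive
    // metastore, mirroring initMetalakeAndCatalogs() in SparkEnvIT; the URIs are placeholders.
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive");
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, "thrift://127.0.0.1:9083");
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
        "hdfs://127.0.0.1:9000/user/hive/warehouse");
    // The catalog itself is registered through GravitinoMetalake#createCatalog with the
    // "lakehouse-iceberg" provider, as SparkEnvIT does; that call is omitted here.

    // Spark session wired to Gravitino. Registering the plugin via "spark.plugins" is an
    // assumption; SparkEnvIT only shows the URI, metalake, and warehouse configs.
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .appName("gravitino-iceberg-sketch")
            .config("spark.plugins", GravitinoSparkPlugin.class.getName())
            .config(GravitinoSparkConfig.GRAVITINO_URI, "http://127.0.0.1:8090")
            .config(GravitinoSparkConfig.GRAVITINO_METALAKE, "test")
            .enableHiveSupport()
            .getOrCreate();

    // Once the plugin loads the Gravitino catalog named "iceberg", IcebergAdaptor initializes
    // an org.apache.iceberg.spark.SparkCatalog behind it, so plain Spark SQL works against it.
    spark.sql("USE iceberg");
    spark.sql("CREATE DATABASE IF NOT EXISTS sketch_db");
    spark.sql("SHOW TABLES IN sketch_db").show();

    spark.stop();
  }
}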