From 64e16f5f0393493fccf5212764100fc252a5b901 Mon Sep 17 00:00:00 2001 From: FANNG Date: Mon, 23 Sep 2024 10:54:11 +0800 Subject: [PATCH] [#4718] fix(iceberg): use unified logic to transform catalog backend name to handle the renaming of catalog (#4900) ### What changes were proposed in this pull request? 1. Spark, Trino, the Iceberg catalog, and the Iceberg REST server all use `getCatalogBackendName` to get the catalog backend name. 2. Change the default backend name to the value of `catalog-backend` (for example, `jdbc`), so that it does not change when the catalog is renamed. ### Why are the changes needed? Fix: #4718 ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? 1. Create a JDBC catalog with a catalog backend name, and check that the schema is still visible after the catalog is renamed. 2. Create a JDBC catalog without a catalog backend name, and check that the schema is still visible after the catalog is renamed. --- catalogs/catalog-common/build.gradle.kts | 5 ++ .../iceberg/IcebergPropertiesUtils.java | 22 ++++++++ .../iceberg/TestIcebergPropertiesUtils.java | 55 +++++++++++++++++++ .../iceberg/IcebergCatalogOperations.java | 3 - docs/lakehouse-iceberg-catalog.md | 18 ++---- .../iceberg/common/IcebergConfig.java | 5 +- .../common/utils/IcebergCatalogUtil.java | 18 ++++-- .../iceberg/GravitinoIcebergCatalog.java | 7 +-- .../IcebergCatalogPropertyConverter.java | 22 +++++--- .../iceberg/IcebergConnectorAdapter.java | 1 - .../connector/metadata/GravitinoCatalog.java | 3 +- 11 files changed, 121 insertions(+), 38 deletions(-) create mode 100644 catalogs/catalog-common/src/test/java/org/apache/gravitino/lakehouse/iceberg/TestIcebergPropertiesUtils.java diff --git a/catalogs/catalog-common/build.gradle.kts b/catalogs/catalog-common/build.gradle.kts index ef3785f7ca9..c9a76ca9078 100644 --- a/catalogs/catalog-common/build.gradle.kts +++ b/catalogs/catalog-common/build.gradle.kts @@ -25,4 +25,9 @@ plugins { dependencies { implementation(libs.slf4j.api) implementation(libs.guava) + + testImplementation(libs.junit.jupiter.api) + 
testImplementation(libs.junit.jupiter.params) + + testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java index 3420daa9724..5a1dbf7052e 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java @@ -20,7 +20,11 @@ import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.storage.OSSProperties; +import org.apache.gravitino.storage.S3Properties; public class IcebergPropertiesUtils { @@ -66,4 +70,22 @@ public static Map toIcebergCatalogProperties( }); return icebergProperties; } + + /** + * Get catalog backend name from Gravitino catalog properties. + * + * @param catalogProperties a map of Gravitino catalog properties. + * @return catalog backend name. 
+ */ + public static String getCatalogBackendName(Map catalogProperties) { + String backendName = catalogProperties.get(IcebergConstants.CATALOG_BACKEND_NAME); + if (backendName != null) { + return backendName; + } + + String catalogBackend = catalogProperties.get(IcebergConstants.CATALOG_BACKEND); + return Optional.ofNullable(catalogBackend) + .map(s -> s.toLowerCase(Locale.ROOT)) + .orElse("memory"); + } } diff --git a/catalogs/catalog-common/src/test/java/org/apache/gravitino/lakehouse/iceberg/TestIcebergPropertiesUtils.java b/catalogs/catalog-common/src/test/java/org/apache/gravitino/lakehouse/iceberg/TestIcebergPropertiesUtils.java new file mode 100644 index 00000000000..b8efe974ded --- /dev/null +++ b/catalogs/catalog-common/src/test/java/org/apache/gravitino/lakehouse/iceberg/TestIcebergPropertiesUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.lakehouse.iceberg; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergPropertiesUtils { + + @Test + void testGetCatalogBackendName() { + Map catalogProperties = + ImmutableMap.of( + IcebergConstants.CATALOG_BACKEND_NAME, "a", IcebergConstants.CATALOG_BACKEND, "jdbc"); + String backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties); + Assertions.assertEquals("a", backendName); + + catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "jdbc"); + backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties); + Assertions.assertEquals("jdbc", backendName); + + catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "JDBC"); + backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties); + Assertions.assertEquals("jdbc", backendName); + + catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "hive"); + backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties); + Assertions.assertEquals("hive", backendName); + + catalogProperties = ImmutableMap.of(); + backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties); + Assertions.assertEquals("memory", backendName); + } +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 67a0471f87e..19400d97c78 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -108,9 +108,6 @@ public void initialize( Map resultConf = Maps.newHashMap(prefixMap); resultConf.putAll(gravitinoConfig); resultConf.put("catalog_uuid", info.id().toString()); - if (!resultConf.containsKey(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME)) { - resultConf.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, info.name()); - } IcebergConfig icebergConfig = new IcebergConfig(resultConf); this.icebergTableOps = new IcebergTableOps(icebergConfig); diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index 1b341598979..298c2519374 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -33,18 +33,12 @@ Builds with Apache Iceberg `1.5.2`. The Apache Iceberg table format version is ` ### Catalog properties -| Property name | Description | Default value | Required | Since Version | -|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------|-------------------------------------------------------------|---------------| -| `catalog-backend` | Catalog backend of Gravitino Iceberg catalog. Supports `hive` or `jdbc` or `rest`. | (none) | Yes | 0.2.0 | -| `uri` | The URI configuration of the Iceberg catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db` or `http://127.0.0.1:9001`. | (none) | Yes | 0.2.0 | -| `warehouse` | Warehouse directory of catalog. `file:///user/hive/warehouse-hive/` for local fs or `hdfs://namespace/hdfs/path` for HDFS. | (none) | Yes | 0.2.0 | -| `catalog-backend-name` | The catalog name passed to underlying Iceberg catalog backend. 
Catalog name in JDBC backend is used to isolate namespace and tables. | Gravitino catalog name | No | 0.5.2 | -| `authentication.type` | The type of authentication for Iceberg catalog backend, currently Gravitino only supports `Kerberos`, `simple`. | `simple` | No | 0.6.0 | -| `authentication.impersonation-enable` | Whether to enable impersonation for the Iceberg catalog | `false` | No | 0.6.0 | -| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0 | -| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0 | -| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Iceberg catalog. | 60 | No | 0.6.0 | -| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.6.0 | +| Property name | Description | Default value | Required | Since Version | +|----------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------|---------------| +| `catalog-backend` | Catalog backend of Gravitino Iceberg catalog. Supports `hive` or `jdbc` or `rest`. | (none) | Yes | 0.2.0 | +| `uri` | The URI configuration of the Iceberg catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db` or `http://127.0.0.1:9001`. | (none) | Yes | 0.2.0 | +| `warehouse` | Warehouse directory of catalog. 
`file:///user/hive/warehouse-hive/` for local fs or `hdfs://namespace/hdfs/path` for HDFS. | (none) | Yes | 0.2.0 | +| `catalog-backend-name` | The catalog name passed to underlying Iceberg catalog backend. Catalog name in JDBC backend is used to isolate namespace and tables. | The property value of `catalog-backend`, like `jdbc` for JDBC catalog backend. | No | 0.5.2 | Any properties not defined by Gravitino with `gravitino.bypass.` prefix will pass to Iceberg catalog properties and HDFS configuration. For example, if specify `gravitino.bypass.list-all-tables`, `list-all-tables` will pass to Iceberg catalog properties. diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 8bf9b86d019..a671204294b 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; -import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Config; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; @@ -162,8 +161,8 @@ public String getJdbcDriver() { return get(JDBC_DRIVER); } - public String getCatalogBackendName(String defaultCatalogBackendName) { - return Optional.ofNullable(get(CATALOG_BACKEND_NAME)).orElse(defaultCatalogBackendName); + public String getCatalogBackendName() { + return IcebergPropertiesUtils.getCatalogBackendName(getAllConfig()); } public IcebergConfig(Map properties) { diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java index 8f171bea65c..88aa6002453 100644 --- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java @@ -52,7 +52,7 @@ public class IcebergCatalogUtil { private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogUtil.class); private static InMemoryCatalog loadMemoryCatalog(IcebergConfig icebergConfig) { - String icebergCatalogName = icebergConfig.getCatalogBackendName("memory"); + String icebergCatalogName = icebergConfig.getCatalogBackendName(); InMemoryCatalog memoryCatalog = new InMemoryCatalog(); Map resultProperties = icebergConfig.getIcebergCatalogProperties(); resultProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp"); @@ -63,7 +63,7 @@ private static InMemoryCatalog loadMemoryCatalog(IcebergConfig icebergConfig) { private static HiveCatalog loadHiveCatalog(IcebergConfig icebergConfig) { ClosableHiveCatalog hiveCatalog = new ClosableHiveCatalog(); HdfsConfiguration hdfsConfiguration = new HdfsConfiguration(); - String icebergCatalogName = icebergConfig.getCatalogBackendName("hive"); + String icebergCatalogName = icebergConfig.getCatalogBackendName(); Map properties = icebergConfig.getIcebergCatalogProperties(); properties.forEach(hdfsConfiguration::set); @@ -110,7 +110,7 @@ private static KerberosClient initKerberosAndReturnClient( private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) { String driverClassName = icebergConfig.getJdbcDriver(); - String icebergCatalogName = icebergConfig.getCatalogBackendName("jdbc"); + String icebergCatalogName = icebergConfig.getCatalogBackendName(); Map properties = icebergConfig.getIcebergCatalogProperties(); Preconditions.checkNotNull( @@ -136,7 +136,7 @@ private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) { } private static Catalog loadRestCatalog(IcebergConfig icebergConfig) { - String icebergCatalogName = icebergConfig.getCatalogBackendName("rest"); + 
String icebergCatalogName = icebergConfig.getCatalogBackendName(); RESTCatalog restCatalog = new RESTCatalog(); HdfsConfiguration hdfsConfiguration = new HdfsConfiguration(); Map properties = icebergConfig.getIcebergCatalogProperties(); @@ -146,6 +146,16 @@ private static Catalog loadRestCatalog(IcebergConfig icebergConfig) { return restCatalog; } + private static Catalog loadCustomCatalog(IcebergConfig icebergConfig) { + String customCatalogName = icebergConfig.getCatalogBackendName(); + String className = icebergConfig.get(IcebergConfig.CATALOG_BACKEND_IMPL); + return CatalogUtil.loadCatalog( + className, + customCatalogName, + icebergConfig.getIcebergCatalogProperties(), + new HdfsConfiguration()); + } + @VisibleForTesting static Catalog loadCatalogBackend(String catalogType) { return loadCatalogBackend( diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java index 68aaef5ea47..ccadc69e49a 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java @@ -22,7 +22,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; -import java.util.Optional; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; import org.apache.gravitino.rel.Table; import org.apache.gravitino.spark.connector.PropertiesConverter; import org.apache.gravitino.spark.connector.SparkTransformConverter; @@ -58,10 +58,7 @@ public class GravitinoIcebergCatalog extends BaseCatalog @Override protected TableCatalog createAndInitSparkCatalog( String name, CaseInsensitiveStringMap options, Map properties) { - String catalogBackendName = - 
Optional.ofNullable( - properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_NAME)) - .orElse(name); + String catalogBackendName = IcebergPropertiesUtils.getCatalogBackendName(properties); Map all = getPropertiesConverter().toSparkCatalogProperties(options, properties); TableCatalog icebergCatalog = new SparkCatalog(); diff --git a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java index 692c3d5cea8..c3afb001ecc 100644 --- a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java +++ b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.commons.collections4.bidimap.TreeBidiMap; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; import org.apache.gravitino.catalog.property.PropertyConverter; import org.apache.gravitino.trino.connector.GravitinoErrorCode; @@ -345,17 +346,22 @@ private Map buildJDBCBackendProperties(Map prope Map jdbcProperties = new HashMap<>(); jdbcProperties.put("iceberg.catalog.type", "jdbc"); - jdbcProperties.put("iceberg.jdbc-catalog.driver-class", properties.get("jdbc-driver")); - jdbcProperties.put("iceberg.jdbc-catalog.connection-url", properties.get("uri")); - jdbcProperties.put("iceberg.jdbc-catalog.connection-user", properties.get("jdbc-user")); - jdbcProperties.put("iceberg.jdbc-catalog.connection-password", properties.get("jdbc-password")); - jdbcProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", properties.get("warehouse")); + jdbcProperties.put( + "iceberg.jdbc-catalog.driver-class", + 
properties.get(IcebergConstants.GRAVITINO_JDBC_DRIVER)); + jdbcProperties.put("iceberg.jdbc-catalog.connection-url", properties.get(IcebergConstants.URI)); + jdbcProperties.put( + "iceberg.jdbc-catalog.connection-user", + properties.get(IcebergConstants.GRAVITINO_JDBC_USER)); + jdbcProperties.put( + "iceberg.jdbc-catalog.connection-password", + properties.get(IcebergConstants.GRAVITINO_JDBC_PASSWORD)); + jdbcProperties.put( + "iceberg.jdbc-catalog.default-warehouse-dir", properties.get(IcebergConstants.WAREHOUSE)); - // TODO (yuhui) Optimize the code for retrieve the catalogname - String catalogName = properties.get("catalog-name"); jdbcProperties.put( "iceberg.jdbc-catalog.catalog-name", - properties.getOrDefault(IcebergConstants.CATALOG_BACKEND_NAME, catalogName)); + IcebergPropertiesUtils.getCatalogBackendName(properties)); return jdbcProperties; } diff --git a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java index 1841b4d0495..89b8f0198cf 100644 --- a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java +++ b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java @@ -46,7 +46,6 @@ public IcebergConnectorAdapter() { @Override public Map buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception { - catalog.getProperties().put("catalog-name", catalog.getName()); return catalogConverter.gravitinoToEngineProperties(catalog.getProperties()); } diff --git a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/metadata/GravitinoCatalog.java b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/metadata/GravitinoCatalog.java index 124d9aa2d74..d59e59e9f73 100644 --- 
a/trino-connector/src/main/java/org/apache/gravitino/trino/connector/metadata/GravitinoCatalog.java +++ b/trino-connector/src/main/java/org/apache/gravitino/trino/connector/metadata/GravitinoCatalog.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.json.JsonMapper; import io.trino.spi.TrinoException; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; @@ -60,7 +59,7 @@ public GravitinoCatalog(String metalake, Catalog catalog) { this.metalake = metalake; this.provider = catalog.provider(); this.name = catalog.name(); - this.properties = new HashMap<>(catalog.properties()); + this.properties = catalog.properties(); Instant time = catalog.auditInfo().lastModifiedTime() == null ? catalog.auditInfo().createTime()