From 6d8163a1c90303f33408620a35fe6691cd3cbf16 Mon Sep 17 00:00:00 2001 From: Mayank Vadariya <48036907+mayankvadariya@users.noreply.github.com> Date: Wed, 16 Oct 2024 20:40:09 -0400 Subject: [PATCH] Add support for case-insensitive object names in Iceberg REST catalog Certain Rest catalog implementation such as Polaris supports case-sensitive object(namespace, table, view etc.) names. This change allows querying mixed/upper case letter object names in rest catalog from Trino. `iceberg.rest-catalog.case-insensitive-name-matching` controls whether to match lowercase object names in Trino with different case object names in rest catalog with the limitations that only single object name with the identical name is supported. --- .../main/sphinx/object-storage/metastores.md | 5 + .../rest/IcebergRestCatalogConfig.java | 34 +++ .../rest/TrinoIcebergRestCatalogFactory.java | 21 +- .../catalog/rest/TrinoRestCatalog.java | 183 ++++++++++-- ...gPolarisCatalogCaseInsensitiveMapping.java | 280 ++++++++++++++++++ .../rest/TestIcebergRestCatalogConfig.java | 12 +- .../catalog/rest/TestTrinoRestCatalog.java | 7 +- 7 files changed, 511 insertions(+), 31 deletions(-) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md index 98f2c5971ea5..4eec3a03f670 100644 --- a/docs/src/main/sphinx/object-storage/metastores.md +++ b/docs/src/main/sphinx/object-storage/metastores.md @@ -503,6 +503,11 @@ following properties: * - `iceberg.rest-catalog.nested-namespace-enabled` - Support querying objects under nested namespace. Defaults to `false`. +* - `iceberg.rest-catalog.case-insensitive-name-matching` + - Match namespace, table, and view names case insensitively. Defaults to `false`. +* - `iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl` + - [Duration](prop-type-duration) for which case-insensitive namespace, table, + and view names are cached. Defaults to `1m`. ::: The following example shows a minimal catalog configuration using an Iceberg diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java index fa95ab729337..0da5e4c607a9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java @@ -15,12 +15,16 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; import jakarta.validation.constraints.NotNull; import org.apache.iceberg.catalog.Namespace; import java.net.URI; import java.util.Optional; +import static java.util.concurrent.TimeUnit.MINUTES; + public class IcebergRestCatalogConfig { public enum Security @@ -43,6 +47,8 @@ public enum SessionType private Security security = Security.NONE; private SessionType sessionType = SessionType.NONE; private boolean vendedCredentialsEnabled; + private boolean caseInsensitiveNameMatching; + private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES); @NotNull public URI getBaseUri() @@ -152,4 +158,32 @@ public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCreden this.vendedCredentialsEnabled = vendedCredentialsEnabled; return this; } + + public boolean isCaseInsensitiveNameMatching() + { + return caseInsensitiveNameMatching; + } + + @Config("iceberg.rest-catalog.case-insensitive-name-matching") + @ConfigDescription("Match object names case-insensitively") + public IcebergRestCatalogConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching) + { + this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getCaseInsensitiveNameMatchingCacheTtl() + { + return caseInsensitiveNameMatchingCacheTtl; + } + + @Config("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl") + @ConfigDescription("Duration to keep case insensitive object mapping prior to eviction") + public IcebergRestCatalogConfig setCaseInsensitiveNameMatchingCacheTtl(Duration caseInsensitiveNameMatchingCacheTtl) + { + this.caseInsensitiveNameMatchingCacheTtl = caseInsensitiveNameMatchingCacheTtl; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index d5c5f9bfb3c8..ca57ab4d05da 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -13,10 +13,12 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileSystemFactory; @@ -29,6 +31,7 @@ import io.trino.spi.type.TypeManager; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTSessionCatalog; @@ -38,6 +41,7 @@ import java.util.Set; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; @@ -57,6 +61,9 @@ public class TrinoIcebergRestCatalogFactory private final SecurityProperties securityProperties; private final boolean uniqueTableLocation; private final TypeManager typeManager; + private final boolean caseInsensitiveNameMatching; + private final Cache remoteNamespaceMappingCache; + private final Cache remoteTableMappingCache; @GuardedBy("this") private RESTSessionCatalog icebergCatalog; @@ -86,6 +93,15 @@ public TrinoIcebergRestCatalogFactory( requireNonNull(icebergConfig, "icebergConfig is null"); this.uniqueTableLocation = icebergConfig.isUniqueTableLocation(); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.caseInsensitiveNameMatching = restConfig.isCaseInsensitiveNameMatching(); + this.remoteNamespaceMappingCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() + .build(); + this.remoteTableMappingCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() + .build(); } @Override @@ -132,6 +148,9 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) nestedNamespaceEnabled, trinoVersion, typeManager, - uniqueTableLocation); + uniqueTableLocation, + caseInsensitiveNameMatching, + remoteNamespaceMappingCache, + remoteTableMappingCache); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 0d9d188d4f23..d6e4ffe9acc8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; @@ -93,6 +94,7 @@ import static io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog.ICEBERG_VIEW_RUN_AS_OWNER; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static java.util.stream.Collectors.joining; @@ -115,6 +117,9 @@ public class TrinoRestCatalog private final boolean nestedNamespaceEnabled; private final String trinoVersion; private final boolean useUniqueTableLocation; + private final boolean caseInsensitiveNameMatching; + private final Cache remoteNamespaceMappingCache; + private final Cache remoteTableMappingCache; private final Cache tableCache = EvictableCacheBuilder.newBuilder() .maximumSize(PER_QUERY_CACHE_SIZE) @@ -129,7 +134,10 @@ public TrinoRestCatalog( boolean nestedNamespaceEnabled, String trinoVersion, TypeManager typeManager, - boolean useUniqueTableLocation) + boolean useUniqueTableLocation, + boolean caseInsensitiveNameMatching, + Cache remoteNamespaceMappingCache, + Cache remoteTableMappingCache) { this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); this.catalogName = requireNonNull(catalogName, "catalogName is null"); @@ -140,6 +148,9 @@ public TrinoRestCatalog( this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.useUniqueTableLocation = useUniqueTableLocation; + this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; + this.remoteNamespaceMappingCache = requireNonNull(remoteNamespaceMappingCache, "remoteNamespaceMappingCache is null"); + this.remoteTableMappingCache = requireNonNull(remoteTableMappingCache, "remoteTableMappingCache is null"); } @Override @@ -151,7 +162,7 @@ public Optional getNamespaceSeparator() @Override public boolean namespaceExists(ConnectorSession session, String namespace) { - return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace)); + return restSessionCatalog.namespaceExists(convert(session), toRemoteNamespace(session, toNamespace(namespace))); } @Override @@ -178,7 +189,7 @@ private List collectNamespaces(ConnectorSession session, Namespace paren public void dropNamespace(ConnectorSession session, String namespace) { try { - restSessionCatalog.dropNamespace(convert(session), toNamespace(namespace)); + restSessionCatalog.dropNamespace(convert(session), toRemoteNamespace(session, toNamespace(namespace))); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -186,6 +197,9 @@ public void dropNamespace(ConnectorSession session, String namespace) catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop namespace: %s", namespace), e); } + if (caseInsensitiveNameMatching) { + remoteNamespaceMappingCache.invalidate(toNamespace(namespace)); + } } @Override @@ -193,7 +207,7 @@ public Map loadNamespaceMetadata(ConnectorSession session, Strin { try { // Return immutable metadata as direct modifications will not be reflected on the namespace - return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace))); + return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toRemoteNamespace(session, toNamespace(namespace)))); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -241,10 +255,10 @@ public List listTables(ConnectorSession session, Optional nam ImmutableList.Builder tables = ImmutableList.builder(); for (Namespace restNamespace : namespaces) { - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> new TableInfo(SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name()), TableInfo.ExtendedRelationType.TABLE)) .forEach(tables::add); - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> new TableInfo(SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name()), TableInfo.ExtendedRelationType.OTHER_VIEW)) .forEach(tables::add); } @@ -259,7 +273,7 @@ public List listViews(ConnectorSession session, Optional viewNames = ImmutableList.builder(); for (Namespace restNamespace : namespaces) { - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> SchemaTableName.schemaTableName(id.namespace().toString(), id.name())) .forEach(viewNames::add); } @@ -313,7 +327,7 @@ public Transaction newCreateTableTransaction( String location, Map properties) { - return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName, true), schema) .withPartitionSpec(partitionSpec) .withSortOrder(sortOrder) .withLocation(location) @@ -331,7 +345,7 @@ public Transaction newCreateOrReplaceTableTransaction( String location, Map properties) { - return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName, true), schema) .withPartitionSpec(partitionSpec) .withSortOrder(sortOrder) .withLocation(location) @@ -342,25 +356,28 @@ public Transaction newCreateOrReplaceTableTransaction( @Override public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata) { - restSessionCatalog.registerTable(convert(session), toIdentifier(tableName), tableMetadata.metadataFileLocation()); + TableIdentifier tableIdentifier = TableIdentifier.of(toRemoteNamespace(session, toNamespace(tableName.getSchemaName())), tableName.getTableName()); + restSessionCatalog.registerTable(convert(session), tableIdentifier, tableMetadata.metadataFileLocation()); } @Override public void unregisterTable(ConnectorSession session, SchemaTableName tableName) { - if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) { + if (!restSessionCatalog.dropTable(convert(session), toRemoteTable(session, tableName, true))) { throw new TableNotFoundException(tableName); } invalidateTableCache(tableName); + invalidateTableMappingCache(tableName); } @Override public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { - if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) { + if (!restSessionCatalog.purgeTable(convert(session), toRemoteTable(session, schemaTableName, true))) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); } invalidateTableCache(schemaTableName); + invalidateTableMappingCache(schemaTableName); } @Override @@ -375,12 +392,13 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { try { - restSessionCatalog.renameTable(convert(session), toIdentifier(from), toIdentifier(to)); + restSessionCatalog.renameTable(convert(session), toRemoteTable(session, from, true), toRemoteTable(session, to, true)); } catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); } invalidateTableCache(from); + invalidateTableMappingCache(from); } @Override @@ -392,9 +410,7 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName tableCache, schemaTableName, () -> { - TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName()); - - BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier); + BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toRemoteObject(session, schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); }); @@ -407,6 +423,23 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName } } + private TableIdentifier toRemoteObject(ConnectorSession session, SchemaTableName schemaTableName) + { + TableIdentifier remoteTable = toRemoteTable(session, schemaTableName, false); + if (!remoteTable.name().equals(schemaTableName.getTableName())) { + return remoteTable; + } + + TableIdentifier remoteView = toRemoteView(session, schemaTableName, false); + if (!remoteView.name().equals(schemaTableName.getTableName())) { + return remoteView; + } + if (remoteView.name().equals(schemaTableName.getTableName()) && remoteTable.name().equals(schemaTableName.getTableName())) { + return remoteTable; + } + throw new RuntimeException("Unable to find remote object"); + } + @Override public Map> tryGetColumnMetadata(ConnectorSession session, List tables) { @@ -416,7 +449,7 @@ public Map> tryGetColumnMetadata(Connector @Override public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) { - Table icebergTable = restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); + Table icebergTable = restSessionCatalog.loadTable(convert(session), toRemoteTable(session, schemaTableName, true)); if (comment.isEmpty()) { icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); } @@ -460,10 +493,10 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, definition.getOwner().ifPresent(owner -> properties.put(ICEBERG_VIEW_RUN_AS_OWNER, owner)); definition.getComment().ifPresent(comment -> properties.put(COMMENT, comment)); Schema schema = IcebergUtil.schemaFromViewColumns(typeManager, definition.getColumns()); - ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toIdentifier(schemaViewName)); + ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toRemoteView(session, schemaViewName, true)); viewBuilder = viewBuilder.withSchema(schema) .withQuery("trino", definition.getOriginalSql()) - .withDefaultNamespace(toNamespace(schemaViewName.getSchemaName())) + .withDefaultNamespace(toRemoteNamespace(session, toNamespace(schemaViewName.getSchemaName()))) .withDefaultCatalog(definition.getCatalog().orElse(null)) .withProperties(properties.buildOrThrow()) .withLocation(defaultTableLocation(session, schemaViewName)); @@ -479,7 +512,8 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, @Override public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { - restSessionCatalog.renameView(convert(session), toIdentifier(source), toIdentifier(target)); + restSessionCatalog.renameView(convert(session), toRemoteView(session, source, true), toRemoteView(session, target, true)); + invalidateTableMappingCache(source); } @Override @@ -491,7 +525,8 @@ public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaVie @Override public void dropView(ConnectorSession session, SchemaTableName schemaViewName) { - restSessionCatalog.dropView(convert(session), toIdentifier(schemaViewName)); + restSessionCatalog.dropView(convert(session), toRemoteView(session, schemaViewName, true)); + invalidateTableMappingCache(schemaViewName); } @Override @@ -500,7 +535,7 @@ public Map getViews(ConnectorSession s SessionContext sessionContext = convert(session); ImmutableMap.Builder views = ImmutableMap.builder(); for (Namespace restNamespace : listNamespaces(session, namespace)) { - for (TableIdentifier restView : restSessionCatalog.listViews(sessionContext, restNamespace)) { + for (TableIdentifier restView : restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))) { SchemaTableName schemaTableName = SchemaTableName.schemaTableName(restView.namespace().toString(), restView.name()); try { getView(session, schemaTableName).ifPresent(view -> views.put(schemaTableName, view)); @@ -521,7 +556,7 @@ public Map getViews(ConnectorSession s @Override public Optional getView(ConnectorSession session, SchemaTableName viewName) { - return getIcebergView(session, viewName).flatMap(view -> { + return getIcebergView(session, viewName, false).flatMap(view -> { SQLViewRepresentation sqlView = view.sqlFor("trino"); if (!sqlView.dialect().equalsIgnoreCase("trino")) { throw new TrinoException(ICEBERG_UNSUPPORTED_VIEW_DIALECT, "Cannot read unsupported dialect '%s' for view '%s'".formatted(sqlView.dialect(), viewName)); @@ -541,10 +576,10 @@ public Optional getView(ConnectorSession session, Schem }); } - private Optional getIcebergView(ConnectorSession session, SchemaTableName viewName) + private Optional getIcebergView(ConnectorSession session, SchemaTableName viewName, boolean getCached) { try { - return Optional.of(restSessionCatalog.loadView(convert(session), toIdentifier(viewName))); + return Optional.of(restSessionCatalog.loadView(convert(session), toRemoteView(session, viewName, getCached))); } catch (NoSuchViewException e) { return Optional.empty(); @@ -616,7 +651,7 @@ public Optional redirectTable(ConnectorSession session, @Override public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional comment) { - View view = getIcebergView(session, schemaViewName).orElseThrow(() -> new ViewNotFoundException(schemaViewName)); + View view = getIcebergView(session, schemaViewName, true).orElseThrow(() -> new ViewNotFoundException(schemaViewName)); UpdateViewProperties updateViewProperties = view.updateProperties(); comment.ifPresentOrElse( value -> updateViewProperties.set(COMMENT, value), @@ -627,7 +662,7 @@ public void updateViewComment(ConnectorSession session, SchemaTableName schemaVi @Override public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional comment) { - View view = getIcebergView(session, schemaViewName) + View view = getIcebergView(session, schemaViewName, true) .orElseThrow(() -> new ViewNotFoundException(schemaViewName)); ViewVersion current = view.currentVersion(); @@ -685,6 +720,13 @@ private void invalidateTableCache(SchemaTableName schemaTableName) tableCache.invalidate(schemaTableName); } + private void invalidateTableMappingCache(SchemaTableName schemaTableName) + { + if (caseInsensitiveNameMatching) { + remoteTableMappingCache.invalidate(toIdentifier(schemaTableName)); + } + } + private Namespace toNamespace(String schemaName) { if (!nestedNamespaceEnabled && schemaName.contains(NAMESPACE_SEPARATOR)) { @@ -726,4 +768,91 @@ private List listNamespaces(ConnectorSession session, Optional findRemoteTable(session, tableIdentifier), getCached); + } + + private TableIdentifier findRemoteTable(ConnectorSession session, TableIdentifier tableIdentifier) + { + Namespace remoteNamespace = toRemoteNamespace(session, tableIdentifier.namespace()); + List tableIdentifiers = restSessionCatalog.listTables(convert(session), remoteNamespace); + TableIdentifier matchingTable = null; + for (TableIdentifier identifier : tableIdentifiers) { + if (identifier.name().equalsIgnoreCase(tableIdentifier.name())) { + if (matchingTable != null) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate table names are not supported with Iceberg REST catalog: " + + Joiner.on(", ").join(matchingTable, identifier.name())); + } + matchingTable = identifier; + } + } + return matchingTable == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingTable; + } + + private TableIdentifier toRemoteView(ConnectorSession session, SchemaTableName schemaViewName, boolean getCached) + { + TableIdentifier tableIdentifier = toIdentifier(schemaViewName); + return toRemoteObject(tableIdentifier, () -> findRemoteView(session, tableIdentifier), getCached); + } + + private TableIdentifier findRemoteView(ConnectorSession session, TableIdentifier tableIdentifier) + { + Namespace remoteNamespace = toRemoteNamespace(session, tableIdentifier.namespace()); + List tableIdentifiers = restSessionCatalog.listViews(convert(session), remoteNamespace); + TableIdentifier matchingView = null; + for (TableIdentifier identifier : tableIdentifiers) { + if (identifier.name().equalsIgnoreCase(tableIdentifier.name())) { + if (matchingView != null) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate view names are not supported with Iceberg REST catalog: " + + Joiner.on(", ").join(matchingView.name(), identifier.name())); + } + matchingView = identifier; + } + } + return matchingView == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingView; + } + + private TableIdentifier toRemoteObject(TableIdentifier tableIdentifier, Supplier remoteObjectProvider, boolean getCached) + { + if (caseInsensitiveNameMatching) { + if (getCached) { + return uncheckedCacheGet(remoteTableMappingCache, tableIdentifier, remoteObjectProvider); + } + return remoteObjectProvider.get(); + } + return tableIdentifier; + } + + private Namespace toRemoteNamespace(ConnectorSession session, Namespace trinoNamespace) + { + if (caseInsensitiveNameMatching) { + return uncheckedCacheGet(remoteNamespaceMappingCache, trinoNamespace, () -> findRemoteNamespace(session, trinoNamespace)); + } + return trinoNamespace; + } + + private Namespace findRemoteNamespace(ConnectorSession session, Namespace trinoNamespace) + { + List matchingRemoteNamespaces = listNamespaces(session, Namespace.empty()).stream() + .filter(ns -> toTrinoNamespace(ns).equals(trinoNamespace)) + .collect(toImmutableList()); + if (matchingRemoteNamespaces.size() > 1) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate namespace names are not supported with Iceberg REST catalog: " + matchingRemoteNamespaces); + } + return matchingRemoteNamespaces.isEmpty() ? trinoNamespace : matchingRemoteNamespaces.getFirst(); + } + + private List listNamespaces(ConnectorSession session, Namespace parentNamespace) + { + List childNamespaces = restSessionCatalog.listNamespaces(convert(session), parentNamespace); + return childNamespaces.stream().flatMap(childNamespace -> Stream.concat(Stream.of(childNamespace), listNamespaces(session, childNamespace).stream())).toList(); + } + + private static Namespace toTrinoNamespace(Namespace namespace) + { + return Namespace.of(Arrays.stream(namespace.levels()).map(level -> level.toLowerCase(ENGLISH)).toArray(String[]::new)); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java new file mode 100644 index 000000000000..2267cc46dac2 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java @@ -0,0 +1,280 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog.SessionContext; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.ViewBuilder; +import org.assertj.core.util.Files; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog.WAREHOUSE; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.nio.file.Files.createDirectories; +import static java.util.Locale.ENGLISH; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.SCOPE; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestIcebergPolarisCatalogCaseInsensitiveMapping + extends AbstractTestQueryFramework +{ + private static final SessionContext SESSION_CONTEXT = new SessionContext("dummy", null, null, ImmutableMap.of(), null); + private static final String SCHEMA = "LeVeL1_" + randomNameSuffix(); + private static final String LOWERCASE_SCHEMA = SCHEMA.toLowerCase(ENGLISH); + private static final Namespace NAMESPACE = Namespace.of(SCHEMA); + + private RESTSessionCatalog icebergCatalog; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + File warehouseLocation = Files.newTemporaryFolder(); + TestingPolarisCatalog polarisCatalog = closeAfterClass(new TestingPolarisCatalog(warehouseLocation.getPath())); + + Map properties = ImmutableMap.builder() + .put(CatalogProperties.URI, polarisCatalog.restUri() + "/api/catalog") + .put(WAREHOUSE_LOCATION, WAREHOUSE) + .put(CREDENTIAL, polarisCatalog.oauth2Credentials()) + .put(SCOPE, "PRINCIPAL_ROLE:ALL") + .put("view-endpoints-supported", "true") + .buildOrThrow(); + + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( + config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(), null); + icebergCatalogInstance.initialize("test_catalog", properties); + + icebergCatalog = icebergCatalogInstance; + closeAfterClass(icebergCatalog); + + return IcebergQueryRunner.builder(LOWERCASE_SCHEMA) + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .addIcebergProperty("iceberg.catalog.type", "rest") + .addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog") + .addIcebergProperty("iceberg.rest-catalog.warehouse", WAREHOUSE) + .addIcebergProperty("iceberg.rest-catalog.security", "OAUTH2") + .addIcebergProperty("iceberg.rest-catalog.oauth2.credential", polarisCatalog.oauth2Credentials()) + .addIcebergProperty("iceberg.rest-catalog.oauth2.scope", "PRINCIPAL_ROLE:ALL") + .addIcebergProperty("iceberg.rest-catalog.case-insensitive-name-matching", "true") + .addIcebergProperty("iceberg.register-table-procedure.enabled", "true") + .build(); + } + + @BeforeAll + void setup() + { + icebergCatalog.createNamespace(SESSION_CONTEXT, NAMESPACE); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()) + .containsExactlyInAnyOrder( + "information_schema", + "tpch", + LOWERCASE_SCHEMA); + + assertThat(computeActual("SHOW SCHEMAS LIKE 'level%'").getOnlyColumnAsSet()) + .containsExactlyInAnyOrder( + LOWERCASE_SCHEMA); + + assertQuery("SELECT * FROM information_schema.schemata", + """ + VALUES + ('iceberg', 'information_schema'), + ('iceberg', '%s'), + ('iceberg', 'tpch') + """.formatted(LOWERCASE_SCHEMA)); + } + + @Test + void testCaseInsensitiveMatchingForTable() + { + Map namespaceMetadata = icebergCatalog.loadNamespaceMetadata(SESSION_CONTEXT, NAMESPACE); + String namespaceLocation = namespaceMetadata.get(LOCATION_PROPERTY); + createDir(namespaceLocation); + + // Create and query a mixed case letter table from Trino + String tableName1 = "MiXed_CaSe_TaBlE1_" + randomNameSuffix(); + String lowercaseTableName1 = tableName1.toLowerCase(ENGLISH); + String table1Location = namespaceLocation + "/" + lowercaseTableName1; + assertUpdate("CREATE TABLE " + tableName1 + " WITH (location = '" + table1Location + "') AS SELECT BIGINT '42' a, DOUBLE '-38.5' b", 1); + assertQuery("SELECT * FROM " + tableName1, "VALUES (42, -38.5)"); + + // Create a mixed case letter table directly using rest catalog and query from Trino + String tableName2 = "mIxEd_cAsE_tAbLe2_" + randomNameSuffix(); + String lowercaseTableName2 = tableName2.toLowerCase(ENGLISH); + String table2Location = namespaceLocation + "/" + lowercaseTableName2; + createDir(table2Location); + createDir(table2Location + "/data"); + createDir(table2Location + "/metadata"); + icebergCatalog + .buildTable(SESSION_CONTEXT, TableIdentifier.of(NAMESPACE, tableName2), new Schema(required(1, "x", Types.LongType.get()))) + .withLocation(table2Location) + .createTransaction() + .commitTransaction(); + assertUpdate("INSERT INTO " + tableName2 + " VALUES (78)", 1); + assertQuery("SELECT * FROM " + tableName2, "VALUES (78)"); + + // Test register/unregister table. Re-register for further testing. + assertThat(icebergCatalog.dropTable(SESSION_CONTEXT, TableIdentifier.of(NAMESPACE, lowercaseTableName1))).isTrue(); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseTableName1)); + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName1 + "', '" + table1Location + "')"); + assertQuery("SELECT * FROM " + tableName1, "VALUES (42, -38.5)"); + assertUpdate("CALL system.unregister_table (CURRENT_SCHEMA, '" + tableName1 + "')"); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseTableName1)); + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName1 + "', '" + table1Location + "')"); + + // Query information_schema and list objects + assertThat(computeActual("SHOW TABLES IN " + SCHEMA).getOnlyColumnAsSet()).contains(lowercaseTableName1, lowercaseTableName2); + assertThat(computeActual("SHOW TABLES IN " + SCHEMA + " LIKE 'mixed_case_table%'").getOnlyColumnAsSet()).isEqualTo(Set.of(lowercaseTableName1, lowercaseTableName2)); + assertQuery("SELECT * FROM information_schema.tables WHERE table_schema != 'information_schema' AND table_type = 'BASE TABLE'", + """ + VALUES + ('iceberg', '%1$s', '%2$s', 'BASE TABLE'), + ('iceberg', '%1$s', '%3$s', 'BASE TABLE') + """.formatted(LOWERCASE_SCHEMA, lowercaseTableName1, lowercaseTableName2)); + + // Add table comment + assertUpdate("COMMENT ON TABLE " + tableName1 + " IS 'test comment' "); + assertThat(getTableComment(lowercaseTableName1)).isEqualTo("test comment"); + + // Add table column comment + assertUpdate("COMMENT ON COLUMN " + tableName1 + ".a IS 'test column comment'"); + assertThat(getColumnComment(lowercaseTableName1, "a")).isEqualTo("test column comment"); + + // Rename table + String renamedTableName1 = tableName1 + "_renamed"; + assertUpdate("ALTER TABLE " + lowercaseTableName1 + " RENAME TO " + renamedTableName1); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseTableName1)); + assertQuery("SELECT * FROM " + renamedTableName1, "VALUES (42, -38.5)"); + + // Drop tables + assertUpdate("DROP TABLE " + renamedTableName1); + assertUpdate("DROP TABLE " + tableName2); + + // Query dropped tablesd + assertQueryFails("SELECT * FROM " + renamedTableName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, renamedTableName1.toLowerCase(ENGLISH))); + assertQueryFails("SELECT * FROM " + tableName2, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseTableName2)); + } + + @Test + void testCaseInsensitiveMatchingForView() + { + Map namespaceMetadata = icebergCatalog.loadNamespaceMetadata(SESSION_CONTEXT, NAMESPACE); + String namespaceLocation = namespaceMetadata.get(LOCATION_PROPERTY); + createDir(namespaceLocation); + + // Create and query a mixed case letter view from Trino + String viewName1 = "MiXed_CaSe_vIeW1_" + randomNameSuffix(); + String lowercaseViewName1 = viewName1.toLowerCase(ENGLISH); + assertUpdate("CREATE VIEW " + viewName1 + " AS SELECT BIGINT '25' a, DOUBLE '99.4' b"); + assertQuery("SELECT * FROM " + viewName1, "VALUES (25, 99.4)"); + + // Create a mixed case letter view directly using rest catalog and query from Trino + String viewName2 = "mIxEd_cAsE_ViEw2_" + randomNameSuffix(); + String lowercaseViewName2 = viewName2.toLowerCase(ENGLISH); + String view2Location = namespaceLocation + "/" + lowercaseViewName2; + createDir(view2Location); + createDir(view2Location + "/data"); + createDir(view2Location + "/metadata"); + ViewBuilder viewBuilder = icebergCatalog.buildView(SESSION_CONTEXT, TableIdentifier.of(NAMESPACE, viewName2)); + viewBuilder + .withQuery("trino", "SELECT BIGINT '34' y") + .withSchema(new Schema(required(1, "y", Types.LongType.get()))) + .withDefaultNamespace(NAMESPACE) + .withLocation(view2Location) + .createOrReplace(); + assertQuery("SELECT * FROM " + viewName2, "VALUES (34)"); + + // Query information_schema and list objects + assertThat(computeActual("SHOW TABLES IN " + SCHEMA).getOnlyColumnAsSet()).contains(lowercaseViewName1, lowercaseViewName2); + assertThat(computeActual("SHOW TABLES IN " + SCHEMA + " LIKE 'mixed_case_view%'").getOnlyColumnAsSet()).contains(lowercaseViewName1, lowercaseViewName2); + assertQuery("SELECT * FROM information_schema.tables WHERE table_schema != 'information_schema' AND table_type = 'VIEW'", + """ + VALUES + ('iceberg', '%1$s', '%2$s', 'VIEW'), + ('iceberg', '%1$s', '%3$s', 'VIEW') + """.formatted(LOWERCASE_SCHEMA, lowercaseViewName1, lowercaseViewName2)); + + // Add view comment + assertUpdate("COMMENT ON VIEW " + viewName1 + " IS 'test comment' "); + assertThat(getTableComment(lowercaseViewName1)).isEqualTo("test comment"); + + // Add view column comment + assertUpdate("COMMENT ON COLUMN " + viewName1 + ".a IS 'test column comment'"); + assertThat(getColumnComment(lowercaseViewName1, "a")).isEqualTo("test column comment"); + + // Rename view + String renamedViewName1 = viewName1 + "_renamed"; + assertUpdate("ALTER VIEW " + lowercaseViewName1 + " RENAME TO " + renamedViewName1); + assertQueryFails("SELECT * FROM " + viewName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseViewName1)); + assertQuery("SELECT * FROM " + renamedViewName1, "VALUES (25, 99.4)"); + + // Drop views + assertUpdate("DROP VIEW " + renamedViewName1); + assertUpdate("DROP VIEW " + viewName2); + + // Query dropped views + assertQueryFails("SELECT * FROM " + renamedViewName1, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, renamedViewName1.toLowerCase(ENGLISH))); + assertQueryFails("SELECT * FROM " + viewName2, ".*'iceberg.%s.%s' does not exist".formatted(LOWERCASE_SCHEMA, lowercaseViewName2)); + } + + private String getTableComment(String tableName) + { + return (String) computeScalar("SELECT comment FROM system.metadata.table_comments " + + "WHERE catalog_name = 'iceberg' AND schema_name = '" + LOWERCASE_SCHEMA + "' AND table_name = '" + tableName + "'"); + } + + private String getColumnComment(String tableName, String columnName) + { + return (String) computeScalar("SELECT comment FROM information_schema.columns " + + "WHERE table_schema = '" + LOWERCASE_SCHEMA + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + } + + private static void createDir(String absoluteDirPath) + { + Path path = Paths.get(URI.create(absoluteDirPath).getPath()); + try { + createDirectories(path); + } + catch (IOException e) { + throw new UncheckedIOException("Cannot create %s directory".formatted(absoluteDirPath), e); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java index a9a36df55e51..cff018d440a4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.rest; import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; import org.junit.jupiter.api.Test; import java.util.Map; @@ -21,6 +22,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static java.util.concurrent.TimeUnit.MINUTES; public class TestIcebergRestCatalogConfig { @@ -35,7 +37,9 @@ public void testDefaults() .setNestedNamespaceEnabled(false) .setSessionType(IcebergRestCatalogConfig.SessionType.NONE) .setSecurity(IcebergRestCatalogConfig.Security.NONE) - .setVendedCredentialsEnabled(false)); + .setVendedCredentialsEnabled(false) + .setCaseInsensitiveNameMatching(false) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, MINUTES))); } @Test @@ -50,6 +54,8 @@ public void testExplicitPropertyMappings() .put("iceberg.rest-catalog.security", "OAUTH2") .put("iceberg.rest-catalog.session", "USER") .put("iceberg.rest-catalog.vended-credentials-enabled", "true") + .put("iceberg.rest-catalog.case-insensitive-name-matching", "true") + .put("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl", "3m") .buildOrThrow(); IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig() @@ -60,7 +66,9 @@ public void testExplicitPropertyMappings() .setNestedNamespaceEnabled(true) .setSessionType(IcebergRestCatalogConfig.SessionType.USER) .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2) - .setVendedCredentialsEnabled(true); + .setVendedCredentialsEnabled(true) + .setCaseInsensitiveNameMatching(true) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(3, MINUTES)); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index a45d29c10a54..1c9d85333c20 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.metastore.TableInfo; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.CommitTaskData; @@ -53,6 +54,7 @@ import static io.trino.testing.TestingConnectorSession.SESSION; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.util.Locale.ENGLISH; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -89,7 +91,10 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc false, "test", new TestingTypeManager(), - useUniqueTableLocations); + useUniqueTableLocations, + false, + EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build(), + EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build()); } @Test