From ab26883ab935d337dd11fdd698095fe92f035927 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 24 Jun 2024 16:42:48 -0700 Subject: [PATCH] DGS-11418 Ensure aliases are properly qualified --- .../client/rest/entities/ExtendedSchema.java | 4 ++++ .../kafka/schemaregistry/utils/QualifiedSubject.java | 12 ++++++++++++ .../schemaregistry/rest/filters/AliasFilter.java | 11 ++++------- .../kafka/schemaregistry/storage/InMemoryCache.java | 4 ++-- .../schemaregistry/storage/KafkaSchemaRegistry.java | 5 ++++- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ExtendedSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ExtendedSchema.java index c914afa6aa1..32d6a5084fd 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ExtendedSchema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ExtendedSchema.java @@ -52,6 +52,10 @@ public ExtendedSchema(Schema schema, List aliases) { this.aliases = aliases; } + public ExtendedSchema copy() { + return new ExtendedSchema(this, aliases); + } + @io.swagger.v3.oas.annotations.media.Schema(description = ALIASES_DESC) @JsonProperty("aliases") public List getAliases() { diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java b/client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java index 63eb156f49d..71995dbed94 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java @@ -164,6 +164,11 @@ public static String qualifiedContextFor(String tenant, String qualifiedSubject) public static QualifiedSubject qualifySubjectWithParent( String tenant, String parent, String subjectWithoutTenant) { + return qualifySubjectWithParent(tenant, parent, subjectWithoutTenant, false); + } + + public static QualifiedSubject qualifySubjectWithParent( + String tenant, String parent, String subjectWithoutTenant, boolean prefixTenant) { // Since the subject has no tenant, pass the default tenant QualifiedSubject qualifiedSubject = QualifiedSubject.create(DEFAULT_TENANT, subjectWithoutTenant); @@ -181,6 +186,13 @@ public static QualifiedSubject qualifySubjectWithParent( DEFAULT_TENANT, qualifiedParent.getContext(), subjectWithoutTenant); } } + if (prefixTenant) { + // Prefix the tenant if prefixTenant is true. + // For example, references are stored without tenant prefixes, + // while alias replacements need the tenant. + qualifiedSubject = new QualifiedSubject( + tenant, qualifiedSubject.getContext(), qualifiedSubject.getSubject()); + } return qualifiedSubject; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/filters/AliasFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/filters/AliasFilter.java index 5433eb6a828..ca79fab2fdf 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/filters/AliasFilter.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/filters/AliasFilter.java @@ -15,12 +15,10 @@ package io.confluent.kafka.schemaregistry.rest.filters; -import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.DEFAULT_TENANT; -import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.TENANT_DELIMITER; - import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.client.rest.entities.Config; import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import java.io.IOException; import java.net.URI; import java.net.URLDecoder; @@ -131,10 +129,9 @@ private String replaceAlias(String subject) { } String alias = config.getAlias(); if (alias != null && !alias.isEmpty()) { - if (!DEFAULT_TENANT.equals(schemaRegistry.tenant())) { - alias = schemaRegistry.tenant() + TENANT_DELIMITER + alias; - } - return alias; + QualifiedSubject qualAlias = + QualifiedSubject.qualifySubjectWithParent(schemaRegistry.tenant(), subject, alias, true); + return qualAlias.toQualifiedSubject(); } else { return subject; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryCache.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryCache.java index 4c4c08ad469..e714ad3b92e 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryCache.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryCache.java @@ -174,7 +174,7 @@ public void schemaDeleted( addToSchemaHashToGuid(schemaKey, schemaValue); for (SchemaReference ref : schemaValue.getReferences()) { QualifiedSubject refSubject = QualifiedSubject.qualifySubjectWithParent( - tenant(), schemaKey.getSubject(), ref.getSubject()); + tenant(), schemaKey.getSubject(), ref.getSubject(), true); SchemaKey refKey = new SchemaKey(refSubject.toQualifiedSubject(), ref.getVersion()); Map>> ctxRefBy = referencedBy.getOrDefault(tenant(), Collections.emptyMap()); @@ -226,7 +226,7 @@ public void schemaRegistered( addToSchemaHashToGuid(schemaKey, schemaValue); for (SchemaReference ref : schemaValue.getReferences()) { QualifiedSubject refSubject = QualifiedSubject.qualifySubjectWithParent( - tenant(), schemaKey.getSubject(), ref.getSubject()); + tenant(), schemaKey.getSubject(), ref.getSubject(), true); SchemaKey refKey = new SchemaKey(refSubject.toQualifiedSubject(), ref.getVersion()); Map>> ctxRefBy = referencedBy.computeIfAbsent(tenant(), k -> new ConcurrentHashMap<>()); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index f07a8343481..19012e17e72 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -1925,7 +1925,10 @@ private Map> getAliases(String subjectPrefix) if (alias == null) { continue; } - List aliases = subjectToAliases.computeIfAbsent(alias, k -> new ArrayList<>()); + QualifiedSubject qualAlias = + QualifiedSubject.qualifySubjectWithParent(tenant(), subjectPrefix, alias, true); + List aliases = subjectToAliases.computeIfAbsent( + qualAlias.toQualifiedSubject(), k -> new ArrayList<>()); aliases.add(configValue.getSubject()); } return subjectToAliases;