diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 8af4b0de0370c..628a9ed5ba3ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -766,17 +766,39 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr } protected CompletableFuture getSchemaCompatibilityStrategyAsync() { - return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> { - SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); - if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { - schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); - } - } - return schemaCompatibilityStrategy; - }); + return validateTopicOperationAsync(topicName, TopicOperation.GET_SCHEMA_COMPATIBILITY_STRATEGY) + .thenCompose((__) -> { + CompletableFuture future; + if (config().isTopicLevelPoliciesEnabled()) { + future = getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null)); + } else { + future = CompletableFuture.completedFuture(null); + } + + return future.thenCompose((topicSchemaCompatibilityStrategy) -> { + if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) { + return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy); + } + return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> { + SchemaCompatibilityStrategy schemaCompatibilityStrategy = + policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); + } + } + return schemaCompatibilityStrategy; + }); + }); + }).whenComplete((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to get schema compatibility strategy of topic {} {}", + clientAppId(), topicName, ex); + } + }); } @CanIgnoreReturnValue diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 151c499dac94c..d1658ef64e3b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -122,6 +122,7 @@ import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -4820,4 +4821,30 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(Async resumeAsyncResponseExceptionally(asyncResponse, e); } } + + protected CompletableFuture internalGetSchemaCompatibilityStrategy(boolean applied) { + if (applied) { + return getSchemaCompatibilityStrategyAsync(); + } + return validateTopicOperationAsync(topicName, TopicOperation.GET_SCHEMA_COMPATIBILITY_STRATEGY) + .thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { + if (!op.isPresent()) { + return null; + } + SchemaCompatibilityStrategy strategy = op.get().getSchemaCompatibilityStrategy(); + return SchemaCompatibilityStrategy.isUndefined(strategy) ? null : strategy; + })); + } + + protected CompletableFuture internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) { + return validateTopicOperationAsync(topicName, TopicOperation.SET_SCHEMA_COMPATIBILITY_STRATEGY) + .thenCompose((__) -> getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setSchemaCompatibilityStrategy( + strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, topicPolicies); + })); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 68fd9a8b2c0ee..4681e953cfb32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; @@ -3566,5 +3567,115 @@ public void getReplicatedSubscriptionStatus( internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative); } + @GET + @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy") + @ApiOperation(value = "Get schema compatibility strategy on a topic") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public void getSchemaCompatibilityStrategy( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the cluster", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + + preValidation(authoritative) + .thenCompose(__-> internalGetSchemaCompatibilityStrategy(applied)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getSchemaCompatibilityStrategy", ex, asyncResponse); + return null; + }); + } + + @PUT + @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy") + @ApiOperation(value = "Get schema compatibility strategy on a topic") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public void setSchemaCompatibilityStrategy( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Strategy used to check the compatibility of new schema") + SchemaCompatibilityStrategy strategy) { + validateTopicName(tenant, namespace, encodedTopic); + + preValidation(authoritative) + .thenCompose(__ -> internalSetSchemaCompatibilityStrategy(strategy)) + .thenRun(() -> { + log.info( + "[{}] Successfully set topic schema compatibility strategy: tenant={}, namespace={}, " + + "topic={}, shemaCompatibilityStrategy = {}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + strategy); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + handleTopicPolicyException("setSchemaCompatibilityStrategy", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy") + @ApiOperation(value = "Remove schema compatibility strategy on a topic") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public void removeSchemaCompatibilityStrategy( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Strategy used to check the compatibility of new schema") + SchemaCompatibilityStrategy strategy) { + validateTopicName(tenant, namespace, encodedTopic); + + preValidation(authoritative) + .thenCompose(__ -> internalSetSchemaCompatibilityStrategy(null)) + .thenRun(() -> { + log.info( + "[{}] Successfully remove topic schema compatibility strategy: tenant={}, namespace={}, " + + "topic={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeSchemaCompatibilityStrategy", ex, asyncResponse); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0470563e9f801..e1506ae578d38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -54,7 +54,6 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; -import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -104,9 +103,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener this.topicPolicies.getBackLogQuotaMap().get(type) .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type))); + updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); + } + + private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){ + if (isSystemTopic()) { + return; + } + + SchemaCompatibilityStrategy strategy = namespacePolicies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(namespacePolicies.schema_compatibility_strategy)) { + strategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + namespacePolicies.schema_auto_update_compatibility_strategy); + } + topicPolicies.getSchemaCompatibilityStrategy() + .updateNamespaceValue(formatSchemaCompatibilityStrategy(strategy)); } private void updateTopicPolicyByBrokerConfig() { @@ -252,6 +273,12 @@ private void updateTopicPolicyByBrokerConfig() { topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis()); topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes()); topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList()); + SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy(); + if (isSystemTopic()) { + schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy(); + } + topicPolicies.getSchemaCompatibilityStrategy() + .updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy)); } private EnumSet subTypeStringsToEnumSet(Set getSubscriptionTypesEnabled) { @@ -456,7 +483,7 @@ public CompletableFuture addSchema(SchemaData schema) { SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); if (allowAutoUpdateSchema()) { - return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy); + return schemaRegistryService.putSchemaIfAbsent(id, schema, getSchemaCompatibilityStrategy()); } else { return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) @@ -504,7 +531,7 @@ public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schem String id = TopicName.get(base).getSchemaName(); return brokerService.pulsar() .getSchemaRegistryService() - .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy); + .checkConsumerCompatibility(id, schema, getSchemaCompatibilityStrategy()); } @Override @@ -659,23 +686,6 @@ public long increasePublishLimitedTimes() { return RATE_LIMITED_UPDATER.incrementAndGet(this); } - protected void setSchemaCompatibilityStrategy(Policies policies) { - if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { - schemaCompatibilityStrategy = - brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); - return; - } - - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); - if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { - schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy(); - } - } - } - private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") .quantile(0.0) .quantile(0.50) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index fd7ad954ba40d..4809671cd6ef6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -160,7 +160,6 @@ public CompletableFuture initialize() { updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; - setSchemaCompatibilityStrategy(policies); schemaValidationEnforced = policies.schema_validation_enforced; } }); @@ -1002,7 +1001,6 @@ public CompletableFuture onPoliciesUpdate(Policies data) { updateTopicPolicyByNamespacePolicy(data); isEncryptionRequired = data.encryption_required; - setSchemaCompatibilityStrategy(data); isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; schemaValidationEnforced = data.schema_validation_enforced; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bb81d589b3605..eeae7c0f6d336 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -320,7 +320,6 @@ public CompletableFuture initialize() { this.isEncryptionRequired = policies.encryption_required; - setSchemaCompatibilityStrategy(policies); isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; schemaValidationEnforced = policies.schema_validation_enforced; @@ -2384,7 +2383,6 @@ public CompletableFuture onPoliciesUpdate(Policies data) { isEncryptionRequired = data.encryption_required; - setSchemaCompatibilityStrategy(data); isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; schemaValidationEnforced = data.schema_validation_enforced; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java index 0408f49eab393..0647d820779fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java @@ -19,19 +19,17 @@ package org.apache.pulsar.broker.admin; import com.google.common.collect.Sets; -import java.lang.reflect.Field; import lombok.extern.slf4j.Slf4j; import org.apache.avro.reflect.AvroAlias; import org.apache.avro.reflect.AvroDefault; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; -import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -173,15 +171,10 @@ private void testAutoUpdateDisabled(String namespace, String topicName) throws E admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Full); - for (int i = 0; i < 100; i++) { - Topic t = pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); - // get around fact that field is private and topic can be persistent or non-persistent - Field strategy = t.getClass().getSuperclass().getDeclaredField("schemaCompatibilityStrategy"); - strategy.setAccessible(true); - if (strategy.get(t) == SchemaCompatibilityStrategy.FULL) { - break; - } - } + Awaitility.await().untilAsserted( + () -> Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), + SchemaAutoUpdateCompatibilityStrategy.Full)); + log.info("try with fully compat, again"); try (Producer p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) { p.send(new V4Data("test2", 1, (short)100)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckOnTopicLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckOnTopicLevelTest.java new file mode 100644 index 0000000000000..6d146831af85f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckOnTopicLevelTest.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.schema.compatibility; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import com.google.common.collect.Sets; +import java.util.Collections; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.schema.Schemas; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class SchemaTypeCompatibilityCheckOnTopicLevelTest extends MockedPulsarServiceBaseTest { + private static final String CLUSTER_NAME = "test"; + private static final String PUBLIC_TENANT = "public"; + private static final String namespace = "test-namespace"; + private static final String namespaceName = PUBLIC_TENANT + "/" + namespace; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + + super.internalSetup(); + + // Setup namespaces + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()) + .build()); + + TenantInfo tenantInfo = TenantInfo.builder() + .allowedClusters(Collections.singleton(CLUSTER_NAME)) + .build(); + admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); + admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME)); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testSetAlwaysInCompatibleStrategyOnTopicLevelAndCheckAlwaysInCompatible() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testSetAlwaysInCompatibleStrategyOnTopicLevelAndCheckAlwaysInCompatible" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsar.getAdminClient().topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + + Awaitility.await() + .untilAsserted(() -> assertEquals( + pulsar.getAdminClient().topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE)); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + + Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + } + + @Test + public void testSetAlwaysCompatibleOnNamespaceLevelAndCheckAlwaysInCompatible() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testSetAlwaysCompatibleOnNamespaceLevelAndCheckAlwaysInCompatible" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsar.getAdminClient().topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + + Awaitility.await() + .untilAsserted(() -> assertEquals( + pulsar.getAdminClient().topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE)); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + + Throwable t = + expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + } + + @Test + public void testDisableTopicPoliciesAndSetAlwaysInCompatibleOnNamespaceLevel() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + conf.setTopicLevelPoliciesEnabled(false); + conf.setSystemTopicEnabled(false); + + admin.namespaces() + .setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testDisableTopicPoliciesAndSetAlwaysInCompatibleOnNamespaceLevel" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + + Throwable t = + expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + } + + @Test + public void testDisableTopicPoliciesWithDefaultConfig() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + conf.setTopicLevelPoliciesEnabled(false); + conf.setSystemTopicEnabled(false); + + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testDisableTopicPoliciesWithDefaultConfig" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonThree.class).build())) + .topic(topicName).create(); + } + + @Test + public void testDefaultConfig() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testDefaultConfig" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonThree.class).build())) + .topic(topicName).create(); + } + + @Test + public void testUpdateSchemaCompatibilityStrategyRepeatedly() + throws PulsarClientException, PulsarServerException, PulsarAdminException { + assertEquals(conf.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL); + + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testUpdateSchemaCompatibilityStrategyRepeatedly" + ).toString(); + + pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName); + + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.FULL)); + Awaitility.await().untilAsserted( + () -> assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespaceName))); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName), + SchemaCompatibilityStrategy.UNDEFINED)); + + pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonOne.class).build())) + .topic(topicName).create(); + + // Set SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE to schema_auto_update_compatibility_strategy on + // namespace level. + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespaceName, + SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespaceName), + SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled)); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName), + SchemaCompatibilityStrategy.UNDEFINED)); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE)); + + ProducerBuilder producerBuilder = + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + + Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + + // Set SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE to schema_compatibility_strategy on namespace level. + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespaceName), + SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled)); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE)); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE)); + pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonOne.class).build())) + .topic(topicName).create(); + + // Set SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE on topic level. + admin.topicPolicies().setSchemaCompatibilityStrategy(topicName,SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE)); + producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + + // Remove schema compatibility strategy on topic level. + admin.topicPolicies().removeSchemaCompatibilityStrategy(topicName); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE)); + pulsarClient.newProducer( + Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(true) + .withPojo(Schemas.PersonOne.class).build())) + .topic(topicName).create(); + + // Remove schema_compatibility_strategy on namespace level. + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName), + SchemaCompatibilityStrategy.UNDEFINED)); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE)); + producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 8dcfc31836a1b..5e358095919b5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -30,6 +30,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; /** @@ -1671,5 +1672,50 @@ CompletableFuture setSubscriptionTypesEnabledAsync(String topic, */ CompletableFuture removeSubscribeRateAsync(String topic) throws PulsarAdminException; + /** + * Get schema compatibility strategy on a topic. + * + * @param topic The topic in whose policy we are interested + * @param applied Get the current applied schema compatibility strategy + */ + SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied) + throws PulsarAdminException; + + /** + * Get schema compatibility strategy on a topic asynchronously. + * + * @param topic The topic in whose policy we are interested + * @param applied Get the current applied schema compatibility strategy + */ + CompletableFuture getSchemaCompatibilityStrategyAsync(String topic, boolean applied); + + /** + * Set schema compatibility strategy on a topic. + * + * @param topic The topic in whose policy should be set + * @param strategy The schema compatibility strategy + */ + void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy) throws PulsarAdminException; + /** + * Set schema compatibility strategy on a topic asynchronously. + * + * @param topic The topic in whose policy should be set + * @param strategy The schema compatibility strategy + */ + CompletableFuture setSchemaCompatibilityStrategyAsync(String topic, SchemaCompatibilityStrategy strategy); + + /** + * Remove schema compatibility strategy on a topic. + * + * @param topic The topic in whose policy should be removed + */ + void removeSchemaCompatibilityStrategy(String topic) throws PulsarAdminException; + + /** + * Remove schema compatibility strategy on a topic asynchronously. + * + * @param topic The topic in whose policy should be removed + */ + CompletableFuture removeSchemaCompatibilityStrategyAsync(String topic); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index 0629f0d249874..2caf1876f5156 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -45,6 +45,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; public class TopicPoliciesImpl extends BaseResource implements TopicPolicies { @@ -1376,6 +1377,62 @@ public CompletableFuture removeReplicatorDispatchRateAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied) + throws PulsarAdminException { + return sync(() -> getSchemaCompatibilityStrategyAsync(topic, applied)); + } + + @Override + public CompletableFuture getSchemaCompatibilityStrategyAsync(String topic, + boolean applied) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "schemaCompatibilityStrategy"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(SchemaCompatibilityStrategy schemaCompatibilityStrategy) { + future.complete(schemaCompatibilityStrategy); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + + return future; + } + + @Override + public void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy) + throws PulsarAdminException { + sync(() -> setSchemaCompatibilityStrategyAsync(topic, strategy)); + } + + @Override + public CompletableFuture setSchemaCompatibilityStrategyAsync(String topic, + SchemaCompatibilityStrategy strategy) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "schemaCompatibilityStrategy"); + return asyncPutRequest(path, Entity.entity(strategy, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeSchemaCompatibilityStrategy(String topic) + throws PulsarAdminException { + sync(()->removeSchemaCompatibilityStrategyAsync(topic)); + } + + @Override + public CompletableFuture removeSchemaCompatibilityStrategyAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "schemaCompatibilityStrategy"); + return asyncDeleteRequest(path); + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 18d1fbaada28e..2bfa31490576f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -69,6 +69,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.RelativeTimeUtil; @@ -241,6 +242,10 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("set-replication-clusters", new SetReplicationClusters()); jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters()); + jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); + jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); + jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + initDeprecatedCommands(); } @@ -2802,4 +2807,48 @@ void run() throws PulsarAdminException { } } + + @Parameters(commandDescription = "Remove schema compatibility strategy on a topic") + private class RemoveSchemaCompatibilityStrategy extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getAdmin().topicPolicies().removeSchemaCompatibilityStrategy(persistentTopic); + } + } + + @Parameters(commandDescription = "Set schema compatibility strategy on a topic") + private class SetSchemaCompatibilityStrategy extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--strategy", "-s"}, description = "Schema compatibility strategy: [UNDEFINED, " + + "ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, " + + "FORWARD_TRANSITIVE, FULL_TRANSITIVE]", required = true) + private SchemaCompatibilityStrategy strategy; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getAdmin().topicPolicies().setSchemaCompatibilityStrategy(persistentTopic, strategy); + } + } + + @Parameters(commandDescription = "Get schema compatibility strategy on a topic") + private class GetSchemaCompatibilityStrategy extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getAdmin().topicPolicies().getSchemaCompatibilityStrategy(persistentTopic, applied)); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 2b6bcc8acdecf..bff71460082b7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -49,7 +49,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue delayedDeliveryEnabled; final PolicyHierarchyValue delayedDeliveryTickTimeMillis; final PolicyHierarchyValue maxConsumersPerSubscription; - + final PolicyHierarchyValue schemaCompatibilityStrategy; public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); @@ -72,5 +72,6 @@ public HierarchyTopicPolicies() { delayedDeliveryEnabled = new PolicyHierarchyValue<>(); delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>(); compactionThreshold = new PolicyHierarchyValue<>(); + schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index d4de706e607b7..5dcd822b4000b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -54,4 +54,7 @@ public enum TopicOperation { SET_REPLICATED_SUBSCRIPTION_STATUS, GET_REPLICATED_SUBSCRIPTION_STATUS, + + SET_SCHEMA_COMPATIBILITY_STRATEGY, + GET_SCHEMA_COMPATIBILITY_STRATEGY, } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 8920e0570e41f..6e81509c83094 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -70,6 +70,7 @@ public class TopicPolicies { private Integer maxMessageSize; private Integer maxSubscriptionsPerTopic; private DispatchRateImpl replicatorDispatchRate; + private SchemaCompatibilityStrategy schemaCompatibilityStrategy; public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topicpolicies/SchemaCompatibilityStrategyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topicpolicies/SchemaCompatibilityStrategyTest.java new file mode 100644 index 0000000000000..e82dfef43b79a --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topicpolicies/SchemaCompatibilityStrategyTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.tests.integration.cli.topicpolicies; + +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class SchemaCompatibilityStrategyTest extends PulsarTestSuite { + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + } + + @Override + public void tearDownCluster() throws Exception { + super.tearDownCluster(); + } + + @Test + public void testSchemaCompatibilityCmd() throws Exception { + String topicName = generateTopicName("",true); + pulsarAdmin.topics().createNonPartitionedTopic(topicName); + + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName); + assertTrue(result.getStdout().contentEquals("UNDEFINED")); + }); + + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy --applied " + topicName); + assertTrue(result.getStdout().contentEquals("FULL")); + }); + + pulsarAdmin.topicPolicies().removeSchemaCompatibilityStrategy(topicName); + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName); + assertTrue(result.getStdout().contentEquals("UNDEFINED")); + }); + } + + @Test + public void testSchemaCompatibilityCmdWithNamespaceLevel() throws Exception { + String topicName = generateTopicName("",true); + pulsarAdmin.namespaces() + .setSchemaCompatibilityStrategy("public/default", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + pulsarAdmin.topics().createNonPartitionedTopic(topicName); + + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName); + assertTrue(result.getStdout().contentEquals("UNDEFINED")); + }); + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy --applied " + topicName); + assertTrue(result.getStdout().contentEquals("ALWAYS_INCOMPATIBLE")); + }); + + pulsarAdmin.namespaces() + .setSchemaCompatibilityStrategy("public/default", SchemaCompatibilityStrategy.UNDEFINED); + Awaitility.await().untilAsserted(()->{ + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName); + assertTrue(result.getStdout().contentEquals("UNDEFINED")); + }); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java index 70bd7d6f844dc..0c153f0ca4ef3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java @@ -20,6 +20,7 @@ import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.testng.annotations.DataProvider; import java.util.stream.Stream; @@ -75,6 +76,8 @@ public Object[][] serviceAndAdminUrls() { }; } + protected PulsarAdmin pulsarAdmin; + protected PulsarCluster pulsarCluster; public PulsarCluster getPulsarCluster() { @@ -121,6 +124,8 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception { pulsarCluster.start(); + pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); + log.info("Cluster {} is setup", spec.clusterName()); }