Skip to content

Commit

Permalink
[Broker] Move schema compatibility strategy cmd from topics to topicP…
Browse files Browse the repository at this point in the history
…olicies

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Feb 15, 2022
1 parent 18d9f1b commit a4ba12d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;

Expand Down Expand Up @@ -141,6 +142,9 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic());
jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic());

jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy());
jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy());
jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy());
}

@Parameters(commandDescription = "Get max consumers per subscription for a topic")
Expand Down Expand Up @@ -1684,6 +1688,53 @@ void run() throws PulsarAdminException {
}
}


@Parameters(commandDescription = "Remove schema compatibility strategy on a topic")
private class RemoveSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topicPolicies().removeSchemaCompatibilityStrategy(persistentTopic);
}
}

@Parameters(commandDescription = "Set schema compatibility strategy on a topic")
private class SetSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--strategy", "-s"}, description = "Schema compatibility strategy: [UNDEFINED, "
+ "ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, "
+ "FORWARD_TRANSITIVE, FULL_TRANSITIVE]", required = true)
private SchemaCompatibilityStrategy strategy;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topicPolicies().setSchemaCompatibilityStrategy(persistentTopic, strategy);
}
}

@Parameters(commandDescription = "Get schema compatibility strategy on a topic")
private class GetSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
SchemaCompatibilityStrategy strategy =
getAdmin().topicPolicies().getSchemaCompatibilityStrategy(persistentTopic, applied);
print(strategy == null ? "null" : strategy.name());
}
}

private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down Expand Up @@ -245,10 +244,6 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters());

jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy());
jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy());
jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy());

initDeprecatedCommands();
}

Expand Down Expand Up @@ -2816,48 +2811,4 @@ void run() throws PulsarAdminException {

}
}

@Parameters(commandDescription = "Remove schema compatibility strategy on a topic")
private class RemoveSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topicPolicies().removeSchemaCompatibilityStrategy(persistentTopic);
}
}

@Parameters(commandDescription = "Set schema compatibility strategy on a topic")
private class SetSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--strategy", "-s"}, description = "Schema compatibility strategy: [UNDEFINED, "
+ "ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, "
+ "FORWARD_TRANSITIVE, FULL_TRANSITIVE]", required = true)
private SchemaCompatibilityStrategy strategy;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topicPolicies().setSchemaCompatibilityStrategy(persistentTopic, strategy);
}
}

@Parameters(commandDescription = "Get schema compatibility strategy on a topic")
private class GetSchemaCompatibilityStrategy extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topicPolicies().getSchemaCompatibilityStrategy(persistentTopic, applied));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,71 @@

package org.apache.pulsar.tests.integration.cli.topicpolicies;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.suites.PulsarCliTestSuite;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class SchemaCompatibilityStrategyTest extends PulsarTestSuite {
public class SchemaCompatibilityStrategyTest extends PulsarCliTestSuite {
@BeforeClass(alwaysRun = true)
@Override
public void setupCluster() throws Exception {
super.setupCluster();
public void before() throws Exception {
enableTopicPolicies();
super.before();
}

@BeforeClass(alwaysRun = true)
@Override
public void tearDownCluster() throws Exception {
super.tearDownCluster();
public void after() throws Exception {
super.after();
}

@Test
public void testSchemaCompatibilityCmd() throws Exception {
String topicName = generateTopicName("",true);
public void testSchemaCompatibilityStrategyCmd() throws Exception {
String topicName = generateTopicName("test-schema-compatibility-strategy", true);
pulsarAdmin.topics().createNonPartitionedTopic(topicName);

Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName);
assertTrue(result.getStdout().contentEquals("UNDEFINED"));
});
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies",
"get-schema-compatibility-strategy", topicName);
assertEquals(result.getStdout().trim(), "null");

Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy --applied " + topicName);
assertTrue(result.getStdout().contentEquals("FULL"));
});
result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-schema-compatibility-strategy",
"--applied", topicName);
assertEquals(result.getStdout().trim(), SchemaCompatibilityStrategy.FULL.name());

pulsarAdmin.topicPolicies().removeSchemaCompatibilityStrategy(topicName);
Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName);
assertTrue(result.getStdout().contentEquals("UNDEFINED"));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies",
"get-schema-compatibility-strategy", topicName).getStdout().trim(), "null");
});
}

@Test
public void testSchemaCompatibilityCmdWithNamespaceLevel() throws Exception {
String topicName = generateTopicName("",true);
pulsarAdmin.namespaces()
.setSchemaCompatibilityStrategy("public/default", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
public void testSchemaCompatibilityStrategyCmdWithNamespaceLevel() throws Exception {
String ns = generateNamespaceName();
String fullNS = "public/" + ns;
pulsarAdmin.namespaces().createNamespace("public/"+ns);

String topicName = generateTopicName(ns, "test-schema-compatibility-strategy",
true);
pulsarAdmin.namespaces().setSchemaCompatibilityStrategy(fullNS, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
pulsarAdmin.topics().createNonPartitionedTopic(topicName);

Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName);
assertTrue(result.getStdout().contentEquals("UNDEFINED"));
});
Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy --applied " + topicName);
assertTrue(result.getStdout().contentEquals("ALWAYS_INCOMPATIBLE"));
});
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies",
"get-schema-compatibility-strategy", topicName);
assertEquals(result.getStdout().trim(), "null");

result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies",
"get-schema-compatibility-strategy", "--applied", topicName);
assertEquals(result.getStdout().trim(), SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE.name());

pulsarAdmin.namespaces()
.setSchemaCompatibilityStrategy("public/default", SchemaCompatibilityStrategy.UNDEFINED);
Awaitility.await().untilAsserted(()->{
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics get-schema-compatibility-strategy " + topicName);
assertTrue(result.getStdout().contentEquals("UNDEFINED"));
});
.setSchemaCompatibilityStrategy(fullNS, SchemaCompatibilityStrategy.UNDEFINED);
result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-schema-compatibility-strategy",
topicName);
assertEquals(result.getStdout().trim(), "null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.suites;

import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

public abstract class PulsarCliTestSuite extends PulsarClusterTestBase {
@BeforeClass(alwaysRun = true)
public void before() throws Exception {
setup();
}

@AfterClass(alwaysRun = true)
public void after() throws Exception {
cleanup();
}

protected final void enableTopicPolicies() {
this.brokerEnvs.put("systemTopicEnabled", "true");
this.brokerEnvs.put("topicLevelPoliciesEnabled", "true");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.topologies;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -29,6 +31,8 @@

@Slf4j
public abstract class PulsarClusterTestBase extends PulsarTestBase {
protected final Map<String, String> brokerEnvs = new HashMap<>();

@Override
protected final void setup() throws Exception {
setupCluster();
Expand Down Expand Up @@ -98,7 +102,8 @@ public void setupCluster(String namePrefix) throws Exception {
.collect(joining("-"));

PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder = PulsarClusterSpec.builder()
.clusterName(clusterName);
.clusterName(clusterName)
.brokerEnvs(brokerEnvs);

setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-cli.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<class name="org.apache.pulsar.tests.integration.cli.FunctionsCLITest"/>
<class name="org.apache.pulsar.tests.integration.cli.PackagesCliTest"/>
<class name="org.apache.pulsar.tests.integration.cli.PulsarVersionTest"/>
<class name="org.apache.pulsar.tests.integration.cli.topicpolicies.SchemaCompatibilityStrategyTest"/>
</classes>
</test>
</suite>

0 comments on commit a4ba12d

Please sign in to comment.