Skip to content

Commit

Permalink
test: fix to follow existing production codes
Browse files Browse the repository at this point in the history
  • Loading branch information
equanz committed Jan 19, 2022
1 parent 1150b7f commit 446ef2b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 75 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ flexible messaging model and an intuitive client API.</description>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>

<!--config keys to congiure test selection -->
<!--config keys to configure test selection -->
<include>**/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java</include>
<exclude/>
<groups/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1025,9 +1024,11 @@ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

final List<String> primaryList = new ArrayList<>();
primaryList.add(brokerName + ".*");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(ns1Name))
.primary(Collections.singletonList(brokerName + ".*"))
.primary(primaryList)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
Expand Down Expand Up @@ -1576,60 +1577,6 @@ public void testForceDeleteNamespace() throws Exception {
}
}

@Test
public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
final String ns = "prop-xyz/distinguish-topic-type-ns";
final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
admin.namespaces().createNamespace(ns, 2);
admin.namespaces().createNamespace(exNs, 2);

final String p1 = "persistent://" + ns + "/p1";
final String p5 = "persistent://" + ns + "/p5";
final String np = "persistent://" + ns + "/np";

admin.topics().createPartitionedTopic(p1, 1);
admin.topics().createPartitionedTopic(p5, 5);
admin.topics().createNonPartitionedTopic(np);

final String exNp = "persistent://" + exNs + "/np";
admin.topics().createNonPartitionedTopic(exNp);
// insert an invalid topic name
pulsar.getLocalMetadataStore().put(
"/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join();

List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
List<String> exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();

// ensure that the topic list contains all the topics
List<String> allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString()));
for (int i = 0; i < 5; i++) {
allTopics.add(TopicName.get(p5).getPartition(i).toString());
}
Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0);
Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
// partition num = p1 + p5 + np
Assert.assertEquals(topics.size(), 1 + 5 + 1);
Assert.assertEquals(exTopics.size(), 1 + 1);

admin.namespaces().deleteNamespace(ns, true);
Arrays.asList(p1, p5, np).forEach(t -> {
try {
admin.schemas().getSchemaInfo(t);
} catch (PulsarAdminException e) {
// all the normal topics' schemas have been deleted
Assert.assertEquals(e.getStatusCode(), 404);
}
});

try {
admin.namespaces().deleteNamespace(exNs, true);
fail("Should fail due to invalid topic");
} catch (Exception e) {
//ok
}
}

@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Expand Down Expand Up @@ -1721,11 +1668,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);


// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
Expand All @@ -1735,11 +1678,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
Expand Down Expand Up @@ -1910,16 +1849,16 @@ public void testMaxSubPerTopicApi() throws Exception {

@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
pulsarClient.newProducer().topic(topic).create().close();
final int maxSub = 2;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub);
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub);

List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
Expand All @@ -1936,7 +1875,8 @@ public void testMaxSubPerTopic() throws Exception {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
Expand Down Expand Up @@ -1981,16 +1921,16 @@ public void testMaxSubPerTopicPriority() throws Exception {
final int nsLevelMaxSub = 4;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub);
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
Expand Down

0 comments on commit 446ef2b

Please sign in to comment.