Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP][broker]PIP-162: Enable system topic by default. #15619

Merged
merged 10 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -555,15 +555,15 @@ maxNumPartitionsPerPartitionedTopic=0
zookeeperSessionExpiredPolicy=reconnect

# Enable or disable system topic
systemTopicEnabled=false
systemTopicEnabled=true

# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
topicLevelPoliciesEnabled=true

# If a topic remains fenced for this number of seconds, it will be closed forcefully.
# If it is set to 0 or a negative number, the fenced topic will not be closed.
Expand Down
4 changes: 2 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,15 @@ brokerClientTlsCiphers=
brokerClientTlsProtocols=

# Enable or disable system topic
systemTopicEnabled=false
systemTopicEnabled=true

# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
topicLevelPoliciesEnabled=true

# If a topic remains fenced for this number of seconds, it will be closed forcefully.
# If it is set to 0 or a negative number, the fenced topic will not be closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable system topic.")
private boolean systemTopicEnabled = false;
private boolean systemTopicEnabled = true;

@FieldContext(
category = CATEGORY_SCHEMA,
Expand All @@ -1269,7 +1269,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, "
+ "please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = false;
private boolean topicLevelPoliciesEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ public void testIncrementPartitionsOfTopic() throws Exception {
public void testTopicPoliciesWithMultiBroker() throws Exception {
//setup cluster with 3 broker
cleanup();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.internalSetup();
admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl((pulsar.getWebServiceAddress() + ",localhost:1026," + "localhost:2050")).build());
Expand Down Expand Up @@ -1415,8 +1413,6 @@ public void testDeleteNamespace() throws Exception {
@Test
public void testDeleteNamespaceWithTopicPolicies() throws Exception {
stopBroker();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
setup();

String tenant = "test-tenant";
Expand Down Expand Up @@ -2210,8 +2206,6 @@ public void testCompactionApi() throws Exception {
@Test(timeOut = 200000)
public void testCompactionPriority() throws Exception {
cleanup();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10000);
setup();
final String topic = "persistent://prop-xyz/ns1/topic" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ public void testNamespaceDelayedDeliveryPolicyApi() throws Exception {
@Test(timeOut = 30000)
public void testDelayedDeliveryApplied() throws Exception {
cleanup();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
setup();
final String namespace = "delayed-delivery-messages/my-ns";
final String topic = "persistent://" + namespace + "/test" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ public void testMaxUnackedMessagesPerConsumerPriority() throws Exception {
int namespaceLevelPolicy = 2;
int topicLevelPolicy = 1;
cleanup();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setMaxUnackedMessagesPerConsumer(brokerLevelPolicy);
setup();
final String namespace = "max-unacked-messages/priority-on-consumers";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
public void setup() throws Exception {
conf.setManagedLedgerMaxEntriesPerLedger(10);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);

super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();

// Setup namespaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
Expand Down Expand Up @@ -3271,8 +3273,6 @@ public void testPartitionedTopicMsgDelayedAggregated() throws Exception {
public void testPartitionedTopicTruncate() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
final String subName = "my-sub";
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setSystemTopicEnabled(true);
admin.topics().createPartitionedTopic(topicName,6);
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(60, 50));
List<MessageId> messageIds = publishMessagesOnPersistentTopic(topicName, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void initNamespace() throws Exception {
@BeforeMethod
public void setup() throws Exception {
resetConfig();
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
conf.setClusterName(testLocalCluster);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
resetConfig();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setTtlDurationDefaultInSeconds(3600);
super.internalSetup();

Expand Down Expand Up @@ -128,7 +126,6 @@ public void testGetMessageTTL() throws Exception {
@Test
public void testTopicPolicyDisabled() throws Exception {
super.internalCleanup();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setDefaultNumberOfNamespaceBundles(1);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
resetConfig();
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
conf.setLoadBalancerEnabled(true);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ protected void setup() throws Exception {
conf.setEnablePackagesManagement(true);
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
conf.setTransactionCoordinatorEnabled(true);
conf.setSystemTopicEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(1);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {

@BeforeMethod
public void setup() throws Exception {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
this.conf.setDisableBrokerInterceptors(false);

this.listener1 = mock(BrokerInterceptor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ void setup() throws Exception {
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType("non-partitioned");
config.setSystemTopicEnabled(false);
config.setTopicLevelPoliciesEnabled(false);

pulsar = new PulsarService(config);
pulsar.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ public void testStrictBookieIsolation() throws Exception {
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setStrictBookieAffinityEnabled(true);
config.setTopicLevelPoliciesEnabled(false);
config.setSystemTopicEnabled(false);
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);

config.setManagedLedgerDefaultEnsembleSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,6 @@ public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
@Test
public void testAutoCreationOfSystemTopicTransactionBufferSnapshot() throws Exception {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getConfiguration().setSystemTopicEnabled(true);

final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;

Expand All @@ -407,7 +406,6 @@ public void testAutoCreationOfSystemTopicTransactionBufferSnapshot() throws Exce
@Test
public void testAutoCreationOfSystemTopicNamespaceEvents() throws Exception {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getConfiguration().setSystemTopicEnabled(true);

final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class BrokerServiceTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
super.baseSetup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
public void baseSetup() throws Exception {
super.internalSetup();
baseSetupCommon();
afterSetup();
}

public void baseSetup(ServiceConfiguration serviceConfiguration) throws Exception {
super.internalSetup(serviceConfiguration);
baseSetupCommon();
afterSetup();
}

protected void afterSetup() throws Exception {
// NOP
}

private void baseSetupCommon() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,6 @@ public void testMaxInactiveDuration() throws Exception {

@Test(timeOut = 20000)
public void testTopicLevelInActiveTopicApi() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();
final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
Expand All @@ -394,8 +392,6 @@ public void testTopicLevelInActiveTopicApi() throws Exception {

@Test(timeOut = 30000)
public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerDeleteInactiveTopicsEnabled(true);
conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
Expand Down Expand Up @@ -472,8 +468,6 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
@Test(timeOut = 30000)
public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Exception {
final String namespace = "prop/ns-abc";
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerDeleteInactiveTopicsEnabled(true);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
Expand Down Expand Up @@ -527,8 +521,6 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti

@Test(timeOut = 30000)
public void testInactiveTopicApplied() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();

final String namespace = "prop/ns-abc";
Expand Down Expand Up @@ -578,7 +570,6 @@ public void testInactiveTopicApplied() throws Exception {

@Test(timeOut = 30000)
public void testHealthTopicInactiveNotClean() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public void setup() throws Exception {
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setSystemTopicEnabled(false);
svcConfig.setTopicLevelPoliciesEnabled(false);
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
doReturn(svcConfig).when(pulsar).getConfiguration();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,6 @@ public void testMessageExpiryWithTopicMessageTTL() throws Exception {
String namespaceName = "prop/expiry-check-2";

cleanup();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setForceDeleteNamespaceAllowed(true);
setup();

Expand Down Expand Up @@ -1525,6 +1523,10 @@ public void testBrokerTopicStats() throws Exception {
@Test
public void testBrokerConnectionStats() throws Exception {

cleanup();
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
setup();
BrokerService brokerService = this.pulsar.getBrokerService();

final String namespace = "prop/ns-abc";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public void setup() throws Exception {
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setMaxUnackedMessagesPerConsumer(50000);
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setTopicLevelPoliciesEnabled(false);
svcConfig.setSystemTopicEnabled(false);
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
@Test
public void testBrokerLevelPublishRateDynamicUpdate() throws Exception{
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testMultiLevelPublishRate";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public Object[][] dispatchRateProvider() {
@Test
public void testReplicatorRatePriority() throws Exception {
cleanup();
config1.setSystemTopicEnabled(true);
config1.setTopicLevelPoliciesEnabled(true);
config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setAllowAutoTopicCreationType("non-partitioned");
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setSystemTopicEnabled(true);
config.setTopicLevelPoliciesEnabled(true);
}

public void resetConfig1() {
Expand Down
Loading