Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Do not write replicated snapshot marker when the topic which is not enable replication #21495

Merged
merged 9 commits into from
Nov 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,7 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
}

updateTopicPolicyByNamespacePolicy(data);

checkReplicatedSubscriptionControllerState();
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
isEncryptionRequired = data.encryption_required;

isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
Expand Down Expand Up @@ -3497,12 +3497,14 @@ private synchronized void checkReplicatedSubscriptionControllerState(boolean sho
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1;

if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) {
log.info("[{}] Enabling replicated subscriptions controller", topic);
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions
|| !replicationEnabled) {
log.info("[{}] Disabled replicated subscriptions controller", topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
replicatedSubscriptionsController = Optional.empty();
Expand Down Expand Up @@ -3685,6 +3687,7 @@ public void onUpdate(TopicPolicies policies) {
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -50,7 +51,9 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand All @@ -60,6 +63,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -728,6 +732,213 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}

@DataProvider(name = "isTopicPolicyEnabled")
private Object[][] isTopicPolicyEnabled() {
// Todo: fix replication can not be enabled at topic level.
return new Object[][] { { Boolean.FALSE } };
}

/**
* Test the replication subscription can work normal in the following cases:
* <p>
* 1. Do not write data into the original topic when the topic does not configure a remote cluster. {topic1}
* 1. Publish message to the topic and then wait a moment,
* the backlog will not increase after publishing completely.
* 2. Acknowledge the messages, the last confirm entry does not change.
* 2. Snapshot and mark will be written after topic configure a remote cluster. {topic2}
* 1. publish message to topic. After publishing completely, the backlog of the topic keep increase.
* 2. Wait the snapshot complete, the backlog stop changing.
* 3. Publish messages to wait another snapshot complete.
* 4. Ack messages to move the mark delete position after the position record in the first snapshot.
* 5. Check new entry (a mark) appending to the original topic.
* 3. Stopping writing snapshot and mark after remove the remote cluster of the topic. {topic2}
* similar to step 1.
* </p>
*/
@Test(dataProvider = "isTopicPolicyEnabled")
public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception {
// 1. Prepare resource and use proper configuration.
String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
String topic1 = "persistent://" + namespace + "/replication-enable";
String topic2 = "persistent://" + namespace + "/replication-disable";
String subName = "sub";

admin1.namespaces().createNamespace(namespace);
pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(isTopicPolicyEnabled);
pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(1);
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
// 2. Build Producer and Consumer.
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topic1)
.subscriptionName(subName)
.ackTimeout(5, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.replicateSubscriptionState(true)
.subscribe();
@Cleanup
Producer<byte[]> producer1 = client1.newProducer()
.topic(topic1)
.create();
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
// 3. Test replication subscription work as expected.
// Test case 1: disable replication, backlog will not increase.
testReplicatedSubscriptionWhenDisableReplication(producer1, consumer1, topic1);

// Test case 2: enable replication, mark and snapshot work as expected.
if (isTopicPolicyEnabled) {
admin1.topics().createNonPartitionedTopic(topic2);
admin1.topics().setReplicationClusters(topic2, List.of("r1", "r2"));
} else {
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
}
@Cleanup
Consumer<byte[]> consumer2 = client1.newConsumer()
.topic(topic2)
.subscriptionName(subName)
.ackTimeout(5, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.replicateSubscriptionState(true)
.subscribe();
@Cleanup
Producer<byte[]> producer2 = client1.newProducer()
.topic(topic2)
.create();
testReplicatedSubscriptionWhenEnableReplication(producer2, consumer2, topic2);

// Test case 3: enable replication, mark and snapshot work as expected.
if (isTopicPolicyEnabled) {
admin1.topics().setReplicationClusters(topic2, List.of("r1"));
} else {
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
}
testReplicatedSubscriptionWhenDisableReplication(producer2, consumer2, topic2);
// 4. Clear resource.
pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(true);
admin1.namespaces().deleteNamespace(namespace, true);
pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

/**
* Disable replication subscription.
* Test scheduled task case.
* 1. Send three messages |1:0|1:1|1:2|.
* 2. Get topic backlog, as backlog1.
* 3. Wait a moment.
* 4. Get the topic backlog again, the backlog will not increase.
* Test acknowledge messages case.
* 1. Get the last confirm entry, as LAC1.
* 2. Acknowledge these messages |1:0|1:1|.
* 3. wait a moment.
* 4. Get the last confirm entry, as LAC2. LAC1 is equal to LAC2.
* Clear environment.
* 1. Ack all the retained messages. |1:2|
* 2. Wait for the backlog to return to zero.
*/
private void testReplicatedSubscriptionWhenDisableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
String topic) throws Exception {
final int messageSum = 3;
// Test scheduled task case.
for (int i = 0; i < messageSum; i++) {
producer.newMessage().send();
}
long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
Thread.sleep(3000);
long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
assertEquals(backlog1, backlog2);
// Test acknowledge messages case.
String lastConfirmEntry1 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
for (int i = 0; i < messageSum - 1; i++) {
consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
}
Awaitility.await().untilAsserted(() -> {
String lastConfirmEntry2 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
assertEquals(lastConfirmEntry1, lastConfirmEntry2);
});
// Clear environment.
consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
Awaitility.await().untilAsserted(() -> {
long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize();
assertEquals(backlog4, 0);
});
}

/**
* Enable replication subscription.
* Test scheduled task case.
* 1. Wait replicator connected.
* 2. Send three messages |1:0|1:1|1:2|.
* 3. Get topic backlog, as backlog1.
* 4. Wait a moment.
* 5. Get the topic backlog again, as backlog2. The backlog2 is bigger than backlog1. |1:0|1:1|1:2|mark|.
* 6. Wait the snapshot complete.
* Test acknowledge messages case.
* 1. Write messages and wait another snapshot complete. |1:0|1:1|1:2|mark|1:3|1:4|1:5|mark|
* 2. Ack message |1:0|1:1|1:2|1:3|1:4|.
* 3. Get last confirm entry, as LAC1.
* 2. Wait a moment.
* 3. Get Last confirm entry, as LAC2. LAC2 different to LAC1. |1:5|mark|mark|
* Clear environment.
* 1. Ack all the retained message |1:5|.
* 2. Wait for the backlog to return to zero.
*/
private void testReplicatedSubscriptionWhenEnableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
String topic) throws Exception {
final int messageSum = 3;
Awaitility.await().untilAsserted(() -> {
List<String> keys = pulsar1.getBrokerService()
.getTopic(topic, false).get().get()
.getReplicators().keys();
assertEquals(keys.size(), 1);
assertTrue(pulsar1.getBrokerService()
.getTopic(topic, false).get().get()
.getReplicators().get(keys.get(0)).isConnected());
});
// Test scheduled task case.
sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
// Test acknowledge messages case.
// After snapshot write completely, acknowledging message to move the mark delete position
// after the position recorded in the snapshot will trigger to write a new marker.
sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
String lastConfirmedEntry3 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
for (int i = 0; i < messageSum * 2 - 1; i++) {
consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
}
Awaitility.await().untilAsserted(() -> {
String lastConfirmedEntry4 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
assertNotEquals(lastConfirmedEntry3, lastConfirmedEntry4);
});
// Clear environment.
consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
Awaitility.await().untilAsserted(() -> {
long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize();
assertEquals(backlog4, 0);
});
}

private void sendMessageAndWaitSnapshotComplete(Producer<byte[]> producer, String topic,
int messageSum) throws Exception {
for (int i = 0; i < messageSum; i++) {
producer.newMessage().send();
}
long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
Awaitility.await().untilAsserted(() -> {
long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
assertTrue(backlog2 > backlog1);
});
// Wait snapshot write completely, stop writing marker into topic.
Awaitility.await().untilAsserted(() -> {
String lastConfirmedEntry1 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
PersistentTopicInternalStats persistentTopicInternalStats = admin1.topics().getInternalStats(topic, false);
Thread.sleep(1000);
String lastConfirmedEntry2 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
assertEquals(lastConfirmedEntry1, lastConfirmedEntry2);
});
}

void publishMessages(Producer<byte[]> producer, int startIndex, int numMessages, Set<String> sentMessages)
throws PulsarClientException {
for (int i = startIndex; i < startIndex + numMessages; i++) {
Expand Down
Loading