Skip to content

Commit

Permalink
[fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, fo…
Browse files Browse the repository at this point in the history
…llow up (#20401)

Fixes #20386

### Motivation

- the previous attempt #20387 to fix the flakiness wasn't effective

### Modifications

- improve the fix and rely on the fact that entryId is -1 when the message is dropped in the broker
  code: https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1699-L1711

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
  • Loading branch information
lhotari authored May 25, 2023
1 parent 9918bce commit 25c4b7c
Showing 1 changed file with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -50,6 +51,7 @@
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -824,13 +826,14 @@ public void testMsgDropStat() throws Exception {
conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
stopBroker();
startBroker();

pulsar.getBrokerService().updateRates();

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
.receiverQueueSize(1)
.messageListener((c, msg) -> {}).subscribe();
.receiverQueueSize(1).subscribe();

Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
.messageListener((c, msg) -> {}).subscribe();
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();

ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
Expand All @@ -839,17 +842,26 @@ public void testMsgDropStat() throws Exception {
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(5);
byte[] msgData = "testData".getBytes();
final int totalProduceMessages = 200;
CountDownLatch latch = new CountDownLatch(totalProduceMessages);
final int totalProduceMessages = 1000;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger messagesSent = new AtomicInteger(0);
for (int i = 0; i < totalProduceMessages; i++) {
executor.submit(() -> {
producer.sendAsync(msgData).handle((msg, e) -> {
latch.countDown();
producer.sendAsync(msgData).handle((msgId, e) -> {
int count = messagesSent.incrementAndGet();
// process at least 20% of messages before signalling the latch
// a non-persistent message will return entryId as -1 when it has been dropped
// due to setMaxConcurrentNonPersistentMessagePerConnection limit
// also ensure that it has happened before the latch is signalled
if (count > totalProduceMessages * 0.2 && msgId != null
&& ((MessageIdImpl) msgId).getEntryId() == -1) {
latch.countDown();
}
return null;
});
});
}
latch.await();
latch.await(5, TimeUnit.SECONDS);

NonPersistentTopic topic =
(NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
Expand Down

0 comments on commit 25c4b7c

Please sign in to comment.