From 25c4b7cee402a1d486d720a71dc2e06aa6d9af64 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 May 2023 19:28:18 +0300 Subject: [PATCH] [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, follow up (#20401) Fixes #20386 ### Motivation - the previous attempt #20387 to fix the flakiness wasn't effective ### Modifications - improve the fix and rely on the fact that entryId is -1 when the message is dropped in the broker code: https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1699-L1711 ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` --- .../client/api/NonPersistentTopicTest.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index c41ab3e8ccc73..63ce0f00dff15 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -50,6 +51,7 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -824,13 +826,14 @@ public void testMsgDropStat() throws Exception { conf.setMaxConcurrentNonPersistentMessagePerConnection(1); stopBroker(); startBroker(); + + pulsar.getBrokerService().updateRates(); + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1") - .receiverQueueSize(1) - .messageListener((c, msg) -> {}).subscribe(); + .receiverQueueSize(1).subscribe(); Consumer consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2") - .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared) - .messageListener((c, msg) -> {}).subscribe(); + .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe(); ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer().topic(topicName) .enableBatching(false) @@ -839,17 +842,26 @@ public void testMsgDropStat() throws Exception { @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); byte[] msgData = "testData".getBytes(); - final int totalProduceMessages = 200; - CountDownLatch latch = new CountDownLatch(totalProduceMessages); + final int totalProduceMessages = 1000; + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger messagesSent = new AtomicInteger(0); for (int i = 0; i < totalProduceMessages; i++) { executor.submit(() -> { - producer.sendAsync(msgData).handle((msg, e) -> { - latch.countDown(); + producer.sendAsync(msgData).handle((msgId, e) -> { + int count = messagesSent.incrementAndGet(); + // process at least 20% of messages before signalling the latch + // a non-persistent message will return entryId as -1 when it has been dropped + // due to setMaxConcurrentNonPersistentMessagePerConnection limit + // also ensure that it has happened before the latch is signalled + if (count > totalProduceMessages * 0.2 && msgId != null + && ((MessageIdImpl) msgId).getEntryId() == -1) { + latch.countDown(); + } return null; }); }); } - latch.await(); + latch.await(5, TimeUnit.SECONDS); NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();