Skip to content

Commit

Permalink
[Test] Cleanup ProxyPublishConsumeTest (#12607)
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui authored Nov 4, 2021
1 parent b549620 commit 556ba0e
Showing 1 changed file with 9 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -56,11 +54,9 @@

import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicType;
Expand All @@ -82,7 +78,6 @@
import org.glassfish.jersey.logging.LoggingFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -295,14 +290,14 @@ public void unsubscribeTest() throws Exception {
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
List<String> subs = admin.topics().getSubscriptions(topic);
Assert.assertEquals(subs.size(), 1);
Assert.assertEquals(subs.get(0), subscription);
assertEquals(subs.size(), 1);
assertEquals(subs.get(0), subscription);
// do unsubscribe
consumeSocket.unsubscribe();
//wait for delete
Thread.sleep(1000);
subs = admin.topics().getSubscriptions(topic);
Assert.assertEquals(subs.size(), 0);
assertEquals(subs.size(), 0);
} finally {
stopWebSocketClient(consumeClient);
}
Expand Down Expand Up @@ -927,11 +922,11 @@ public void ackBatchMessageTest() throws Exception {
producer.flush();
consumeSocket.sendPermits(messages);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
assertEquals(consumeSocket.getReceivedMessagesCount(), messages));

// The message should not be acked since we only acked 1 message of the batch message
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
assertEquals(admin.topics().getStats(topic).getSubscriptions()
.get(subscription).getMsgBacklog(), 0));

} finally {
Expand Down Expand Up @@ -976,7 +971,7 @@ public void consumeEncryptedMessages() throws Exception {
producer.flush();
consumeSocket.sendPermits(messages);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
assertEquals(consumeSocket.getReceivedMessagesCount(), messages));

for (JsonObject msg : consumeSocket.messages) {
assertTrue(msg.has("encryptionContext"));
Expand All @@ -989,7 +984,7 @@ public void consumeEncryptedMessages() throws Exception {

// The message should not be acked since we only acked 1 message of the batch message
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
assertEquals(admin.topics().getStats(topic).getSubscriptions()
.get(subscription).getMsgBacklog(), 0));

} finally {
Expand Down Expand Up @@ -1051,13 +1046,13 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) {
// number of consumers are connected = 2 (one is reader)
assertEquals(stats.consumerStats.size(), 2);
ConsumerStats consumerStats = stats.consumerStats.iterator().next();
// Assert.assertTrue(consumerStats.numberOfMsgDelivered > 0);
assertTrue(consumerStats.numberOfMsgDelivered > 0);
assertNotNull(consumerStats.remoteConnection);

// number of producers are connected = 1
assertEquals(stats.producerStats.size(), 1);
ProducerStats producerStats = stats.producerStats.iterator().next();
// Assert.assertTrue(producerStats.numberOfMsgPublished > 0);
assertTrue(producerStats.numberOfMsgPublished > 0);
assertNotNull(producerStats.remoteConnection);
}

Expand Down

0 comments on commit 556ba0e

Please sign in to comment.