diff --git a/src/main/java/org/atc/ConsumerThread.java b/src/main/java/org/atc/ConsumerThread.java index 83d7e30..1195c04 100644 --- a/src/main/java/org/atc/ConsumerThread.java +++ b/src/main/java/org/atc/ConsumerThread.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.google.common.util.concurrent.RateLimiter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.atc.config.SubscriberConfig; @@ -83,12 +84,19 @@ public final void run() { log.info("Starting consumer to receive " + messageCount + " messages from " + config.getQueueName() + " Consumer ID: " + consumerID); ATCMessage message = null; - + RateLimiter rateLimiter = null; + if (config.getMessagesPerSecond() != 0) { + rateLimiter = RateLimiter.create(config.getMessagesPerSecond()); + } try { long latency; for (int i = 1; i <= messageCount; i++) { + if (null != rateLimiter) { + rateLimiter.acquire(); // wait for a permit to publish or block + } message = consumer.receive(); + if (config.getReceiveWaitTimeMillis() > 0) { try { TimeUnit.MILLISECONDS.sleep(config.getReceiveWaitTimeMillis()); diff --git a/src/main/java/org/atc/PublisherThread.java b/src/main/java/org/atc/PublisherThread.java index 3ca081b..935f004 100644 --- a/src/main/java/org/atc/PublisherThread.java +++ b/src/main/java/org/atc/PublisherThread.java @@ -18,6 +18,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; +import com.google.common.util.concurrent.RateLimiter; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,14 +88,24 @@ private void publish() { " Publisher ID: " + publisherID); ATCMessage atcMessage = null; String messageContent = config.getMessageContent(); + if(StringUtils.isEmpty(messageContent)) { messageContent = DEFAULT_CONTENT; } + RateLimiter rateLimiter = null; + if (config.getMessagesPerSecond() != 0) { + rateLimiter = RateLimiter.create(config.getMessagesPerSecond()); + } + try { for (int i = 1; i <= messageCount; i++) { atcMessage = publisher.createTextMessage(messageContent); atcMessage.setMessageID(publisherID + "-" + i); + + if (null != rateLimiter) { + rateLimiter.acquire(); // wait for a permit to publish or block + } publisher.send(atcMessage); if (log.isDebugEnabled()) { @@ -104,7 +115,7 @@ private void publish() { publishRate.mark(); if (config.getDelayBetweenMsgs() > 0) { - TimeUnit.NANOSECONDS.sleep(publisher.getConfigs().getDelayBetweenMsgs()); + TimeUnit.MILLISECONDS.sleep(config.getDelayBetweenMsgs()); } } @@ -126,6 +137,7 @@ private void publish() { private void transactionalPublish() { long messageCount = publisher.getConfigs().getMessageCount(); + PublisherConfig config = publisher.getConfigs(); String publisherID = publisher.getConfigs().getId(); log.info("Starting transactional publisher to send " + messageCount + " messages to " + @@ -137,13 +149,22 @@ private void transactionalPublish() { if(StringUtils.isEmpty(messageContent)) { messageContent = DEFAULT_CONTENT; } + DisruptorBasedPublisher disruptorPublisher = new DisruptorBasedPublisher(batchSize, publisher, sentCount, publishRate); + RateLimiter rateLimiter = null; + if (config.getMessagesPerSecond() != 0) { + rateLimiter = RateLimiter.create(config.getMessagesPerSecond()); + } + for (int i = 1; i <= messageCount; i++) { try { atcMessage = publisher.createTextMessage(messageContent); atcMessage.setMessageID(Integer.toString(i)); + if (null != rateLimiter) { + rateLimiter.acquire(); // wait for a permit to publish or block + } disruptorPublisher.publish(atcMessage); } catch (ATCException e) { log.error("Exception occurred while creating message for publisher " + publisherID, e); diff --git a/src/main/java/org/atc/amqp/queue/AMQPQueueReceiver.java b/src/main/java/org/atc/amqp/queue/AMQPQueueReceiver.java index 6abfc73..735799e 100644 --- a/src/main/java/org/atc/amqp/queue/AMQPQueueReceiver.java +++ b/src/main/java/org/atc/amqp/queue/AMQPQueueReceiver.java @@ -72,6 +72,7 @@ public final void unsubscribe() { } public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingException, ATCException { + config = conf; try { String queueName = conf.getQueueName(); Properties properties = new Properties(); @@ -83,7 +84,7 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(conf.getConnectionFactoryName()); queueConnection = connFactory.createQueueConnection(); queueConnection.start(); - if (conf.isEnableClientAcknowledgment()) { + if (config.isEnableClientAcknowledgment()) { queueSession = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); } else if (config.isTransactional()) { queueSession = queueConnection.createQueueSession(true, QueueSession.AUTO_ACKNOWLEDGE); @@ -93,7 +94,6 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep //Receive message Queue queue = (Queue) ctx.lookup(queueName); consumer = queueSession.createConsumer(queue); - config = conf; return consumer; } catch (JMSException e) { throw new ATCException("Subscriber initialisation failed. Subscriber id " + config.getId(), e); diff --git a/src/main/java/org/atc/amqp/topic/AMQPDurableTopicSubscriber.java b/src/main/java/org/atc/amqp/topic/AMQPDurableTopicSubscriber.java index fb779ee..4f517fb 100644 --- a/src/main/java/org/atc/amqp/topic/AMQPDurableTopicSubscriber.java +++ b/src/main/java/org/atc/amqp/topic/AMQPDurableTopicSubscriber.java @@ -80,6 +80,7 @@ public final void unsubscribe() throws ATCException { public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingException, ATCException { + config = conf; try { String topicName = conf.getQueueName(); subscriptionId = conf.getSubscriptionID(); @@ -94,7 +95,7 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep topicConnection.start(); if (conf.isEnableClientAcknowledgment()) { topicSession = topicConnection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE); - } else if (config.isTransactional()) { + } else if (conf.isTransactional()) { topicSession = topicConnection.createTopicSession(true, TopicSession.AUTO_ACKNOWLEDGE); } else { topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); @@ -103,7 +104,6 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep // create durable subscriber with subscription ID Topic topic = (Topic) ctx.lookup(topicName); topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId); - config = conf; return topicSubscriber; } catch (JMSException e) { throw new ATCException("Subscriber initialisation failed. Subscriber id " + config.getId(), e); diff --git a/src/main/java/org/atc/config/PubSubConfig.java b/src/main/java/org/atc/config/PubSubConfig.java index cc49375..991a9c8 100644 --- a/src/main/java/org/atc/config/PubSubConfig.java +++ b/src/main/java/org/atc/config/PubSubConfig.java @@ -18,11 +18,11 @@ import org.apache.commons.lang3.StringUtils; +import java.lang.reflect.Field; +import java.util.UUID; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; -import java.lang.reflect.Field; -import java.util.UUID; @SuppressWarnings("unused") @XmlAccessorType(XmlAccessType.FIELD) @@ -63,6 +63,9 @@ public abstract class PubSubConfig { @XmlAttribute private int delayBetweenMsgs; + @XmlAttribute + private int messagesPerSecond; + PubSubConfig() { id = UUID.randomUUID().toString(); } @@ -112,6 +115,14 @@ public final String getTCPConnectionURL() { return builder.toString(); } + public int getMessagesPerSecond() { + return messagesPerSecond; + } + + public void setMessagesPerSecond(int messagesPerSecond) { + this.messagesPerSecond = messagesPerSecond; + } + public final int getPort() { return port; } diff --git a/src/main/java/org/atc/config/PublisherConfig.java b/src/main/java/org/atc/config/PublisherConfig.java index 76541e9..d6e9898 100644 --- a/src/main/java/org/atc/config/PublisherConfig.java +++ b/src/main/java/org/atc/config/PublisherConfig.java @@ -29,14 +29,6 @@ public class PublisherConfig extends PubSubConfig { @XmlAttribute private String messageContent; - public final int getPublisherMaxThroughput() { - return publisherMaxThroughput; - } - - final void setPublisherMaxThroughput(int publisherMaxThroughput) { - this.publisherMaxThroughput = publisherMaxThroughput; - } - public final String getMessageContent() { return messageContent; } diff --git a/src/main/resources/queue-test.yaml b/src/main/resources/queue-test.yaml index 6983495..ce5248c 100644 --- a/src/main/resources/queue-test.yaml +++ b/src/main/resources/queue-test.yaml @@ -62,6 +62,7 @@ queuePublishers: isTransactional: false transactionBatchSize: 10 parallelThreads: 1 + messagesPerSecond: 1500 messageContent: "test message content" # failoverParams: failover='roundrobin'&brokerlist='tcp://10.100.5.94:5672?connectdelay='1000'&connecttimeout='3000'&retries='1';tcp://10.100.5.94:5673?connectdelay='1000'&connecttimeout='3000'&retries='1'' @@ -76,6 +77,7 @@ queueSubscribers: messageCount: 100 parallelThreads: 1 enableClientAcknowledgment: true + messagesPerSecond: 1500 isTransactional: false ## End of subscriber configurations diff --git a/src/main/resources/topic-test.yaml b/src/main/resources/topic-test.yaml index 20791a3..76eb8cc 100644 --- a/src/main/resources/topic-test.yaml +++ b/src/main/resources/topic-test.yaml @@ -61,6 +61,7 @@ topicPublishers: isTransactional: false transactionBatchSize: 10 parallelThreads: 1 + messagesPerSecond: 1500 messageContent: "topic message content" # End of publisher Configurations @@ -79,6 +80,7 @@ topicSubscribers: id: "subscriber" messageCount: 100 # receiveWaitTimeMillis: 1000 +# messagesPerSecond: 1500 enableClientAcknowledgment: false parallelThreads: 1