Skip to content

Commit

Permalink
Add Throughput limiting feature to publisher and subscriber
Browse files Browse the repository at this point in the history
- Introduce throughput control through messagesPerSecond attribute
- Fix subscriber null pointer issue
  • Loading branch information
Asitha committed Jan 17, 2017
1 parent 5c63575 commit 22aeff0
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 16 deletions.
10 changes: 9 additions & 1 deletion src/main/java/org/atc/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.atc.config.SubscriberConfig;
Expand Down Expand Up @@ -83,12 +84,19 @@ public final void run() {
log.info("Starting consumer to receive " + messageCount + " messages from " + config.getQueueName() +
" Consumer ID: " + consumerID);
ATCMessage message = null;

RateLimiter rateLimiter = null;
if (config.getMessagesPerSecond() != 0) {
rateLimiter = RateLimiter.create(config.getMessagesPerSecond());
}
try {
long latency;
for (int i = 1; i <= messageCount; i++) {

if (null != rateLimiter) {
rateLimiter.acquire(); // wait for a permit to publish or block
}
message = consumer.receive();

if (config.getReceiveWaitTimeMillis() > 0) {
try {
TimeUnit.MILLISECONDS.sleep(config.getReceiveWaitTimeMillis());
Expand Down
23 changes: 22 additions & 1 deletion src/main/java/org/atc/PublisherThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -87,14 +88,24 @@ private void publish() {
" Publisher ID: " + publisherID);
ATCMessage atcMessage = null;
String messageContent = config.getMessageContent();

if(StringUtils.isEmpty(messageContent)) {
messageContent = DEFAULT_CONTENT;
}

RateLimiter rateLimiter = null;
if (config.getMessagesPerSecond() != 0) {
rateLimiter = RateLimiter.create(config.getMessagesPerSecond());
}

try {
for (int i = 1; i <= messageCount; i++) {
atcMessage = publisher.createTextMessage(messageContent);
atcMessage.setMessageID(publisherID + "-" + i);

if (null != rateLimiter) {
rateLimiter.acquire(); // wait for a permit to publish or block
}
publisher.send(atcMessage);

if (log.isDebugEnabled()) {
Expand All @@ -104,7 +115,7 @@ private void publish() {
publishRate.mark();

if (config.getDelayBetweenMsgs() > 0) {
TimeUnit.NANOSECONDS.sleep(publisher.getConfigs().getDelayBetweenMsgs());
TimeUnit.MILLISECONDS.sleep(config.getDelayBetweenMsgs());
}
}

Expand All @@ -126,6 +137,7 @@ private void publish() {

private void transactionalPublish() {
long messageCount = publisher.getConfigs().getMessageCount();
PublisherConfig config = publisher.getConfigs();
String publisherID = publisher.getConfigs().getId();

log.info("Starting transactional publisher to send " + messageCount + " messages to " +
Expand All @@ -137,13 +149,22 @@ private void transactionalPublish() {
if(StringUtils.isEmpty(messageContent)) {
messageContent = DEFAULT_CONTENT;
}

DisruptorBasedPublisher disruptorPublisher =
new DisruptorBasedPublisher(batchSize, publisher, sentCount, publishRate);

RateLimiter rateLimiter = null;
if (config.getMessagesPerSecond() != 0) {
rateLimiter = RateLimiter.create(config.getMessagesPerSecond());
}

for (int i = 1; i <= messageCount; i++) {
try {
atcMessage = publisher.createTextMessage(messageContent);
atcMessage.setMessageID(Integer.toString(i));
if (null != rateLimiter) {
rateLimiter.acquire(); // wait for a permit to publish or block
}
disruptorPublisher.publish(atcMessage);
} catch (ATCException e) {
log.error("Exception occurred while creating message for publisher " + publisherID, e);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/atc/amqp/queue/AMQPQueueReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final void unsubscribe() {
}

public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingException, ATCException {
config = conf;
try {
String queueName = conf.getQueueName();
Properties properties = new Properties();
Expand All @@ -83,7 +84,7 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(conf.getConnectionFactoryName());
queueConnection = connFactory.createQueueConnection();
queueConnection.start();
if (conf.isEnableClientAcknowledgment()) {
if (config.isEnableClientAcknowledgment()) {
queueSession = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
} else if (config.isTransactional()) {
queueSession = queueConnection.createQueueSession(true, QueueSession.AUTO_ACKNOWLEDGE);
Expand All @@ -93,7 +94,6 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep
//Receive message
Queue queue = (Queue) ctx.lookup(queueName);
consumer = queueSession.createConsumer(queue);
config = conf;
return consumer;
} catch (JMSException e) {
throw new ATCException("Subscriber initialisation failed. Subscriber id " + config.getId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public final void unsubscribe() throws ATCException {

public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingException, ATCException {

config = conf;
try {
String topicName = conf.getQueueName();
subscriptionId = conf.getSubscriptionID();
Expand All @@ -94,7 +95,7 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep
topicConnection.start();
if (conf.isEnableClientAcknowledgment()) {
topicSession = topicConnection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
} else if (config.isTransactional()) {
} else if (conf.isTransactional()) {
topicSession = topicConnection.createTopicSession(true, TopicSession.AUTO_ACKNOWLEDGE);
} else {
topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
Expand All @@ -103,7 +104,6 @@ public final MessageConsumer subscribe(SubscriberConfig conf) throws NamingExcep
// create durable subscriber with subscription ID
Topic topic = (Topic) ctx.lookup(topicName);
topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
config = conf;
return topicSubscriber;
} catch (JMSException e) {
throw new ATCException("Subscriber initialisation failed. Subscriber id " + config.getId(), e);
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/org/atc/config/PubSubConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Field;
import java.util.UUID;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import java.lang.reflect.Field;
import java.util.UUID;

@SuppressWarnings("unused")
@XmlAccessorType(XmlAccessType.FIELD)
Expand Down Expand Up @@ -63,6 +63,9 @@ public abstract class PubSubConfig {
@XmlAttribute
private int delayBetweenMsgs;

@XmlAttribute
private int messagesPerSecond;

PubSubConfig() {
id = UUID.randomUUID().toString();
}
Expand Down Expand Up @@ -112,6 +115,14 @@ public final String getTCPConnectionURL() {
return builder.toString();
}

public int getMessagesPerSecond() {
return messagesPerSecond;
}

public void setMessagesPerSecond(int messagesPerSecond) {
this.messagesPerSecond = messagesPerSecond;
}

public final int getPort() {
return port;
}
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/org/atc/config/PublisherConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ public class PublisherConfig extends PubSubConfig {
@XmlAttribute
private String messageContent;

public final int getPublisherMaxThroughput() {
return publisherMaxThroughput;
}

final void setPublisherMaxThroughput(int publisherMaxThroughput) {
this.publisherMaxThroughput = publisherMaxThroughput;
}

public final String getMessageContent() {
return messageContent;
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/queue-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ queuePublishers:
isTransactional: false
transactionBatchSize: 10
parallelThreads: 1
messagesPerSecond: 1500
messageContent: "test message content"
# failoverParams: failover='roundrobin'&brokerlist='tcp://10.100.5.94:5672?connectdelay='1000'&connecttimeout='3000'&retries='1';tcp://10.100.5.94:5673?connectdelay='1000'&connecttimeout='3000'&retries='1''

Expand All @@ -76,6 +77,7 @@ queueSubscribers:
messageCount: 100
parallelThreads: 1
enableClientAcknowledgment: true
messagesPerSecond: 1500
isTransactional: false

## End of subscriber configurations
2 changes: 2 additions & 0 deletions src/main/resources/topic-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ topicPublishers:
isTransactional: false
transactionBatchSize: 10
parallelThreads: 1
messagesPerSecond: 1500
messageContent: "topic message content"

# End of publisher Configurations
Expand All @@ -79,6 +80,7 @@ topicSubscribers:
id: "subscriber"
messageCount: 100
# receiveWaitTimeMillis: 1000
# messagesPerSecond: 1500
enableClientAcknowledgment: false
parallelThreads: 1

Expand Down

0 comments on commit 22aeff0

Please sign in to comment.