Skip to content

Commit

Permalink
[ISSUE apache#8356] dynamically update the number of threads for orde…
Browse files Browse the repository at this point in the history
…red consumption.
  • Loading branch information
CLFutureX committed Jul 3, 2024
1 parent bbe87f5 commit e7695b8
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*/
package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -28,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
Expand Down Expand Up @@ -73,14 +71,23 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();

this.defaultMQPushConsumer.setMessageQueueListener(new MessageQueueListener() {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqAssigned) {
int queueSize = defaultMQPushConsumerImpl.getRebalanceImpl().processQueueTable.size();
updateCorePoolSize(queueSize);
System.out.println("spacex_consumeExecutor queueSize: " + queueSize + ", corePoolSize: " + consumeExecutor.getCorePoolSize());
}
});

String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
}
Expand Down Expand Up @@ -117,9 +124,9 @@ public synchronized void unlockAllMQ() {

@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
if (corePoolSize > defaultMQPushConsumer.getConsumeThreadMin()
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize <= this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
Expand Down

0 comments on commit e7695b8

Please sign in to comment.