Skip to content

Commit

Permalink
[ISSUE apache#8354] fix the consumer's threadpool cannot be expanded …
Browse files Browse the repository at this point in the history
…to the maximum number of threads.
  • Loading branch information
CLFutureX committed Jul 3, 2024
1 parent bbe87f5 commit fb3201e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();
int queueSize = this.defaultMQPushConsumer.getPullThresholdForQueue() * this.defaultMQPushConsumerImpl.getSubscriptionInner().size();
this.consumeRequestQueue = new LinkedBlockingQueue<>(queueSize);

String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public ConsumeMessagePopConcurrentlyService(DefaultMQPushConsumerImpl defaultMQP

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();
int queueSize = this.defaultMQPushConsumer.getPullThresholdForQueue() * this.defaultMQPushConsumerImpl.getSubscriptionInner().size();
this.consumeRequestQueue = new LinkedBlockingQueue<>(queueSize);

this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
Expand Down

0 comments on commit fb3201e

Please sign in to comment.