Skip to content

Commit

Permalink
[ISSUE apache#9152] The getConsumeStats supports inputting multiple t…
Browse files Browse the repository at this point in the history
…opics
  • Loading branch information
qianye1001 committed Jan 22, 2025
1 parent a275510 commit ecd3007
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
List<String> topicListProvided = requestHeader.fetchTopicList();
String topicProvided = requestHeader.getTopic();
String group = requestHeader.getConsumerGroup();

Set<String> topics = new HashSet<>();
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
} else {
topics.add(requestHeader.getTopic());
}
ConsumeStats consumeStats = new ConsumeStats();
Set<String> topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group);

for (String topic : topics) {
for (String topic : topicsForCollecting) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
continue;
}

TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);

{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);

if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
continue;
}
}

for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
Expand Down Expand Up @@ -2038,6 +2022,37 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
return response;
}

private Set<String> getTopicsForCollecting(List<String> topicListProvided, String topicProvided, String group) {
Set<String> topicsForCollecting = new HashSet<>();
if (!topicListProvided.isEmpty()) {
// if topic list is provided, only collect the topics in the list
// and ignore subscription check
topicsForCollecting.addAll(topicListProvided);
} else {
// In order to be compatible with the old logic,
// even if the topic has been provided here, the subscription will be checked.
if (UtilAll.isBlank(topicProvided)) {
topicsForCollecting.addAll(
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
} else {
topicsForCollecting.add(topicProvided);
}
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
Iterator<String> iterator = topicsForCollecting.iterator();
while (iterator.hasNext()) {
String topic = iterator.next();
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (findSubscriptionData == null && subscriptionCount > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
topic, group);
iterator.remove();
}
}
}
return topicsForCollecting;
}

private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final long timeoutMillis)
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final List<String> topicList, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.updateTopicList(topicList);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,62 @@
package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;

@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
private static final String TOPIC_NAME_SEPARATOR = ";";

@CFNotNull
@RocketMQResource(ResourceType.GROUP)
private String consumerGroup;

@RocketMQResource(ResourceType.TOPIC)
private String topic;

// if topicList is provided, topic will be ignored
@RocketMQResource(ResourceType.TOPIC)
private String topicList;

@Override
public void checkFields() throws RemotingCommandException {
}

public List<String> fetchTopicList() {
if (StringUtils.isBlank(topicList)) {
return Collections.emptyList();
}
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
}

public void updateTopicList(List<String> topicList) {
if (topicList == null) {
return;
}
StringBuilder sb = new StringBuilder();
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
this.setTopicList(sb.toString());
}

public String getTopicList() {
return topicList;
}

public void setTopicList(String topicList) {
this.topicList = topicList;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down

0 comments on commit ecd3007

Please sign in to comment.