diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a9b913192fa..419463c84c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1947,16 +1947,14 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, final RemotingCommand response = RemotingCommand.createResponseCommand(null); try { final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); - ConsumeStats consumeStats = new ConsumeStats(); + List topicListProvided = requestHeader.fetchTopicList(); + String topicProvided = requestHeader.getTopic(); + String group = requestHeader.getConsumerGroup(); - Set topics = new HashSet<>(); - if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); - } else { - topics.add(requestHeader.getTopic()); - } + ConsumeStats consumeStats = new ConsumeStats(); + Set topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group); - for (String topic : topics) { + for (String topic : topicsForCollecting) { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic); @@ -1964,20 +1962,6 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, } TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); - - { - SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - - if (null == findSubscriptionData - && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { - LOGGER.warn( - "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, " - + "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup()); - continue; - } - } - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setTopic(topic); @@ -2038,6 +2022,37 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, return response; } + private Set getTopicsForCollecting(List topicListProvided, String topicProvided, String group) { + Set topicsForCollecting = new HashSet<>(); + if (!topicListProvided.isEmpty()) { + // if topic list is provided, only collect the topics in the list + // and ignore subscription check + topicsForCollecting.addAll(topicListProvided); + } else { + // In order to be compatible with the old logic, + // even if the topic has been provided here, the subscription will be checked. + if (UtilAll.isBlank(topicProvided)) { + topicsForCollecting.addAll( + this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group)); + } else { + topicsForCollecting.add(topicProvided); + } + int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group); + Iterator iterator = topicsForCollecting.iterator(); + while (iterator.hasNext()) { + String topic = iterator.next(); + SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); + if (findSubscriptionData == null && subscriptionCount > 0) { + LOGGER.warn( + "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}", + topic, group); + iterator.remove(); + } + } + } + return topicsForCollecting; + } + private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 114093e3502..bed6c1c4762 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return getConsumeStats(addr, consumerGroup, null, timeoutMillis); + return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List topicList, + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis); } public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, - final long timeoutMillis) + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, + final List topicList, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); + requestHeader.updateTopicList(topicList); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java index 7b1f9470e1e..8354f83053d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java @@ -21,12 +21,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RequestCode; -@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET) +@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET) public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader { private static final String CONFIG_TYPE_SEPARATOR = ";"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index 51a46879e86..022f3005daf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -17,27 +17,62 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.common.resource.RocketMQResource; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; @RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET) public class GetConsumeStatsRequestHeader extends TopicRequestHeader { + private static final String TOPIC_NAME_SEPARATOR = ";"; + @CFNotNull @RocketMQResource(ResourceType.GROUP) private String consumerGroup; + @RocketMQResource(ResourceType.TOPIC) private String topic; + // if topicList is provided, topic will be ignored + @RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR) + private String topicList; + @Override public void checkFields() throws RemotingCommandException { } + public List fetchTopicList() { + if (StringUtils.isBlank(topicList)) { + return Collections.emptyList(); + } + return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR)); + } + + public void updateTopicList(List topicList) { + if (topicList == null) { + return; + } + StringBuilder sb = new StringBuilder(); + topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR)); + this.setTopicList(sb.toString()); + } + + public String getTopicList() { + return topicList; + } + + public void setTopicList(String topicList) { + this.topicList = topicList; + } + public String getConsumerGroup() { return consumerGroup; } diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index dc5642f88c2..ec5f7571d28 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept connection.setConnectionSet(connections); when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection); ConsumeStats consumeStats = new ConsumeStats(); - when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats); List broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt); assertThat(broadcastMessageTracks.size()).isEqualTo(2); assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);