From ecd300753539a7a6a2d40ea58d2fb6deccd7781f Mon Sep 17 00:00:00 2001 From: qianye Date: Wed, 22 Jan 2025 11:24:33 +0800 Subject: [PATCH 1/5] [ISSUE #9152] The getConsumeStats supports inputting multiple topics --- .../processor/AdminBrokerProcessor.java | 59 ++++++++++++------- .../rocketmq/client/impl/MQClientAPIImpl.java | 15 ++++- .../header/GetConsumeStatsRequestHeader.java | 37 +++++++++++- 3 files changed, 86 insertions(+), 25 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a9b913192fa..419463c84c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1947,16 +1947,14 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, final RemotingCommand response = RemotingCommand.createResponseCommand(null); try { final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); - ConsumeStats consumeStats = new ConsumeStats(); + List topicListProvided = requestHeader.fetchTopicList(); + String topicProvided = requestHeader.getTopic(); + String group = requestHeader.getConsumerGroup(); - Set topics = new HashSet<>(); - if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); - } else { - topics.add(requestHeader.getTopic()); - } + ConsumeStats consumeStats = new ConsumeStats(); + Set topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group); - for (String topic : topics) { + for (String topic : topicsForCollecting) { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic); @@ -1964,20 +1962,6 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, } TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); - - { - SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - - if (null == findSubscriptionData - && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { - LOGGER.warn( - "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, " - + "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup()); - continue; - } - } - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setTopic(topic); @@ -2038,6 +2022,37 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, return response; } + private Set getTopicsForCollecting(List topicListProvided, String topicProvided, String group) { + Set topicsForCollecting = new HashSet<>(); + if (!topicListProvided.isEmpty()) { + // if topic list is provided, only collect the topics in the list + // and ignore subscription check + topicsForCollecting.addAll(topicListProvided); + } else { + // In order to be compatible with the old logic, + // even if the topic has been provided here, the subscription will be checked. + if (UtilAll.isBlank(topicProvided)) { + topicsForCollecting.addAll( + this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group)); + } else { + topicsForCollecting.add(topicProvided); + } + int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group); + Iterator iterator = topicsForCollecting.iterator(); + while (iterator.hasNext()) { + String topic = iterator.next(); + SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); + if (findSubscriptionData == null && subscriptionCount > 0) { + LOGGER.warn( + "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}", + topic, group); + iterator.remove(); + } + } + } + return topicsForCollecting; + } + private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 114093e3502..bed6c1c4762 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return getConsumeStats(addr, consumerGroup, null, timeoutMillis); + return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List topicList, + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis); } public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, - final long timeoutMillis) + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, + final List topicList, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); + requestHeader.updateTopicList(topicList); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index 51a46879e86..c311086d51e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -17,27 +17,62 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.common.resource.RocketMQResource; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; @RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET) public class GetConsumeStatsRequestHeader extends TopicRequestHeader { + private static final String TOPIC_NAME_SEPARATOR = ";"; + @CFNotNull @RocketMQResource(ResourceType.GROUP) private String consumerGroup; + @RocketMQResource(ResourceType.TOPIC) private String topic; + // if topicList is provided, topic will be ignored + @RocketMQResource(ResourceType.TOPIC) + private String topicList; + @Override public void checkFields() throws RemotingCommandException { } + public List fetchTopicList() { + if (StringUtils.isBlank(topicList)) { + return Collections.emptyList(); + } + return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR)); + } + + public void updateTopicList(List topicList) { + if (topicList == null) { + return; + } + StringBuilder sb = new StringBuilder(); + topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR)); + this.setTopicList(sb.toString()); + } + + public String getTopicList() { + return topicList; + } + + public void setTopicList(String topicList) { + this.topicList = topicList; + } + public String getConsumerGroup() { return consumerGroup; } From 37e6de33350df9df0f3e862e791bcc57fa6bb941 Mon Sep 17 00:00:00 2001 From: qianye Date: Wed, 22 Jan 2025 11:34:30 +0800 Subject: [PATCH 2/5] [ISSUE #9111] Fix ACL in ExportRocksDBConfigToJsonRequestHeader --- .../header/ExportRocksDBConfigToJsonRequestHeader.java | 3 ++- .../remoting/protocol/header/GetConsumeStatsRequestHeader.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java index 7b1f9470e1e..96ea98cab60 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java @@ -21,12 +21,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RequestCode; -@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET) +@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.UPDATE) public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader { private static final String CONFIG_TYPE_SEPARATOR = ";"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index c311086d51e..022f3005daf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -42,7 +42,7 @@ public class GetConsumeStatsRequestHeader extends TopicRequestHeader { private String topic; // if topicList is provided, topic will be ignored - @RocketMQResource(ResourceType.TOPIC) + @RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR) private String topicList; @Override From b975772d60094a210b383ced5e574365c20011d9 Mon Sep 17 00:00:00 2001 From: qianye Date: Wed, 22 Jan 2025 11:36:43 +0800 Subject: [PATCH 3/5] [ISSUE #9111] Fix ACL in ExportRocksDBConfigToJsonRequestHeader --- .../protocol/header/ExportRocksDBConfigToJsonRequestHeader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java index 96ea98cab60..8354f83053d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RequestCode; -@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.UPDATE) +@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET) public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader { private static final String CONFIG_TYPE_SEPARATOR = ";"; From 61ef8fa699dd513ec8cda95d0f3a20ebd0966bb4 Mon Sep 17 00:00:00 2001 From: qianye Date: Wed, 22 Jan 2025 11:45:43 +0800 Subject: [PATCH 4/5] [ISSUE #9152] Fix Ci --- .../org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index dc5642f88c2..ec5f7571d28 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept connection.setConnectionSet(connections); when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection); ConsumeStats consumeStats = new ConsumeStats(); - when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats); List broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt); assertThat(broadcastMessageTracks.size()).isEqualTo(2); assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING); From 668de8ba902c37d27349dc85fb90ac0382f1208e Mon Sep 17 00:00:00 2001 From: qianye Date: Wed, 5 Feb 2025 11:49:35 +0800 Subject: [PATCH 5/5] [ISSUE #9152] Fix Ci --- .../processor/AdminBrokerProcessor.java | 5 +- .../header/GetConsumeStatsRequestHeader.java | 2 +- .../GetConsumeStatsRequestHeaderTest.java | 123 ++++++++++++++++++ 3 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 419463c84c7..2247e90f569 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1952,7 +1952,7 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, String group = requestHeader.getConsumerGroup(); ConsumeStats consumeStats = new ConsumeStats(); - Set topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group); + Set topicsForCollecting = getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group); for (String topic : topicsForCollecting) { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); @@ -2022,7 +2022,8 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, return response; } - private Set getTopicsForCollecting(List topicListProvided, String topicProvided, String group) { + private Set getTopicsForCollectingConsumeStats(List topicListProvided, String topicProvided, + String group) { Set topicsForCollecting = new HashSet<>(); if (!topicListProvided.isEmpty()) { // if topic list is provided, only collect the topics in the list diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index 022f3005daf..2c51c3f529b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -57,7 +57,7 @@ public List fetchTopicList() { } public void updateTopicList(List topicList) { - if (topicList == null) { + if (topicList == null || topicList.isEmpty()) { return; } StringBuilder sb = new StringBuilder(); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java new file mode 100644 index 00000000000..8004305e17a --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class GetConsumeStatsRequestHeaderTest { + + private GetConsumeStatsRequestHeader header; + + @Before + public void setUp() { + header = new GetConsumeStatsRequestHeader(); + } + + @Test + public void updateTopicList_NullTopicList_DoesNotUpdate() { + header.updateTopicList(null); + assertNull(header.getTopicList()); + } + + @Test + public void updateTopicList_EmptyTopicList_SetsEmptyString() { + header.updateTopicList(Collections.emptyList()); + assertNull(header.getTopicList()); + } + + @Test + public void updateTopicList_SingleTopic_SetsSingleTopicString() { + List topicList = Collections.singletonList("TopicA"); + header.updateTopicList(topicList); + assertEquals("TopicA;", header.getTopicList()); + } + + @Test + public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() { + List topicList = Arrays.asList("TopicA", "TopicB", "TopicC"); + header.updateTopicList(topicList); + assertEquals("TopicA;TopicB;TopicC;", header.getTopicList()); + } + + @Test + public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() { + List topicList = Arrays.asList("TopicA", "TopicA", "TopicB"); + header.updateTopicList(topicList); + assertEquals("TopicA;TopicA;TopicB;", header.getTopicList()); + } + + @Test + public void fetchTopicList_NullTopicList_ReturnsEmptyList() { + header.setTopicList(null); + List topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + + header.updateTopicList(new ArrayList<>()); + topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() { + header.setTopicList(""); + List topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_BlankTopicList_ReturnsEmptyList() { + header.setTopicList(" "); + List topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() { + header.setTopicList("TopicA"); + List topicList = header.fetchTopicList(); + assertEquals(Collections.singletonList("TopicA"), topicList); + } + + @Test + public void fetchTopicList_MultipleTopics_ReturnsTopicList() { + header.setTopicList("TopicA;TopicB;TopicC"); + List topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList); + } + + @Test + public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() { + header.setTopicList("TopicA;TopicB;"); + List topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB"), topicList); + } + + @Test + public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() { + header.setTopicList(";TopicA;TopicB"); + List topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB"), topicList); + } +}