Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9152] Broker getConsumeStats supports inputting multiple topics #9153

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
List<String> topicListProvided = requestHeader.fetchTopicList();
String topicProvided = requestHeader.getTopic();
String group = requestHeader.getConsumerGroup();

Set<String> topics = new HashSet<>();
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
} else {
topics.add(requestHeader.getTopic());
}
ConsumeStats consumeStats = new ConsumeStats();
Set<String> topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group);

for (String topic : topics) {
for (String topic : topicsForCollecting) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
continue;
}

TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);

{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);

if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
continue;
}
}

for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
Expand Down Expand Up @@ -2038,6 +2022,37 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
return response;
}

private Set<String> getTopicsForCollecting(List<String> topicListProvided, String topicProvided, String group) {
Set<String> topicsForCollecting = new HashSet<>();
if (!topicListProvided.isEmpty()) {
// if topic list is provided, only collect the topics in the list
// and ignore subscription check
topicsForCollecting.addAll(topicListProvided);
} else {
// In order to be compatible with the old logic,
// even if the topic has been provided here, the subscription will be checked.
if (UtilAll.isBlank(topicProvided)) {
topicsForCollecting.addAll(
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
} else {
topicsForCollecting.add(topicProvided);
}
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
Iterator<String> iterator = topicsForCollecting.iterator();
while (iterator.hasNext()) {
String topic = iterator.next();
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (findSubscriptionData == null && subscriptionCount > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
topic, group);
iterator.remove();
}
}
}
qianye1001 marked this conversation as resolved.
Show resolved Hide resolved
return topicsForCollecting;
}

private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final long timeoutMillis)
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final List<String> topicList, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.updateTopicList(topicList);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET)
public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
private static final String CONFIG_TYPE_SEPARATOR = ";";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,62 @@
package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;

@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
private static final String TOPIC_NAME_SEPARATOR = ";";

@CFNotNull
@RocketMQResource(ResourceType.GROUP)
private String consumerGroup;

@RocketMQResource(ResourceType.TOPIC)
private String topic;

// if topicList is provided, topic will be ignored
@RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR)
private String topicList;

@Override
public void checkFields() throws RemotingCommandException {
}

public List<String> fetchTopicList() {
if (StringUtils.isBlank(topicList)) {
return Collections.emptyList();
}
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
}

public void updateTopicList(List<String> topicList) {
if (topicList == null) {
return;
}
StringBuilder sb = new StringBuilder();
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
this.setTopicList(sb.toString());
}

public String getTopicList() {
return topicList;
}

public void setTopicList(String topicList) {
this.topicList = topicList;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept
connection.setConnectionSet(connections);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection);
ConsumeStats consumeStats = new ConsumeStats();
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats);
List<MessageTrack> broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(broadcastMessageTracks.size()).isEqualTo(2);
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);
Expand Down
Loading