From a1ea1eb1f2b8187166b1b0e1f646e6476f27dda5 Mon Sep 17 00:00:00 2001 From: caigy Date: Tue, 27 Aug 2024 15:21:53 +0800 Subject: [PATCH] [ISSUE #8576] Support Creating or Updating Subscription Groups in Batch support creating or updating groups in batch support creating or updating groups in batch support creating or updating groups in batch --- .../processor/AdminBrokerProcessor.java | 38 ++++++ .../SubscriptionGroupManager.java | 12 ++ .../SubscriptionGroupManagerTest.java | 57 ++++++++- .../rocketmq/client/impl/MQClientAPIImpl.java | 17 +++ .../remoting/protocol/RequestCode.java | 2 + .../protocol/body/SubscriptionGroupList.java | 42 +++++++ .../tools/admin/DefaultMQAdminExt.java | 6 + .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 4 + .../tools/command/MQAdminStartup.java | 2 + .../UpdateSubGroupListSubCommand.java | 119 ++++++++++++++++++ .../UpdateSubGroupListSubCommandTest.java | 45 +++++++ 12 files changed, 348 insertions(+), 2 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupList.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommandTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c5419a62df7..3039cf5c97c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -131,6 +131,7 @@ import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; @@ -282,6 +283,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.unlockBatchMQ(ctx, request); case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP: return this.updateAndCreateSubscriptionGroup(ctx, request); + case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST: + return this.updateAndCreateSubscriptionGroupList(ctx, request); case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG: return this.getAllSubscriptionGroup(ctx, request); case RequestCode.DELETE_SUBSCRIPTIONGROUP: @@ -1571,6 +1574,41 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c return response; } + private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerContext ctx, RemotingCommand request) { + final long startTime = System.nanoTime(); + + final SubscriptionGroupList subscriptionGroupList = SubscriptionGroupList.decode(request.getBody(), SubscriptionGroupList.class); + final List groupConfigList = subscriptionGroupList.getGroupConfigList(); + + final StringBuilder builder = new StringBuilder(); + for (SubscriptionGroupConfig config : groupConfigList) { + builder.append(config.getGroupName()).append(";"); + } + final String groupNames = builder.toString(); + LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroupList: groupNames: {}, called by {}", + groupNames, + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + try { + this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + } finally { + long executionTime = (System.nanoTime() - startTime) / 1000000L; + LOGGER.info("executionTime of create updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime); + InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_INVOCATION_STATUS, status.getName()) + .build(); + BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes); + } + + return response; + } + + private void initConsumerOffset(String clientHost, String groupName, int mode, TopicConfig topicConfig) { String topic = topicConfig.getTopicName(); for (int queueId = 0; queueId < topicConfig.getReadQueueNums(); queueId++) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 1d9614fe582..f2a7e0482b1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -138,6 +139,11 @@ protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName } public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { + updateSubscriptionGroupConfigWithoutPersist(config); + this.persist(); + } + + private void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) { Map newAttributes = request(config); Map currentAttributes = current(config.getGroupName()); @@ -157,7 +163,13 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) } updateDataVersion(); + } + public void updateSubscriptionGroupConfigList(List configList) { + if (null == configList || configList.isEmpty()) { + return; + } + configList.forEach(this::updateSubscriptionGroupConfigWithoutPersist); this.persist(); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java index 3ed4ac11a40..3384d479c6e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.broker.subscription; import com.google.common.collect.ImmutableMap; - import java.nio.file.Paths; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SubscriptionGroupAttributes; @@ -39,7 +40,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(MockitoJUnitRunner.class) public class SubscriptionGroupManagerTest { @@ -113,4 +118,52 @@ public void updateSubscriptionGroupConfig() { private boolean notToBeExecuted() { return MixAll.isMac(); } + @Test + public void testUpdateSubscriptionGroupConfigList_NullConfigList() { + if (notToBeExecuted()) { + return; + } + + subscriptionGroupManager.updateSubscriptionGroupConfigList(null); + // Verifying that persist() is not called + verify(subscriptionGroupManager, never()).persist(); + } + + @Test + public void testUpdateSubscriptionGroupConfigList_EmptyConfigList() { + if (notToBeExecuted()) { + return; + } + + subscriptionGroupManager.updateSubscriptionGroupConfigList(Collections.emptyList()); + // Verifying that persist() is not called + verify(subscriptionGroupManager, never()).persist(); + } + + @Test + public void testUpdateSubscriptionGroupConfigList_ValidConfigList() { + if (notToBeExecuted()) { + return; + } + + final List configList = new LinkedList<>(); + final List groupNames = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + String groupName = String.format("group-%d", i); + config.setGroupName(groupName); + configList.add(config); + groupNames.add(groupName); + } + + subscriptionGroupManager.updateSubscriptionGroupConfigList(configList); + + // Verifying that persist() is called once + verify(subscriptionGroupManager, times(1)).persist(); + + groupNames.forEach(groupName -> + assertThat(subscriptionGroupManager.getSubscriptionGroupTable().get(groupName)).isNotNull()); + + } + } \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index f3d7e7c70f9..8a3d3dd0dcb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -137,6 +137,7 @@ import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; @@ -400,6 +401,22 @@ public void createSubscriptionGroup(final String addr, final SubscriptionGroupCo } + public void createSubscriptionGroupList(final String address, final List configs, + final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST, null); + SubscriptionGroupList requestBody = new SubscriptionGroupList(configs); + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync( + MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), address), request, timeoutMillis); + assert response != null; + if (response.getCode() == ResponseCode.SUCCESS) { + return; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 3be22fc56b7..f45ff6fa484 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -148,6 +148,8 @@ public class RequestCode { public static final int GET_TOPICS_BY_CLUSTER = 224; + public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST = 225; + public static final int QUERY_TOPICS_BY_CONSUMER = 343; public static final int QUERY_SUBSCRIPTION_BY_CONSUMER = 345; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupList.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupList.java new file mode 100644 index 00000000000..c343ce21118 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupList.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import java.util.List; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class SubscriptionGroupList extends RemotingSerializable { + @CFNotNull + private List groupConfigList; + + public SubscriptionGroupList() {} + + public SubscriptionGroupList(List groupConfigList) { + this.groupConfigList = groupConfigList; + } + + public List getGroupConfigList() { + return groupConfigList; + } + + public void setGroupConfigList(List groupConfigList) { + this.groupConfigList = groupConfigList; + } + +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 37dd322488f..5be6d24ff76 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -238,6 +238,12 @@ public void createAndUpdateSubscriptionGroupConfig(String addr, defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config); } + @Override + public void createAndUpdateSubscriptionGroupConfigList(String brokerAddr, + List configs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfigList(brokerAddr, configs); + } + @Override public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index b5a20673dab..9546235d3e8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -318,6 +318,12 @@ public void createAndUpdateSubscriptionGroupConfig(String addr, this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis); } + @Override + public void createAndUpdateSubscriptionGroupConfigList(String brokerAddr, + List configs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroupList(brokerAddr, configs, timeoutMillis); + } + @Override public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 96940c38b26..9dff3cbab95 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -118,6 +118,10 @@ void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void createAndUpdateSubscriptionGroupConfigList(String brokerAddr, + List configs) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; + SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index d56ed053268..43e4259c4e1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -67,6 +67,7 @@ import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand; import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand; import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; +import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupListSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand; import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand; @@ -192,6 +193,7 @@ public static void initCommand() { initCommand(new UpdateTopicListSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); + initCommand(new UpdateSubGroupListSubCommand()); initCommand(new SetConsumeModeSubCommand()); initCommand(new DeleteSubscriptionGroupCommand()); initCommand(new UpdateBrokerConfigSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommand.java new file mode 100644 index 00000000000..a36f50bd1b0 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommand.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.consumer; + +import com.alibaba.fastjson2.JSON; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class UpdateSubGroupListSubCommand implements SubCommand { + @Override + public String commandName() { + return "updateSubGroupList"; + } + + @Override + public String commandDesc() { + return "Update or create subscription group in batch"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + final OptionGroup optionGroup = new OptionGroup(); + Option opt = new Option("b", "brokerAddr", true, "create groups to which broker"); + optionGroup.addOption(opt); + + opt = new Option("c", "clusterName", true, "create groups to which cluster"); + optionGroup.addOption(opt); + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("f", "filename", true, + "Path to a file with a list of org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig in json format"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + final String fileName = commandLine.getOptionValue('f').trim(); + + try { + final Path filePath = Paths.get(fileName); + if (!Files.exists(filePath)) { + System.out.printf("the file path %s does not exists%n", fileName); + return; + } + final byte[] groupConfigListBytes = Files.readAllBytes(filePath); + final List groupConfigs = JSON.parseArray(groupConfigListBytes, SubscriptionGroupConfig.class); + if (null == groupConfigs || groupConfigs.isEmpty()) { + return; + } + + if (commandLine.hasOption('b')) { + String brokerAddress = commandLine.getOptionValue('b').trim(); + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdateSubscriptionGroupConfigList(brokerAddress, groupConfigs); + + System.out.printf("submit batch of group config to %s success, please check the result later.%n", + brokerAddress); + return; + + } else if (commandLine.hasOption('c')) { + final String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String brokerAddress : masterSet) { + defaultMQAdminExt.createAndUpdateSubscriptionGroupConfigList(brokerAddress, groupConfigs); + + System.out.printf("submit batch of subscription group config to %s success, please check the result later.%n", + brokerAddress); + } + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommandTest.java new file mode 100644 index 00000000000..0c23787709b --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupListSubCommandTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.consumer; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class UpdateSubGroupListSubCommandTest { + + @Test + public void testArguments() { + UpdateSubGroupListSubCommand cmd = new UpdateSubGroupListSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + + String brokerAddress = "127.0.0.1:10911"; + String inputFileName = "groups.json"; + String[] args = new String[] {"-b " + brokerAddress, "-f " + inputFileName}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, + cmd.buildCommandlineOptions(options), new DefaultParser()); + + assertEquals(brokerAddress, commandLine.getOptionValue('b').trim()); + assertEquals(inputFileName, commandLine.getOptionValue('f').trim()); + } +} \ No newline at end of file