diff --git a/inlong-tubemq/bin/tubectl b/inlong-tubemq/bin/tubectl
new file mode 100755
index 00000000000..cb031a93633
--- /dev/null
+++ b/inlong-tubemq/bin/tubectl
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+ BASE_DIR=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ BASE_DIR=`cd "$BASE_DIR" && pwd`
+ #echo "TubeMQ master is at $BASE_DIR"
+fi
+source $BASE_DIR/bin/env.sh
+$JAVA $TOOLS_ARGS org.apache.inlong.tubemq.server.tools.cli.CommandToolMain $@
diff --git a/inlong-tubemq/bin/tubectl.cmd b/inlong-tubemq/bin/tubectl.cmd
new file mode 100644
index 00000000000..f6c0ebe2400
--- /dev/null
+++ b/inlong-tubemq/bin/tubectl.cmd
@@ -0,0 +1,31 @@
+@rem
+@rem Licensed to the Apache Software Foundation (ASF) under one
+@rem or more contributor license agreements. See the NOTICE file
+@rem distributed with this work for additional information
+@rem regarding copyright ownership. The ASF licenses this file
+@rem to you under the Apache License, Version 2.0 (the
+@rem "License"); you may not use this file except in compliance
+@rem with the License. You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing,
+@rem software distributed under the License is distributed on an
+@rem "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@rem KIND, either express or implied. See the License for the
+@rem specific language governing permissions and limitations
+@rem under the License.
+@rem
+
+REM Windows Command-line Tool for TubeMQ
+REM please do not change any command or variable in this script, check out
+REM env.cmd for details.
+
+setlocal
+call "%~dp0env.cmd"
+
+set TUBECTLMAIN=org.apache.inlong.tubemq.server.tools.cli.CommandToolMain
+
+echo on
+call %JAVA% %MASTER_JVM_OPTS% %GENERIC_ARGS% "%TUBECTLMAIN%" %@
+endlocal
\ No newline at end of file
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index b15950fb650..9521857a043 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -187,6 +187,10 @@
mockito-core
test
+
+ com.beust
+ jcommander
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java
new file mode 100644
index 00000000000..0bb4d7e3ff6
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+/**
+ * Class for parse command.
+ */
+public abstract class AbstractCommand {
+
+ protected final JCommander jcommander;
+
+ @Parameter(names = {"-h", "--help"}, help = true, hidden = true)
+ private boolean help;
+
+ public AbstractCommand(String cmdName) {
+ jcommander = new JCommander();
+ jcommander.setProgramName("tubectl " + cmdName);
+ }
+
+ public boolean run(String[] args) {
+
+ if (help) {
+ jcommander.usage();
+ return true;
+ }
+
+ try {
+ jcommander.parse(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ jcommander.usage();
+ return false;
+ }
+
+ String cmd = jcommander.getParsedCommand();
+ if (cmd == null) {
+ jcommander.usage();
+ return false;
+ } else {
+ JCommander obj = jcommander.getCommands().get(cmd);
+ AbstractCommandRunner commandRunner = (AbstractCommandRunner) obj.getObjects().get(0);
+ try {
+ commandRunner.run();
+ return true;
+ } catch (ParameterException e) {
+ System.err.println(e.getMessage() + System.lineSeparator());
+ return false;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java
new file mode 100644
index 00000000000..0e4e0d1fe7c
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+/**
+ * The runner of command.
+ */
+public abstract class AbstractCommandRunner {
+
+ /**
+ * Execute the specified command.
+ */
+ abstract void run() throws Exception;
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java
new file mode 100644
index 00000000000..26bfea71080
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.server.common.fielddef.CliArgDef;
+import org.apache.inlong.tubemq.server.common.utils.HttpUtils;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is use to process Web Api Request process.
+ */
+public class CliWebapiAdmin extends CliAbstractBase {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(CliBrokerAdmin.class);
+
+ private static final String defMasterPortal = "127.0.0.1:8080";
+
+ private Map requestParams;
+
+ public CliWebapiAdmin() {
+ super(null);
+ initCommandOptions();
+ }
+
+ /**
+ * Construct CliWebapiAdmin with request parameters
+ *
+ * @param requestParams Request parameters map
+ */
+ public CliWebapiAdmin(Map requestParams) {
+ this();
+ this.requestParams = requestParams;
+ }
+
+ /**
+ * Init command options
+ */
+ @Override
+ protected void initCommandOptions() {
+ // add the cli required parameters
+ addCommandOption(CliArgDef.MASTERPORTAL);
+ addCommandOption(CliArgDef.ADMINMETHOD);
+ addCommandOption(CliArgDef.METHOD);
+ }
+
+ /**
+ * Call the Web API
+ *
+ * @param args Request parameters of method name,
+ * {"--method", "admin_query_topic_info"} as an example
+ */
+ @Override
+ public boolean processParams(String[] args) throws Exception {
+ CommandLine cli = parser.parse(options, args);
+ if (cli == null) {
+ throw new ParseException("Parse args failure");
+ }
+ if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+ version();
+ }
+ if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+ help();
+ }
+ String masterAddr = defMasterPortal;
+ if (cli.hasOption(CliArgDef.MASTERPORTAL.longOpt)) {
+ masterAddr = cli.getOptionValue(CliArgDef.MASTERPORTAL.longOpt);
+ if (TStringUtils.isBlank(masterAddr)) {
+ throw new Exception(CliArgDef.MASTERPORTAL.longOpt + " is required!");
+ }
+ }
+ JsonObject result = null;
+ String masterUrl = "http://" + masterAddr + "/webapi.htm";
+ if (cli.hasOption(CliArgDef.ADMINMETHOD.longOpt)) {
+ Map inParamMap = new HashMap<>();
+ inParamMap.put(CliArgDef.METHOD.longOpt, "admin_get_methods");
+ result = HttpUtils.requestWebService(masterUrl, inParamMap);
+ System.out.println(formatResult(result));
+ System.exit(0);
+ }
+ String methodStr = cli.getOptionValue(CliArgDef.METHOD.longOpt);
+ if (TStringUtils.isBlank(methodStr)) {
+ throw new Exception(CliArgDef.METHOD.longOpt + " is required!");
+ }
+ requestParams.put(CliArgDef.METHOD.longOpt, methodStr);
+ Map convertedRequestParams = convertRequestParams(requestParams);
+ result = HttpUtils.requestWebService(masterUrl, convertedRequestParams);
+ String formattedResult = formatResult(result);
+ System.out.println(formattedResult);
+ return true;
+ }
+
+ /**
+ * Convert request paramters map
+ *
+ * @param requestParamsMap Map
+ * @return a converted map
+ */
+ private Map convertRequestParams(Map requestParamsMap) {
+ // convert object values to string ones
+ Map converttedrequestParamsMap = new HashMap<>();
+ for (String k : requestParamsMap.keySet()) {
+ converttedrequestParamsMap.put(k, String.valueOf(requestParamsMap.get(k)));
+ }
+ return converttedrequestParamsMap;
+ }
+
+ /**
+ * Convert json content to specific output format
+ *
+ * @param result JsonObject
+ * @return formatted results
+ */
+ private String formatResult(JsonObject result) {
+ // format output results
+ return new GsonBuilder().setPrettyPrinting().create().toJson(result);
+ }
+
+ public static void main(String[] args) {
+ CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin();
+ try {
+ cliWebapiAdmin.processParams(args);
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ logger.error(ex.getMessage());
+ cliWebapiAdmin.help();
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java
new file mode 100644
index 00000000000..ba8acaa7cbe
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import java.util.Arrays;
+
+/**
+ * Command tool main.
+ */
+public class CommandToolMain {
+
+ private final JCommander jcommander;
+ @Parameter(names = {"-h", "--help"}, help = true, description = "Get all command about tubectl.")
+ boolean help;
+
+ CommandToolMain() {
+ jcommander = new JCommander();
+ jcommander.setProgramName("tubectl");
+ jcommander.addObject(this);
+ jcommander.addCommand("topic", new TopicCommand());
+ jcommander.addCommand("message", new MessageCommand());
+ jcommander.addCommand("group", new ConsumerGroupCommand());
+ }
+
+ boolean run(String[] args) {
+ try {
+ jcommander.parse(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ jcommander.usage();
+ return false;
+ }
+
+ if (help || args.length == 0) {
+ jcommander.usage();
+ return true;
+ }
+
+ String cmd = args[0];
+ JCommander obj = jcommander.getCommands().get(cmd);
+ AbstractCommand cmdObj = (AbstractCommand) obj.getObjects().get(0);
+ return cmdObj.run(Arrays.copyOfRange(args, 1, args.length));
+ }
+
+ public static void main(String[] args) {
+ CommandToolMain tubectlTool = new CommandToolMain();
+ if (tubectlTool.run(args)) {
+ System.exit(0);
+ } else {
+ System.exit(1);
+ }
+ }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java
new file mode 100644
index 00000000000..28ab64909f7
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumer group management
+ */
+@Parameters(commandDescription = "Command for consumer group")
+public class ConsumerGroupCommand extends AbstractCommand {
+
+ @Parameter()
+ private List params;
+
+ final private static String[] requestMethod = new String[]{"--method", ""};
+ final private static Map requestParams = new HashMap<>();
+ final private static CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin(requestParams);
+
+ public ConsumerGroupCommand() {
+ super("group");
+
+ jcommander.addCommand("list", new CgroupList());
+ jcommander.addCommand("create", new CgroupCreate());
+ jcommander.addCommand("delete", new CgroupDelete());
+ }
+
+ @Parameters(commandDescription = "List consumer group")
+ private static class CgroupList extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params = new ArrayList<>();
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-g", "--group"}, order = 1, description = "Consumer group name")
+ private String groupName;
+
+ @Parameter(names = {"-c", "--creator"}, order = 3, description = "Record creator")
+ private String createUser;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_query_allowed_consumer_group_info";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (groupName != null)
+ requestParams.put(WebFieldDef.GROUPNAME.name, groupName);
+ if (createUser != null)
+ requestParams.put(WebFieldDef.CREATEUSER.name, createUser);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Create consumer group")
+ private static class CgroupCreate extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params = new ArrayList<>();
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-g",
+ "--group"}, order = 1, required = true, description = "Consumer group name")
+ private String groupName;
+
+ @Parameter(names = {"-at",
+ "--auth-token"}, order = 2, required = true, description = "Admin api operation authorization code")
+ private String confModAuthToken;
+
+ @Parameter(names = {"-c", "--creator"}, order = 3, required = true, description = "Record creator")
+ private String createUser;
+
+ @Parameter(names = {"-cd", "--create-date"}, order = 4, description = "Record creation date")
+ private String createDate;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_add_authorized_consumergroup_info";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (groupName != null)
+ requestParams.put(WebFieldDef.GROUPNAME.name, groupName);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ if (createUser != null)
+ requestParams.put(WebFieldDef.CREATEUSER.name, createUser);
+ if (createDate != null)
+ requestParams.put(WebFieldDef.CREATEDATE.name, createDate);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Delete consumer group")
+ private static class CgroupDelete extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params = new ArrayList<>();
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-at",
+ "--auth-token"}, order = 1, required = true, description = "Admin api operation authorization code")
+ private String confModAuthToken;
+
+ @Parameter(names = {"-m", "--modifier"}, required = true, order = 2, description = "Record modifier")
+ private String modifyUser;
+
+ @Parameter(names = {"-g", "--group"}, order = 3, description = "Consumer group name")
+ private String groupName;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_delete_allowed_consumer_group_info";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ if (modifyUser != null)
+ requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+ if (groupName != null)
+ requestParams.put(WebFieldDef.GROUPNAME.name, groupName);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java
new file mode 100644
index 00000000000..87534f7cce6
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import org.apache.commons.codec.binary.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Message production and consumption
+ */
+@Parameters(commandDescription = "Command for message production and consumption")
+public class MessageCommand extends AbstractCommand {
+
+ @Parameter()
+ private List params;
+
+ public MessageCommand() {
+ super("message");
+
+ jcommander.addCommand("produce", new MessageProduce());
+ jcommander.addCommand("consume", new MessageConsume());
+ }
+
+ @Parameters(commandDescription = "Produce message")
+ private static class MessageProduce extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params;
+
+ @Parameter(names = {"-ms",
+ "--master-servers"}, required = true, order = 1, description = "The master address(es) to connect to. Format is master1_ip:port[,master2_ip:port]")
+ private String masterServers;
+
+ @Parameter(names = {"-t",
+ "--topic"}, required = true, order = 0, description = "Topic to produce messages")
+ private String topicName;
+
+ @Parameter(names = {"-mt",
+ "--msg-total"}, order = 2, description = "The total number of messages to be produced. -1 means unlimited.")
+ private long msgTotal = -1;
+
+ @Parameter(names = {"-m", "--mode"}, order = 3, description = "Produce mode, must in { sync | async }")
+ private String mode = "async";
+
+ private String body = "";
+
+ private MessageProducer messageProducer;
+
+ private AtomicLong msgCount = new AtomicLong(0L);
+
+ /**
+ * Send messages in synchronous mode
+ *
+ * @param message Message to send
+ * @throws TubeClientException
+ * @throws InterruptedException
+ */
+ private void syncProduce(Message message) throws TubeClientException, InterruptedException {
+ MessageSentResult result = messageProducer.sendMessage(message);
+ if (!result.isSuccess()) {
+ System.out.println("sync send message failed : " + result.getErrMsg());
+ } else {
+ msgCount.getAndIncrement();
+ }
+ }
+
+ /**
+ * Send messages in asynchronous mode
+ *
+ * @param message Message to send
+ * @throws TubeClientException
+ * @throws InterruptedException
+ */
+ private void asyncProduce(Message message) throws TubeClientException, InterruptedException {
+ messageProducer.sendMessage(message, new MessageSentCallback() {
+
+ @Override
+ public void onMessageSent(MessageSentResult result) {
+ if (!result.isSuccess()) {
+ System.out.println("async send message failed : " + result.getErrMsg());
+ } else {
+ msgCount.getAndIncrement();
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ System.out.println("async send message error : " + e);
+ }
+ });
+ }
+
+ /**
+ * Stop a producer and print the total number of messages produced
+ *
+ * @param v total number of messages
+ * @throws TubeClientException
+ * @throws InterruptedException
+ */
+ private void stopProducer(long v) {
+ try {
+ messageProducer.shutdown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ System.out.println("\n" + v + " message(s) has been produced. Exited.");
+ }
+
+ @Override
+ void run() {
+ try {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (msgTotal == -1)
+ stopProducer(msgCount.get());
+ }));
+
+ final TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+ final TubeSingleSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
+ messageProducer = messageSessionFactory.createProducer();
+ messageProducer.publish(topicName);
+ byte[] bodyData;
+ final Message message = new Message(topicName, null);
+
+ BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
+ int c = 0;
+ while (msgTotal == -1 || c < msgTotal) {
+ System.out.print(">");
+ body = input.readLine();
+ if (body == null || "".equals(body) || "".equals(body.trim()))
+ continue;
+ bodyData = StringUtils.getBytesUtf8(body);
+ message.setData(bodyData);
+
+ switch (mode) {
+ case "sync":
+ syncProduce(message);
+ break;
+ case "async":
+ asyncProduce(message);
+ break;
+ default:
+ throw new ParameterException("Produce mode, must in { sync | async }");
+ }
+ c++;
+ }
+ stopProducer(msgTotal);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Consume message")
+ private static class MessageConsume extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params;
+
+ @Parameter(names = {"-ms",
+ "--master-servers"}, required = true, order = 2, description = "The master address(es) to connect to. Format is master1_ip:port[,master2_ip:port]")
+ private String masterServers;
+
+ @Parameter(names = {"-t",
+ "--topic"}, required = true, order = 0, description = "Topic to consume messages")
+ private String topicName;
+
+ @Parameter(names = {"-g", "--group"}, required = true, order = 1, description = "Consumer group")
+ private String groupName;
+
+ @Parameter(names = {"-m", "--mode"}, order = 5, description = "Consume mode, must in { pull | push }")
+ private String mode = "pull";
+
+ @Parameter(names = {"-p",
+ "--position"}, order = 3, description = "Consume position, must in { first | latest | max }")
+ private String consumePosition = "first";
+
+ @Parameter(names = {"-po",
+ "--partitions-offsets"}, order = 4, description = "Consume partition ids and their offsets, format is id1:offset1[,id2:offset2][...], for example: 0:0,1:0,2:0")
+ private String consumePartitionsAndOffsets;
+
+ private ClientBalanceConsumer clientBalanceConsumer;
+ private PullMessageConsumer messagePullConsumer;
+ private PushMessageConsumer messagePushConsumer;
+
+ private AtomicLong msgCount = new AtomicLong(0L);
+
+ /**
+ * Create a pullConsumer and consume messages
+ *
+ * @param messageSessionFactory
+ * @param consumerConfig
+ * @throws TubeClientException
+ */
+ private void pullConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig)
+ throws TubeClientException {
+ messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
+ messagePullConsumer.subscribe(topicName, null);
+ messagePullConsumer.completeSubscribe();
+ while (!messagePullConsumer.isPartitionsReady(1000)) {
+ ThreadUtils.sleep(1000);
+ }
+ System.out.println("Ready to consume messages......");
+ while (true) {
+ ConsumerResult result = messagePullConsumer.getMessage();
+ if (result.isSuccess()) {
+ List messageList = result.getMessageList();
+ for (Message message : messageList) {
+ System.out.println(new String(message.getData()));
+ msgCount.getAndIncrement();
+ }
+ messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
+ }
+ }
+ }
+
+ /**
+ * Create a pushConsumer and consume messages
+ *
+ * @param messageSessionFactory
+ * @param consumerConfig
+ * @throws TubeClientException
+ * @throws InterruptedException
+ */
+ private void pushConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig)
+ throws TubeClientException, InterruptedException {
+ messagePushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+ messagePushConsumer.subscribe(topicName, null, new MessageListener() {
+
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List messages) throws InterruptedException {
+ for (Message message : messages) {
+ System.out.println(new String(message.getData()));
+ msgCount.getAndIncrement();
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ });
+ messagePushConsumer.completeSubscribe();
+ CountDownLatch latch = new CountDownLatch(1);
+ latch.await(10, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Create a clientBalanceConsumer and consume messages
+ *
+ * @param messageSessionFactory
+ * @param consumerConfig
+ * @throws TubeClientException
+ */
+ private void balanceConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig)
+ throws TubeClientException {
+ clientBalanceConsumer = messageSessionFactory.createBalanceConsumer(consumerConfig);
+ ProcessResult procResult = new ProcessResult();
+ QueryMetaResult qryResult = new QueryMetaResult();
+ final Map> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topicName);
+ if (!clientBalanceConsumer.start(topicAndFiltersMap, -1, 0, procResult)) {
+ System.out.println("Initial balance consumer failure, errcode is " + procResult.getErrCode()
+ + " errMsg is " + procResult.getErrMsg());
+ return;
+ }
+ clientBalanceConsumer.getPartitionMetaInfo(qryResult);
+ Map partMetaInfoMap = qryResult.getPartStatusMap();
+ if (partMetaInfoMap != null && !partMetaInfoMap.isEmpty()) {
+ Set configuredTopicPartitions = partMetaInfoMap.keySet();
+ // parse the consumePartitionsAndOffsets parameters
+ Map assignedPartitionsAndOffsets = new HashMap<>();
+ for (String str : consumePartitionsAndOffsets.split(",")) {
+ String[] splits = str.split(":");
+ assignedPartitionsAndOffsets.put(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
+ }
+ Set assignedPartitionIds = assignedPartitionsAndOffsets.keySet();
+ Set assignedPartitions = new HashSet<>();
+ for (String partKey : configuredTopicPartitions) {
+ long parId = Long.parseLong(partKey.split(":")[2]);
+ if (partMetaInfoMap.get(partKey) && assignedPartitionIds.contains(parId)) {
+ assignedPartitions.add(partKey);
+ Long boostrapOffset = assignedPartitionsAndOffsets.get(parId);
+ // connect to the partitions based on consumePartitionsAndOffsets parameters
+ if (!clientBalanceConsumer.connect2Partition(partKey,
+ boostrapOffset == null ? -1L : boostrapOffset, procResult))
+ System.out.println("connect2Partition failed.");
+ }
+ }
+
+ ConsumeResult csmResult = new ConsumeResult();
+ ConfirmResult cfmResult = new ConfirmResult();
+ while (!clientBalanceConsumer.isPartitionsReady(1000)) {
+ ThreadUtils.sleep(1000);
+ }
+ System.out.println("Ready to consume messages......");
+ while (true) {
+ // get messages
+ if (clientBalanceConsumer.getMessage(csmResult)) {
+ List messageList = csmResult.getMessageList();
+ for (Message message : messageList) {
+ System.out.println(new String(message.getData()));
+ msgCount.getAndIncrement();
+ }
+ // confirm messages to server
+ clientBalanceConsumer.confirmConsume(csmResult.getConfirmContext(), true, cfmResult);
+ }
+ }
+
+ } else {
+ System.out.println("No partitions of the topic are available now.");
+ }
+
+ }
+
+ @Override
+ void run() {
+ try {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ if (clientBalanceConsumer != null)
+ clientBalanceConsumer.shutdown();
+ if (messagePullConsumer != null)
+ messagePullConsumer.shutdown();
+ if (messagePushConsumer != null)
+ messagePushConsumer.shutdown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ System.out.println(msgCount.get() + " message(s) has been consumed. Exited.");
+ }));
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(masterServers, groupName);
+ switch (consumePosition) {
+ case "first":
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+ break;
+ case "latest":
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ break;
+ case "max":
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS);
+ break;
+ default:
+ throw new ParameterException("Consume position, must in { first | latest | max }");
+ }
+ final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ if (consumePartitionsAndOffsets != null) {
+ balanceConsumer(messageSessionFactory, consumerConfig);
+ } else {
+ switch (mode) {
+ case "pull":
+ pullConsumer(messageSessionFactory, consumerConfig);
+ break;
+ case "push":
+ pushConsumer(messageSessionFactory, consumerConfig);
+ break;
+ case "balance":
+ balanceConsumer(messageSessionFactory, consumerConfig);
+ break;
+ default:
+ throw new ParameterException("Consume mode, must in { pull | push | balance }");
+ }
+ }
+
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java
new file mode 100644
index 00000000000..c807d4b80c8
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Topic management
+ */
+@Parameters(commandDescription = "Command for topic management")
+public class TopicCommand extends AbstractCommand {
+
+ @Parameter()
+ private List params;
+
+ final private static String[] requestMethod = new String[]{"--method", ""};
+
+ final private static Map requestParams = new HashMap<>();
+
+ final private static CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin(requestParams);
+
+ public TopicCommand() {
+ super("topic");
+
+ jcommander.addCommand("list", new TopicList());
+ jcommander.addCommand("update", new TopicUpdate());
+ jcommander.addCommand("create", new TopicCreate());
+ jcommander.addCommand("delete", new TopicDelete());
+ }
+
+ @Parameters(commandDescription = "List topic")
+ private static class TopicList extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params;
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-sid", "--topic-status-id"}, order = 1, description = "Topic status ID")
+ private int topicStatusId = 0;
+
+ @Parameter(names = {"-bid", "--broker-id"}, order = 2, description = "Brokers' ID, separated by commas")
+ private String brokerId;
+
+ @Parameter(names = {"-dp", "--delete-policy"}, order = 3, description = "File aging strategy")
+ private String deletePolicy;
+
+ @Parameter(names = {"-np", "--num-partitions"}, order = 4, description = "Number of partitions")
+ private int numPartitions = 3;
+
+ @Parameter(names = {"-nts", "--num-topic-stores"}, order = 5, description = "Number of topic stores")
+ private int numTopicStores = 1;
+
+ @Parameter(names = {"-uft",
+ "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+ private int unflushThreshold = 1000;
+
+ @Parameter(names = {"-ufi",
+ "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+ private int unflushInterval = 10000;
+
+ @Parameter(names = {"-ufd",
+ "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+ private int unflushDataHold = 0;
+
+ @Parameter(names = {"-mc",
+ "--memcache-msgcnt-ink"}, order = 9, description = "Maximum allowed memory cache unflushing message count")
+ private int memCacheMsgCntInK = 10;
+
+ @Parameter(names = {"-ms",
+ "--memcache-msgsize-inmb"}, order = 10, description = "Maximum allowed memory cache size in MB")
+ private int memCacheMsgSizeInMB = 2;
+
+ @Parameter(names = {"-mfi",
+ "--memcache-flush-intvl"}, order = 11, description = "Maximum allowed disk unflushing data size")
+ private int memCacheFlushIntvl = 20000;
+
+ @Parameter(names = {"-c", "--creator"}, order = 12, description = "Record creator")
+ private String createUser;
+
+ @Parameter(names = {"-m", "--modifier"}, order = 13, description = "Record modifier")
+ private String modifyUser;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_query_topic_info";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ requestParams.put(WebFieldDef.TOPICSTATUSID.name, topicStatusId);
+ if (brokerId != null)
+ requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+ if (deletePolicy != null)
+ requestParams.put(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+ requestParams.put(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+ requestParams.put(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+ requestParams.put(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+ requestParams.put(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+ requestParams.put(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+ requestParams.put(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+ requestParams.put(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+ requestParams.put(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+ if (createUser != null)
+ requestParams.put(WebFieldDef.CREATEUSER.name, createUser);
+ if (modifyUser != null)
+ requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Update topic")
+ private static class TopicUpdate extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params;
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-bid",
+ "--broker-id"}, order = 1, required = true, description = "Brokers' ID, separated by commas")
+ private String brokerId;
+
+ @Parameter(names = {"-dp", "--delete-policy"}, order = 4, description = "File aging strategy")
+ private String deletePolicy;
+
+ @Parameter(names = {"-np", "--num-partitions"}, order = 5, description = "Number of partitions")
+ private int numPartitions = 3;
+
+ @Parameter(names = {"-uft",
+ "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+ private int unflushThreshold = 1000;
+
+ @Parameter(names = {"-ufi",
+ "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+ private int unflushInterval = 10000;
+
+ @Parameter(names = {"-ufd",
+ "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+ private int unflushDataHold = 0;
+
+ @Parameter(names = {"-nts", "--num-topic-stores"}, order = 9, description = "Number of topic stores")
+ private int numTopicStores = 1;
+
+ @Parameter(names = {"-mc",
+ "--memcache-msgcnt-ink"}, order = 10, description = "Maximum allowed memory cache unflushing message count")
+ private int memCacheMsgCntInK = 10;
+
+ @Parameter(names = {"-ms",
+ "--memcache-msgsize-inmb"}, order = 11, description = "Maximum allowed memory cache size in MB")
+ private int memCacheMsgSizeInMB = 2;
+
+ @Parameter(names = {"-mfi",
+ "--memcache-flush-intvl"}, order = 12, description = "Maximum allowed disk unflushing data size")
+ private int memCacheFlushIntvl = 20000;
+
+ @Parameter(names = {"-ap", "--accept-publish"}, order = 13, description = "Enable publishing")
+ private boolean acceptPublish = true;
+
+ @Parameter(names = {"-as", "--accept-subscribe"}, order = 14, description = "Enable subscription")
+ private boolean acceptSubscribe = true;
+
+ @Parameter(names = {"-mms",
+ "--max-msgsize-inmb"}, order = 15, description = "Maximum allowed message length, unit MB")
+ private int maxMsgSizeInMB = 1;
+
+ @Parameter(names = {"-m", "--modifier"}, order = 2, required = true, description = "Record modifier")
+ private String modifyUser;
+
+ @Parameter(names = {"-md", "--modify-date"}, order = 16, description = "Record modification date")
+ private String modifyDate;
+
+ @Parameter(names = {"-at",
+ "--auth-token"}, order = 3, required = true, description = "Admin api operation authorization code")
+ private String confModAuthToken;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_modify_topic_info";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (brokerId != null)
+ requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+ if (deletePolicy != null)
+ requestParams.put(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+ requestParams.put(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+ requestParams.put(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+ requestParams.put(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+ requestParams.put(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+ requestParams.put(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+ requestParams.put(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+ requestParams.put(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+ requestParams.put(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+ requestParams.put(WebFieldDef.ACCEPTPUBLISH.name, acceptPublish);
+ requestParams.put(WebFieldDef.ACCEPTSUBSCRIBE.name, acceptSubscribe);
+ requestParams.put(WebFieldDef.MAXMSGSIZEINMB.name, maxMsgSizeInMB);
+ if (modifyUser != null)
+ requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+ if (modifyDate != null)
+ requestParams.put(WebFieldDef.MODIFYDATE.name, modifyDate);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ cliWebapiAdmin.processParams(requestMethod);
+
+ System.out.println("Reloading broker configure...");
+ requestParams.clear();
+ requestMethod[1] = "admin_reload_broker_configure";
+ if (brokerId != null)
+ requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+ if (modifyUser != null)
+ requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+ if (modifyDate != null)
+ requestParams.put(WebFieldDef.MODIFYDATE.name, modifyDate);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Create topic")
+ private static class TopicCreate extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params = new ArrayList<>();
+
+ @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-bid",
+ "--broker-id"}, order = 1, required = true, description = "Brokers' ID, separated by commas")
+ private String brokerId;
+
+ @Parameter(names = {"-dp", "--delete-policy"}, order = 4, description = "File aging strategy")
+ private String deletePolicy;
+
+ @Parameter(names = {"-np", "--num-partitions"}, order = 5, description = "Number of partitions")
+ private int numPartitions = -1;
+
+ @Parameter(names = {"-uft",
+ "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+ private int unflushThreshold = -1;
+
+ @Parameter(names = {"-ufi",
+ "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+ private int unflushInterval = -1;
+
+ @Parameter(names = {"-ufd",
+ "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+ private int unflushDataHold = 0;
+
+ @Parameter(names = {"-nts", "--num-topic-stores"}, order = 9, description = "Number of topic stores")
+ private int numTopicStores = 1;
+
+ @Parameter(names = {"-mc",
+ "--memcache-msgcnt-ink"}, order = 10, description = "Maximum allowed memory cache unflushing message count")
+ private int memCacheMsgCntInK = 10;
+
+ @Parameter(names = {"-ms",
+ "--memcache-msgsize-inmb"}, order = 11, description = "Maximum allowed memory cache size in MB")
+ private int memCacheMsgSizeInMB = 2;
+
+ @Parameter(names = {"-mfi",
+ "--memcache-flush-intvl"}, order = 12, description = "Maximum allowed disk unflushing data size")
+ private int memCacheFlushIntvl = 20000;
+
+ @Parameter(names = {"-ap", "--accept-publish"}, order = 13, description = "Enable publishing")
+ private boolean acceptPublish = true;
+
+ @Parameter(names = {"-as", "--accept-subscribe"}, order = 14, description = "Enable subscription")
+ private boolean acceptSubscribe = true;
+
+ @Parameter(names = {"-mms",
+ "--max-msgsize-inmb"}, order = 15, description = "Maximum allowed message length, unit MB")
+ private int maxMsgSizeInMB = 1;
+
+ @Parameter(names = {"-c", "--creator"}, order = 2, required = true, description = "Record creator")
+ private String createUser;
+
+ @Parameter(names = {"-cd", "--create-date"}, order = 16, description = "Record creation date")
+ private String createDate;
+
+ @Parameter(names = {"-at",
+ "--auth-token"}, order = 3, required = true, description = "Admin api operation authorization code")
+ private String confModAuthToken;
+
+ @Override
+ void run() {
+ try {
+ requestMethod[1] = "admin_add_new_topic_record";
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (brokerId != null)
+ requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+ if (deletePolicy != null)
+ requestParams.put(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+ if (numPartitions != -1)
+ requestParams.put(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+ if (unflushThreshold != -1)
+ requestParams.put(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+ if (unflushInterval != -1)
+ requestParams.put(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+ requestParams.put(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+ requestParams.put(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+ requestParams.put(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+ requestParams.put(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+ requestParams.put(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+ requestParams.put(WebFieldDef.ACCEPTPUBLISH.name, acceptPublish);
+ requestParams.put(WebFieldDef.ACCEPTSUBSCRIBE.name, acceptSubscribe);
+ requestParams.put(WebFieldDef.MAXMSGSIZEINMB.name, maxMsgSizeInMB);
+ if (createUser != null)
+ requestParams.put(WebFieldDef.CREATEUSER.name, createUser);
+ if (createDate != null)
+ requestParams.put(WebFieldDef.CREATEDATE.name, createDate);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ cliWebapiAdmin.processParams(requestMethod);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Delete topic")
+ private static class TopicDelete extends AbstractCommandRunner {
+
+ @Parameter()
+ private List params = new ArrayList<>();
+
+ @Parameter(names = {"-o",
+ "--delete-opt"}, order = 0, description = "Delete options, must in { soft | redo | hard }")
+ private String deleteOpt = "soft";
+
+ @Parameter(names = {"-t", "--topic"}, order = 1, required = true, description = "Topic name")
+ private String topicName;
+
+ @Parameter(names = {"-bid",
+ "--broker-id"}, order = 2, required = true, description = "Brokers' ID, separated by commas")
+ private String brokerId;
+
+ @Parameter(names = {"-m", "--modifier"}, order = 3, required = true, description = "Record modifier")
+ private String modifyUser;
+
+ @Parameter(names = {"-md", "--modify-date"}, order = 5, description = "Record modification date")
+ private String modifyDate;
+
+ @Parameter(names = {"-at",
+ "--auth-token"}, order = 4, required = true, description = "Admin api operation authorization code")
+ private String confModAuthToken;
+
+ private void softDelete() throws Exception {
+ System.out.println("Turning publish and subscribe status to false...");
+ requestMethod[1] = "admin_modify_topic_info";
+ requestParams.put(WebFieldDef.ACCEPTPUBLISH.name, false);
+ requestParams.put(WebFieldDef.ACCEPTSUBSCRIBE.name, false);
+ cliWebapiAdmin.processParams(requestMethod);
+ requestParams.remove(WebFieldDef.ACCEPTPUBLISH.name);
+ requestParams.remove(WebFieldDef.ACCEPTSUBSCRIBE.name);
+
+ System.out.println("Beginning to soft delete...");
+ requestMethod[1] = "admin_delete_topic_info";
+ cliWebapiAdmin.processParams(requestMethod);
+ }
+
+ private void redoDelete() throws Exception {
+ requestMethod[1] = "admin_redo_deleted_topic_info";
+ cliWebapiAdmin.processParams(requestMethod);
+ }
+
+ private void hardDelete() throws Exception {
+ softDelete();
+
+ System.out.println("Beginning to hard delete...");
+ requestMethod[1] = "admin_remove_topic_info";
+ cliWebapiAdmin.processParams(requestMethod);
+ }
+
+ @Override
+ void run() {
+ try {
+ requestParams.clear();
+ if (topicName != null)
+ requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+ if (brokerId != null)
+ requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+ if (modifyUser != null)
+ requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+ if (modifyDate != null)
+ requestParams.put(WebFieldDef.MODIFYDATE.name, modifyDate);
+ if (confModAuthToken != null)
+ requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+ switch (deleteOpt) {
+ case "soft":
+ softDelete();
+ break;
+ case "redo":
+ redoDelete();
+ break;
+ case "hard":
+ hardDelete();
+ break;
+ default:
+ throw new ParameterException("delete option must in { soft | redo | hard }");
+ }
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java
new file mode 100644
index 00000000000..c4f26030746
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerGroupCommandTest {
+
+ CommandToolMain tubectlTool = null;
+
+ @Before
+ public void setUp() {
+ tubectlTool = new CommandToolMain();
+ }
+
+ @Test
+ public void testConsumerGroupCreate() {
+ String[] arg = {"group", "create", "-t", "b4t1", "-g", "b4t1g1", "-at", "abc", "-c", "admin", "-cd",
+ "20151117151129"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testConsumerGroupList() {
+ String[] arg = {"group", "list", "-t", "b4t1", "-g", "b4t1g1", "-c", "admin"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testConsumerGroupDelete() {
+ String[] arg = {"group", "delete", "-t", "b4t1", "-at", "abc", "-m", "admin", "-g", "b4t1g1"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java
new file mode 100644
index 00000000000..a43ed58a226
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class MessageCommandTest {
+
+ CommandToolMain tubectlTool = null;
+
+ @Before
+ public void setUp() {
+ tubectlTool = new CommandToolMain();
+ }
+
+ @Test
+ public void testMessageProduceSync() throws UnknownHostException, InterruptedException {
+ InetAddress addr = InetAddress.getLocalHost();
+ int port = 8715;
+ String masterservers = addr.getHostAddress() + ":" + String.valueOf(port);
+
+ String messageBody = "This is a message from testMessageProduceSync.";
+ InputStream in = new ByteArrayInputStream(messageBody.getBytes());
+ System.setIn(in);
+
+ String[] arg = {"message", "produce", "-ms", masterservers, "-t", "b4t4", "-m", "sync", "-mt", "1"};
+ Assert.assertTrue(tubectlTool.run(arg));
+
+ }
+
+ @Test
+ public void testMessageProduceAsync() throws UnknownHostException, InterruptedException {
+ InetAddress addr = InetAddress.getLocalHost();
+ int port = 8715;
+ String masterservers = addr.getHostAddress() + ":" + String.valueOf(port);
+
+ String messageBody = "This is a message from testMessageProduceAsync.";
+ InputStream in = new ByteArrayInputStream(messageBody.getBytes());
+ System.setIn(in);
+
+ String[] arg = {"message", "produce", "-ms", masterservers, "-t", "b4t4", "-m", "async", "-mt", "1"};
+ Assert.assertTrue(tubectlTool.run(arg));
+
+ }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java
new file mode 100644
index 00000000000..b730359f82e
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TopicCommandTest {
+
+ CommandToolMain tubectlTool = null;
+
+ @Before
+ public void setUp() {
+ tubectlTool = new CommandToolMain();
+ }
+
+ @Test
+ public void testTopicCreate() {
+ String[] arg = {"topic", "create", "-t", "b4t1", "-bid", "4", "-c", "admin", "-at", "abc"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testTopicList() {
+ String[] arg = {"topic", "list"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testTopicUpdate() {
+ String[] arg = {"topic", "update", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testTopicDeleteSoft() {
+ String[] arg = {"topic", "delete", "-o", "soft", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testTopicDeleteRedo() {
+ String[] arg = {"topic", "delete", "-o", "redo", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+
+ @Test
+ public void testTopicDeleteHard() {
+ String[] arg = {"topic", "delete", "-o", "hard", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+ Assert.assertTrue(tubectlTool.run(arg));
+ }
+}
diff --git a/licenses/inlong-tubemq-server/LICENSE b/licenses/inlong-tubemq-server/LICENSE
index b5418502d83..abdf55808b4 100644
--- a/licenses/inlong-tubemq-server/LICENSE
+++ b/licenses/inlong-tubemq-server/LICENSE
@@ -418,6 +418,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
log4j:log4j:1.2.17 - Apache Log4j (http://logging.apache.org/log4j/1.2/), (The Apache Software License, Version 2.0)
org.apache.logging.log4j:log4j-core:2.17.2 - Apache Log4j Core (https://logging.apache.org/log4j/2.x/log4j-core/), (Apache License, Version 2.0)
+ com.beust:jcommander:1.78 - jcommander (https://github.com/cbeust/jcommander/tree/1.78), (Apache License, Version 2.0)
org.eclipse.jetty:jetty-http:9.4.48.v20220622 - Jetty :: Http Utility (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
org.eclipse.jetty:jetty-io:9.4.48.v20220622 - Jetty :: IO Utility (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
org.eclipse.jetty:jetty-security:9.4.48.v20220622 - Jetty :: Security (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
diff --git a/licenses/inlong-tubemq-server/NOTICE b/licenses/inlong-tubemq-server/NOTICE
index c5ea4b97d68..bec5a8bbe9b 100644
--- a/licenses/inlong-tubemq-server/NOTICE
+++ b/licenses/inlong-tubemq-server/NOTICE
@@ -215,6 +215,18 @@ limitations under the License.
+========================================================================
+
+jcommander NOTICE
+========================================================================
+
+JCommander Copyright Notices
+============================
+
+Copyright 2010 Cedric Beust
+
+
+
========================================================================
Apache Log4j NOTICE
diff --git a/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt b/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt
new file mode 100644
index 00000000000..477eb7b7ba8
--- /dev/null
+++ b/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2012, Cedric Beust
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.