diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java index f925fe2b61a..fa6779f78a7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -30,7 +30,7 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { private String taskName; - private int maxLimit; // Default limit: return a maximum of 20 tasks. + private int maxLimit; // Optional parameter for filtering return tasks nums. private Integer taskStatus; // Optional parameter for filtering tasks with specific statuses diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java index 2e69c7af3ff..80b444cc128 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -120,6 +120,10 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { if (brokerAddr != null) { // If brokerAddr is specified, check that broker first. + if (brokerAddrTable == null || brokerAddrTable.isEmpty()) { + System.out.print("Broker address table is empty."); + return; + } BrokerData brokerData = brokerAddrTable.values().stream() .filter(b -> b.getBrokerAddrs().containsValue(brokerAddr)) .findFirst() diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java index 9c98dfdc205..5a4c881e179 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java @@ -20,10 +20,12 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.cli.CommandLine; @@ -44,8 +46,10 @@ import org.mockito.MockitoAnnotations; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -150,6 +154,146 @@ public void testExecute_EmptyTaskName() throws Exception { verify(defaultMQAdminExt, never()).shutdown(); } + @Test + public void testExecute_WithBrokerAddr() throws Exception { + String brokerAddr = "127.0.0.1:" + listenPort(); + String taskName = "testTask"; + + when(commandLine.hasOption('b')).thenReturn(true); + when(commandLine.getOptionValue('b')).thenReturn(brokerAddr); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, brokerAddr); + when(brokerData.selectBrokerAddr()).thenReturn(brokerAddr); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>()); + when(defaultMQAdminExt.checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_WithMaxLimit() throws Exception { + String taskName = "testTask"; + int maxLimit = 5; + + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + when(commandLine.hasOption('m')).thenReturn(true); + when(commandLine.getOptionValue('m')).thenReturn(String.valueOf(maxLimit)); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + List tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + tasks.add(new AsyncTask(taskName, "taskId" + i, new CompletableFuture<>())); + } + when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(tasks); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getMaxLimit() == maxLimit)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_WithTaskStatus() throws Exception { + String taskName = "testTask"; + int taskStatus = 1; + + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + when(commandLine.hasOption('s')).thenReturn(true); + when(commandLine.getOptionValue('s')).thenReturn(String.valueOf(taskStatus)); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>()); + asyncTask.setStatus(taskStatus); + when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getTaskStatus() == taskStatus)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_BrokerAddrNotFound() throws Exception { + String brokerAddr = "127.0.0.1:10911"; + String taskName = "testTask"; + + when(commandLine.hasOption('b')).thenReturn(true); + when(commandLine.getOptionValue('b')).thenReturn(brokerAddr); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10900"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10900"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintStream originalOut = System.out; + System.setOut(new PrintStream(outputStream)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + System.setOut(originalOut); + + String output = outputStream.toString().trim(); + assertTrue(output.contains("Broker with address '" + brokerAddr + "' not found.")); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt, never()).checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt).shutdown(); + } + @Test(expected = RuntimeException.class) public void testExecute_ExceptionThrown() throws Exception { String clusterName = "testCluster";