Skip to content

Commit

Permalink
[ISSUE#9097]Add UT.
Browse files Browse the repository at this point in the history
  • Loading branch information
KiteSoar committed Feb 6, 2025
1 parent 7410626 commit 2104751
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {

private String taskName;

private int maxLimit; // Default limit: return a maximum of 20 tasks.
private int maxLimit; // Optional parameter for filtering return tasks nums.

private Integer taskStatus; // Optional parameter for filtering tasks with specific statuses

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {

if (brokerAddr != null) {
// If brokerAddr is specified, check that broker first.
if (brokerAddrTable == null || brokerAddrTable.isEmpty()) {
System.out.print("Broker address table is empty.");
return;
}
BrokerData brokerData = brokerAddrTable.values().stream()
.filter(b -> b.getBrokerAddrs().containsValue(brokerAddr))
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
Expand All @@ -44,8 +46,10 @@
import org.mockito.MockitoAnnotations;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -150,6 +154,146 @@ public void testExecute_EmptyTaskName() throws Exception {
verify(defaultMQAdminExt, never()).shutdown();
}

@Test
public void testExecute_WithBrokerAddr() throws Exception {
String brokerAddr = "127.0.0.1:" + listenPort();
String taskName = "testTask";

when(commandLine.hasOption('b')).thenReturn(true);
when(commandLine.getOptionValue('b')).thenReturn(brokerAddr);
when(commandLine.hasOption('t')).thenReturn(true);
when(commandLine.getOptionValue('t')).thenReturn(taskName);

ClusterInfo clusterInfo = new ClusterInfo();
Map<String, BrokerData> brokerAddrTable = new HashMap<>();
BrokerData brokerData = spy(new BrokerData());
brokerData.setBrokerAddrs(new HashMap<>());
brokerData.getBrokerAddrs().put(0L, brokerAddr);
when(brokerData.selectBrokerAddr()).thenReturn(brokerAddr);
brokerAddrTable.put("brokerA", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);

when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);

AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>());
when(defaultMQAdminExt.checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class)))
.thenReturn(Collections.singletonList(asyncTask));

checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook);

verify(defaultMQAdminExt).start();
verify(defaultMQAdminExt).examineBrokerClusterInfo();
verify(defaultMQAdminExt).checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class));
verify(defaultMQAdminExt).shutdown();
}

@Test
public void testExecute_WithMaxLimit() throws Exception {
String taskName = "testTask";
int maxLimit = 5;

when(commandLine.hasOption('t')).thenReturn(true);
when(commandLine.getOptionValue('t')).thenReturn(taskName);
when(commandLine.hasOption('m')).thenReturn(true);
when(commandLine.getOptionValue('m')).thenReturn(String.valueOf(maxLimit));

ClusterInfo clusterInfo = new ClusterInfo();
Map<String, BrokerData> brokerAddrTable = new HashMap<>();
BrokerData brokerData = spy(new BrokerData());
brokerData.setBrokerAddrs(new HashMap<>());
brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911");
when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911");
brokerAddrTable.put("brokerA", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);

when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);

List<AsyncTask> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new AsyncTask(taskName, "taskId" + i, new CompletableFuture<>()));
}
when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class)))
.thenReturn(tasks);

checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook);

verify(defaultMQAdminExt).start();
verify(defaultMQAdminExt).examineBrokerClusterInfo();
verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getMaxLimit() == maxLimit));
verify(defaultMQAdminExt).shutdown();
}

@Test
public void testExecute_WithTaskStatus() throws Exception {
String taskName = "testTask";
int taskStatus = 1;

when(commandLine.hasOption('t')).thenReturn(true);
when(commandLine.getOptionValue('t')).thenReturn(taskName);
when(commandLine.hasOption('s')).thenReturn(true);
when(commandLine.getOptionValue('s')).thenReturn(String.valueOf(taskStatus));

ClusterInfo clusterInfo = new ClusterInfo();
Map<String, BrokerData> brokerAddrTable = new HashMap<>();
BrokerData brokerData = spy(new BrokerData());
brokerData.setBrokerAddrs(new HashMap<>());
brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911");
when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911");
brokerAddrTable.put("brokerA", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);

when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);

AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>());
asyncTask.setStatus(taskStatus);
when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class)))
.thenReturn(Collections.singletonList(asyncTask));

checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook);

verify(defaultMQAdminExt).start();
verify(defaultMQAdminExt).examineBrokerClusterInfo();
verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getTaskStatus() == taskStatus));
verify(defaultMQAdminExt).shutdown();
}

@Test
public void testExecute_BrokerAddrNotFound() throws Exception {
String brokerAddr = "127.0.0.1:10911";
String taskName = "testTask";

when(commandLine.hasOption('b')).thenReturn(true);
when(commandLine.getOptionValue('b')).thenReturn(brokerAddr);
when(commandLine.hasOption('t')).thenReturn(true);
when(commandLine.getOptionValue('t')).thenReturn(taskName);

ClusterInfo clusterInfo = new ClusterInfo();
Map<String, BrokerData> brokerAddrTable = new HashMap<>();
BrokerData brokerData = spy(new BrokerData());
brokerData.setBrokerAddrs(new HashMap<>());
brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10900");
when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10900");
brokerAddrTable.put("brokerA", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream originalOut = System.out;
System.setOut(new PrintStream(outputStream));

checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook);

System.setOut(originalOut);

String output = outputStream.toString().trim();
assertTrue(output.contains("Broker with address '" + brokerAddr + "' not found."));

verify(defaultMQAdminExt).start();
verify(defaultMQAdminExt).examineBrokerClusterInfo();
verify(defaultMQAdminExt, never()).checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class));
verify(defaultMQAdminExt).shutdown();
}

@Test(expected = RuntimeException.class)
public void testExecute_ExceptionThrown() throws Exception {
String clusterName = "testCluster";
Expand Down

0 comments on commit 2104751

Please sign in to comment.