Skip to content

Commit

Permalink
[ISSUE#9097]Refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
KiteSoar committed Feb 6, 2025
1 parent a4a7703 commit 7410626
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,111 @@
*/
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.AsyncTask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.TaskStatus;

public class AdminAsyncTaskManager {

// taskId -> AsyncTask
private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>();
private final Cache<String, AsyncTask> asyncTaskCache;

// taskName -> taskId
private static final Map<String, List<String>> TASK_NAME_TO_IDS_MAP = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<String>> taskNameToIdsMap;

public AdminAsyncTaskManager() {
this.asyncTaskCache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.maximumSize(10000)
.build();

this.taskNameToIdsMap = new ConcurrentHashMap<>();
}

public static String createTask(String taskName) {
/**
* Creates a new asynchronous task with a unique taskId.
*
* @param taskName The name of the task.
* @param future The CompletableFuture representing the asynchronous task.
* @return The generated taskId.
*/
public String createTask(String taskName, CompletableFuture<?> future) {
String taskId = UUID.randomUUID().toString();
ASYNC_TASK_MAP.put(taskId, new AsyncTask(taskName, taskId));
TASK_NAME_TO_IDS_MAP.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);
AsyncTask task = new AsyncTask(taskName, taskId, future);

asyncTaskCache.put(taskId, task);
taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);

future.whenComplete((result, throwable) -> {
if (throwable != null) {
task.setStatus(TaskStatus.ERROR.getValue());
task.setResult(throwable.getMessage());
} else {
task.setStatus(TaskStatus.SUCCESS.getValue());
task.setResult(JSON.toJSONString(result));
}
});

return taskId;
}

public static List<String> getTaskIdsByName(String taskName) {
return TASK_NAME_TO_IDS_MAP.getOrDefault(taskName, Collections.emptyList());
/**
* Get all taskIds associated with a given task name.
*
* @param taskName The name of the task.
* @return List of taskIds for the given task name.
*/
public List<String> getTaskIdsByName(String taskName) {
return taskNameToIdsMap.getOrDefault(taskName, Collections.emptyList());
}

public static AsyncTask getTaskStatus(String taskId) {
return ASYNC_TASK_MAP.get(taskId);
/**
* Get the status of a specific task.
*
* @param taskId The unique identifier of the task.
* @return The AsyncTask object, or null if not found.
*/
public AsyncTask getTaskStatus(String taskId) {
return asyncTaskCache.getIfPresent(taskId);
}

public static void updateTaskStatus(String taskId, int status, String result) {
AsyncTask task = ASYNC_TASK_MAP.get(taskId);
/**
* Update the status and result of a specific task.
*
* @param taskId The unique identifier of the task.
* @param status The new status of the task.
* @param result The result of the task.
*/
public void updateTaskStatus(String taskId, int status, String result) {
AsyncTask task = asyncTaskCache.getIfPresent(taskId);
if (task != null) {
task.setStatus(status);
task.setResult(result);
asyncTaskCache.put(taskId, task);
}
}

public static void removeTask(String taskId) {
AsyncTask task = ASYNC_TASK_MAP.remove(taskId);
/**
* Remove a specific task from the cache and mappings.
*
* @param taskId The unique identifier of the task.
*/
public void removeTask(String taskId) {
AsyncTask task = asyncTaskCache.getIfPresent(taskId);
if (task != null) {
TASK_NAME_TO_IDS_MAP.computeIfPresent(task.getTaskName(), (k, v) -> {
asyncTaskCache.invalidate(taskId);
taskNameToIdsMap.computeIfPresent(task.getTaskName(), (k, v) -> {
v.remove(taskId);
return v.isEmpty() ? null : v;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
Expand Down Expand Up @@ -83,7 +84,6 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TaskStatus;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
Expand Down Expand Up @@ -250,9 +250,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
protected final BrokerController brokerController;
protected Set<String> configBlackList = new HashSet<>();
private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
private final AdminAsyncTaskManager asyncTaskManager;

public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.asyncTaskManager = new AdminAsyncTaskManager();
initConfigBlackList();
}

Expand Down Expand Up @@ -494,18 +496,16 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress");
Runnable runnable = () -> {

CompletableFuture<CheckRocksdbCqWriteResult> future = CompletableFuture.supplyAsync(() -> {
try {
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.SUCCESS.getValue(), JSON.toJSONString(checkResult));
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
return doCheckRocksdbCqWriteProgress(ctx, request);
} catch (Exception e) {
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.ERROR.getValue(), e.getMessage());
LOGGER.error("checkRocksdbCqWriteProgress error", e);
throw new CompletionException(e);
}
};
asyncExecuteWorker.submit(runnable);
}, asyncExecuteWorker);

asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(result));
Expand Down Expand Up @@ -3607,28 +3607,27 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
}
return response;
}

private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
List<String> taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
if (CollectionUtils.isEmpty(taskIds)) {
throw new RemotingCommandException("taskName: " + requestHeader.getTaskName() + " not found");
}

try {
List<String> taskIds = AdminAsyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
if (taskIds == null || taskIds.isEmpty()) {
throw new RemotingCommandException("taskName:" + requestHeader.getTaskName() + "not found");
}

List<AsyncTask> result = new ArrayList<>();
for (String taskId : taskIds) {
AsyncTask asyncTask = AdminAsyncTaskManager.getTaskStatus(taskId);
result.add(asyncTask);

if (asyncTask.getStatus() == TaskStatus.SUCCESS.getValue()) {
AdminAsyncTaskManager.removeTask(taskId);
}
}
int maxResults = Math.min(requestHeader.getMaxLimit(), 200);
Integer filterStatus = requestHeader.getTaskStatus();

List<AsyncTask> asyncTasks = taskIds.stream()
.map(asyncTaskManager::getTaskStatus)
.filter(task -> filterStatus == null || task.getStatus() == filterStatus)
.limit(maxResults)
.collect(Collectors.toList());

RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(result));
response.setBody(JSON.toJSONBytes(asyncTasks));
return response;
} catch (Exception e) {
LOGGER.error("checkAsyncTaskStatus error", e);
Expand Down
10 changes: 9 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/AsyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.common;

import java.util.Date;
import java.util.concurrent.CompletableFuture;

public class AsyncTask {

Expand All @@ -31,12 +32,15 @@ public class AsyncTask {

private String result;

public AsyncTask(String taskName, String taskId) {
private final CompletableFuture<?> future;

public AsyncTask(String taskName, String taskId, CompletableFuture<?> future) {
this.taskName = taskName;
this.taskId = taskId;
this.status = TaskStatus.INIT.getValue();
this.createTime = new Date();
this.result = null;
this.future = future;
}

public String getTaskName() {
Expand Down Expand Up @@ -79,6 +83,10 @@ public void setTaskId(String taskId) {
this.taskId = taskId;
}

public CompletableFuture<?> getFuture() {
return future;
}

public static String getDescFromStatus(int status) {
for (TaskStatus taskStatus : TaskStatus.values()) {
if (taskStatus.getValue() == status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,54 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET)
@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET)
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {

private String taskName;


private int maxLimit; // Default limit: return a maximum of 20 tasks.

private Integer taskStatus; // Optional parameter for filtering tasks with specific statuses

@Override
public void checkFields() throws RemotingCommandException {
if (StringUtils.isBlank(taskName)) {
throw new RemotingCommandException("taskName cannot be null or blank");
}
if (maxLimit <= 0) {
throw new RemotingCommandException("maxLimit must be greater than 0");
}
if (taskStatus != null && (taskStatus < 0 || taskStatus > 3)) {
throw new RemotingCommandException("taskStatus must be between 0 and 3");
}
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskId) {
this.taskName = taskId;

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public int getMaxLimit() {
return maxLimit;
}

public void setMaxLimit(int maxLimit) {
this.maxLimit = maxLimit;
}

public Integer getTaskStatus() {
return taskStatus;
}

public void setTaskStatus(Integer taskStatus) {
this.taskStatus = taskStatus;
}
}
}
Loading

0 comments on commit 7410626

Please sign in to comment.