-
Notifications
You must be signed in to change notification settings - Fork 11.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE#9097] Add new command to check async task status in broker. #9162
base: develop
Are you sure you want to change the base?
Conversation
53d1b72
to
a4a7703
Compare
import org.apache.rocketmq.remoting.protocol.RequestCode; | ||
|
||
@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET) | ||
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add resourceType for ACL
@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
@Override | ||
public Options buildCommandlineOptions(Options options) { | ||
Option opt = new Option("t", "taskName", true, "The name of the asynchronous task"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe need [-b] to choose single broker and [-n] [-c] to collect status from all brokers in name server or cluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public class AdminAsyncTaskManager { | ||
|
||
// taskId -> AsyncTask | ||
private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do not use static, it may conflict with other design, use object and make sure to be used only in admin Process
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import java.util.UUID; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
public class AdminAsyncTaskManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if using map to manage task, maybe need a thread to clean up the expired task regularly. Using Caffeine LoadingCache may be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -487,11 +494,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re | |||
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { | |||
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); | |||
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); | |||
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use CompletableFuture when complete to record task success or error, you can Change this runable to a future and add future arg in createTask like this :
CompletableFuture future = ......
adminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress", future);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
2104751
to
bf4a370
Compare
public AdminAsyncTaskManager() { | ||
this.asyncTaskCache = Caffeine.newBuilder() | ||
.expireAfterWrite(30, TimeUnit.MINUTES) | ||
.maximumSize(10000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- add removalListener to clean dirty data in taskNameToIdsMap
- make expire time and max size configurable, expire time default one day may be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
try { | ||
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); | ||
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); | ||
return doCheckRocksdbCqWriteProgress(ctx, request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
AsyncTask task = new AsyncTask(taskName, taskId, future); | ||
|
||
asyncTaskCache.put(taskId, task); | ||
taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure [ add and remove object in ArrayList ] in map.compute to avoid concurrent errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
asyncExecuteWorker.submit(runnable); | ||
}, asyncExecuteWorker); | ||
|
||
asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return task id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET) | ||
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { | ||
|
||
private String taskName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provides a method to accurately query a task
private String taskId;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { | ||
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class); | ||
List<String> taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider the order of the task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return the status of the newly created task first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java
Which Issue(s) This PR Fixes
Fixes #9097
Brief Description
How Did You Test This Change?