Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#9097] Add new command to check async task status in broker. #9162

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from

Conversation

KiteSoar
Copy link
Contributor

Which Issue(s) This PR Fixes

Fixes #9097

Brief Description

How Did You Test This Change?

@KiteSoar KiteSoar force-pushed the develop-#9097 branch 5 times, most recently from 53d1b72 to a4a7703 Compare February 1, 2025 08:27
@codecov-commenter
Copy link

codecov-commenter commented Feb 1, 2025

Codecov Report

Attention: Patch coverage is 35.52632% with 147 lines in your changes missing coverage. Please review.

Project coverage is 48.04%. Comparing base (de4e48d) to head (bf4a370).
Report is 4 commits behind head on develop.

Files with missing lines Patch % Lines
...ain/java/org/apache/rocketmq/common/AsyncTask.java 0.00% 28 Missing ⚠️
.../apache/rocketmq/broker/AdminAsyncTaskManager.java 20.58% 27 Missing ⚠️
...ocketmq/broker/processor/AdminBrokerProcessor.java 4.54% 21 Missing ⚠️
.../command/stats/CheckAsyncTaskStatusSubCommand.java 77.65% 14 Missing and 7 partials ⚠️
...ocol/header/CheckAsyncTaskStatusRequestHeader.java 0.00% 17 Missing ⚠️
...col/header/CheckAsyncTaskStatusResponseHeader.java 0.00% 14 Missing ⚠️
...in/java/org/apache/rocketmq/common/TaskStatus.java 0.00% 11 Missing ⚠️
...g/apache/rocketmq/client/impl/MQClientAPIImpl.java 0.00% 6 Missing ⚠️
...apache/rocketmq/tools/admin/DefaultMQAdminExt.java 0.00% 1 Missing ⚠️
...he/rocketmq/tools/admin/DefaultMQAdminExtImpl.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop    #9162      +/-   ##
=============================================
- Coverage      48.11%   48.04%   -0.07%     
- Complexity     12091    12112      +21     
=============================================
  Files           1321     1327       +6     
  Lines          93024    93263     +239     
  Branches       11926    11951      +25     
=============================================
+ Hits           44755    44807      +52     
- Misses         42754    42920     +166     
- Partials        5515     5536      +21     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET)
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add resourceType for ACL

@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "taskName", true, "The name of the asynchronous task");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe need [-b] to choose single broker and [-n] [-c] to collect status from all brokers in name server or cluster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public class AdminAsyncTaskManager {

// taskId -> AsyncTask
private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not use static, it may conflict with other design, use object and make sure to be used only in admin Process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class AdminAsyncTaskManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if using map to manage task, maybe need a thread to clean up the expired task regularly. Using Caffeine LoadingCache may be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -487,11 +494,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CompletableFuture when complete to record task success or error, you can Change this runable to a future and add future arg in createTask like this :

CompletableFuture future = ......
adminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress", future);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public AdminAsyncTaskManager() {
this.asyncTaskCache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.maximumSize(10000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. add removalListener to clean dirty data in taskNameToIdsMap
  2. make expire time and max size configurable, expire time default one day may be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
return doCheckRocksdbCqWriteProgress(ctx, request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

AsyncTask task = new AsyncTask(taskName, taskId, future);

asyncTaskCache.put(taskId, task);
taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sure [ add and remove object in ArrayList ] in map.compute to avoid concurrent errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

asyncExecuteWorker.submit(runnable);
}, asyncExecuteWorker);

asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return task id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET)
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {

private String taskName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provides a method to accurately query a task

private String taskId;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
List<String> taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the order of the task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return the status of the newly created task first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

	modified:   broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java
	modified:   broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
	modified:   common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
	modified:   remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java
	modified:   tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java

	modified:   broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java
	modified:   broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
	modified:   common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
	modified:   remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java
	modified:   tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java

	modified:   broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java
	modified:   broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
	modified:   common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
	modified:   remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java
	modified:   tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java

	modified:   broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java
	modified:   broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
	modified:   common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
	modified:   remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
	modified:   tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java
	modified:   tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Enhancement] Add new command to check async task status in broker
3 participants