-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add TaskCountStatsMonitor to monitor task count stats #6657
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@QiuMM thanks for the PR. I left some comments.
@@ -100,6 +103,11 @@ | |||
|
|||
private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); | |||
|
|||
private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>(); | |||
private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>(); | |||
private Map<String, Long> prevSuccessfulTaskCount = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about prevTotalSuccessfulTaskCount
? Same for prevFailedTaskCount
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
if (status.isSuccess()) { | ||
totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); | ||
totalSuccessfulTaskCount.get(task.getDataSource()).incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
computeIfAbsent()
returns the proper instance of AtomicLong
, so you can just call incrementAndGet()
directly on that instance like below. This is better because we can avoid unnecessary get()
.
totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
.incrementAndGet();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good, done.
totalSuccessfulTaskCount.get(task.getDataSource()).incrementAndGet(); | ||
} else { | ||
totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); | ||
totalFailedTaskCount.get(task.getDataSource()).incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good, done.
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* Encapsulates the indexer leadership lifecycle. | ||
*/ | ||
public class TaskMaster | ||
public class TaskMaster implements TaskCountStatsProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does TaskQueue
also need to be TaskCountStatsProvider
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, TaskCountStatsProvider
is bind to TaskMaster
, there is no need to let TaskQueue
be TaskCountStatsProvider
.
|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| | ||
|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| | ||
|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| | ||
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also add a metric for waiting tasks? It will be useful too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good, done.
@@ -586,4 +602,36 @@ private void syncFromStorage() | |||
return rv; | |||
} | |||
|
|||
private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev) | |||
{ | |||
return total.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the line width. The max line is 120 (https://github.com/apache/incubator-druid/blob/master/eclipse_formatting.xml#L96). Same for the below methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good, done.
|
||
import java.util.Map; | ||
|
||
public class TaskCountStatsMonitor extends AbstractMonitor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a unit test? I think it should verify the values emitted from this monitor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @QiuMM. Please consider my last comment.
@Override | ||
public Map<String, Long> getSuccessfulTaskCount() | ||
{ | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fill all metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @QiuMM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. The new metrics look useful and they are pretty low footprint, once every monitor period.
Have tested in real cluster and now running in production.