-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add worker status and duration metrics in live and task reports #15180
Changes from 8 commits
613023c
3a23d03
4804c2a
d6dc8d9
50a0eb7
2a8024b
ea490fd
23497f3
e98c4f6
7381596
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.apache.druid.msq.indexing; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.SettableFuture; | ||
|
@@ -47,6 +48,8 @@ | |
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
|
@@ -55,6 +58,7 @@ | |
import java.util.Map; | ||
import java.util.OptionalLong; | ||
import java.util.Set; | ||
import java.util.TreeMap; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
@@ -111,7 +115,7 @@ private enum State | |
// Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added | ||
// here once they are submitted for running, but before they are fully started up. | ||
// taskId -> taskTracker | ||
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>(); | ||
private final Map<String, TaskTracker> taskTrackers = Collections.synchronizedMap(new LinkedHashMap<>()); | ||
|
||
// Set of tasks which are issued a cancel request by the controller. | ||
private final Set<String> canceledWorkerTasks = ConcurrentHashMap.newKeySet(); | ||
|
@@ -348,6 +352,66 @@ public boolean isTaskLatest(String taskId) | |
} | ||
} | ||
|
||
public static class WorkerStats | ||
{ | ||
String workerId; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets mark these field final ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't - because of the default constructor. |
||
TaskState state; | ||
long duration; | ||
|
||
/** | ||
* For JSON deserialization only | ||
*/ | ||
public WorkerStats() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually want serialization only rite ? so this method can go no ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Many tests in SQLMSQStatementResourceTypeTest fail as they require deserialization -- such as |
||
{ | ||
} | ||
|
||
public WorkerStats(String workerId, TaskState state, long duration) | ||
{ | ||
this.workerId = workerId; | ||
this.state = state; | ||
this.duration = duration; | ||
} | ||
|
||
@JsonProperty | ||
public String getWorkerId() | ||
{ | ||
return workerId; | ||
} | ||
|
||
@JsonProperty | ||
public TaskState getState() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also document these properties in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added these properties. |
||
{ | ||
return state; | ||
} | ||
|
||
@JsonProperty("durationMs") | ||
public long getDuration() | ||
{ | ||
return duration; | ||
} | ||
} | ||
|
||
public Map<Integer, List<WorkerStats>> getWorkerStats() | ||
{ | ||
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>(); | ||
|
||
for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
So we should either make TaskTracker thread safe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch! Now wrapped in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets use a concurrentHashMap ?
Since its every easy to use s.iteratator() no ? |
||
|
||
TaskTracker taskTracker = taskEntry.getValue(); | ||
|
||
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>()) | ||
.add(new WorkerStats(taskEntry.getKey(), | ||
taskTracker.status.getStatusCode(), | ||
taskTracker.status.getDuration() | ||
)); | ||
} | ||
|
||
for (List<WorkerStats> workerStatsList : workerStats.values()) { | ||
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId)); | ||
} | ||
return workerStats; | ||
} | ||
|
||
private void mainLoop() | ||
{ | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets update the comments as well.