Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker category dimension #11554

Merged
merged 10 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|

## Shuffle metrics (Native parallel task)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

"taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" },
"taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" },
"taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" },
"taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it looks like other emitters have files like this too:

should they be updated as well after this change? I didn't look super closely at their contents, and i'm not sure how consistent they actually are with each other since I'm not sure they are currently emitting these metrics at all... but this seemed worth discussing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #11958.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is worth discussing, but I think we can do it in #11958.

"taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },

"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
"segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class ForkingTaskRunner
private final ListeningExecutorService exec;
private final PortFinder portFinder;
private final StartupLoggingConfig startupLoggingConfig;
private final WorkerConfig workerConfig;

private volatile boolean stopping = false;

Expand All @@ -121,6 +123,7 @@ public ForkingTaskRunner(
this.node = node;
this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
this.startupLoggingConfig = startupLoggingConfig;
this.workerConfig = workerConfig;
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
);
Expand Down Expand Up @@ -669,7 +672,15 @@ String getMaskedCommand(List<String> maskedProperties, List<String> command)
}

@Override
public long getTotalTaskSlotCount()
public Map<String, Long> getTotalTaskSlotCount()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
}
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
}

public long getTotalTaskSlotCountLong()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return config.getPorts().size();
Expand All @@ -678,27 +689,32 @@ public long getTotalTaskSlotCount()
}

@Override
public long getIdleTaskSlotCount()
public Map<String, Long> getIdleTaskSlotCount()
{
return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
}

@Override
public long getUsedTaskSlotCount()
public Map<String, Long> getUsedTaskSlotCount()
{
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount()));
}

public long getUsedTaskSlotCountLong()
{
return portFinder.findUsedPortCount();
}

@Override
public long getLazyTaskSlotCount()
public Map<String, Long> getLazyTaskSlotCount()
{
return 0;
return ImmutableMap.of(workerConfig.getCategory(), 0L);
}

@Override
public long getBlacklistedTaskSlotCount()
public Map<String, Long> getBlacklistedTaskSlotCount()
{
return 0;
return ImmutableMap.of(workerConfig.getCategory(), 0L);
}

protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -1514,55 +1515,80 @@ Map<String, String> getWorkersWithUnacknowledgedTask()
}

@Override
public long getTotalTaskSlotCount()
public Map<String, Long> getTotalTaskSlotCount()
{
long totalPeons = 0;
Map<String, Long> totalPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) {
totalPeons += worker.getWorker().getCapacity();
String workerCategory = worker.getWorker().getCategory();
int workerCapacity = worker.getWorker().getCapacity();
totalPeons.compute(
workerCategory,
(category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity
);
}

return totalPeons;
}

@Override
public long getIdleTaskSlotCount()
public Map<String, Long> getIdleTaskSlotCount()
{
long totalIdlePeons = 0;
Map<String, Long> totalIdlePeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
totalIdlePeons += worker.getAvailableCapacity();
String workerCategory = worker.getWorker().getCategory();
int workerAvailableCapacity = worker.getAvailableCapacity();
totalIdlePeons.compute(
workerCategory,
(category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity
);
}

return totalIdlePeons;
}

@Override
public long getUsedTaskSlotCount()
public Map<String, Long> getUsedTaskSlotCount()
{
long totalUsedPeons = 0;
Map<String, Long> totalUsedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) {
totalUsedPeons += worker.getCurrCapacityUsed();
String workerCategory = worker.getWorker().getCategory();
int workerUsedCapacity = worker.getCurrCapacityUsed();
totalUsedPeons.compute(
workerCategory,
(category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity
);
}

return totalUsedPeons;
}

@Override
public long getLazyTaskSlotCount()
public Map<String, Long> getLazyTaskSlotCount()
{
long totalLazyPeons = 0;
Map<String, Long> totalLazyPeons = new HashMap<>();
for (Worker worker : getLazyWorkers()) {
totalLazyPeons += worker.getCapacity();
String workerCategory = worker.getCategory();
int workerLazyPeons = worker.getCapacity();
totalLazyPeons.compute(
workerCategory,
(category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons
);
}

return totalLazyPeons;
}

@Override
public long getBlacklistedTaskSlotCount()
public Map<String, Long> getBlacklistedTaskSlotCount()
{
long totalBlacklistedPeons = 0;
Map<String, Long> totalBlacklistedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
totalBlacklistedPeons += worker.getWorker().getCapacity();
String workerCategory = worker.getWorker().getCategory();
int workerBlacklistedPeons = worker.getWorker().getCapacity();
totalBlacklistedPeons.compute(
workerCategory,
(category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons
);
}

return totalBlacklistedPeons;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -334,34 +335,39 @@ public Optional<ScalingStats> getScalingStats()
return Optional.absent();
}

/* This method should be never called in peons */
@Override
public long getTotalTaskSlotCount()
public Map<String, Long> getTotalTaskSlotCount()
{
return 1;
throw new UnsupportedOperationException();
}

/* This method should be never called in peons */
@Override
public long getIdleTaskSlotCount()
public Map<String, Long> getIdleTaskSlotCount()
{
return runningItem == null ? 1 : 0;
throw new UnsupportedOperationException();
}

/* This method should be never called in peons */
@Override
public long getUsedTaskSlotCount()
public Map<String, Long> getUsedTaskSlotCount()
{
return runningItem == null ? 0 : 1;
throw new UnsupportedOperationException();
}

/* This method should be never called in peons */
@Override
public long getLazyTaskSlotCount()
public Map<String, Long> getLazyTaskSlotCount()
{
return 0;
throw new UnsupportedOperationException();
}

/* This method should be never called in peons */
@Override
public long getBlacklistedTaskSlotCount()
public Map<String, Long> getBlacklistedTaskSlotCount()
{
return 0;
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private void gracefulStopLeaderLifecycle()

@Override
@Nullable
public Long getTotalTaskSlotCount()
public Map<String, Long> getTotalTaskSlotCount()
{
Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) {
Expand All @@ -358,7 +358,7 @@ public Long getTotalTaskSlotCount()

@Override
@Nullable
public Long getIdleTaskSlotCount()
public Map<String, Long> getIdleTaskSlotCount()
{
Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) {
Expand All @@ -370,7 +370,7 @@ public Long getIdleTaskSlotCount()

@Override
@Nullable
public Long getUsedTaskSlotCount()
public Map<String, Long> getUsedTaskSlotCount()
{
Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) {
Expand All @@ -382,7 +382,7 @@ public Long getUsedTaskSlotCount()

@Override
@Nullable
public Long getLazyTaskSlotCount()
public Map<String, Long> getLazyTaskSlotCount()
{
Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) {
Expand All @@ -394,7 +394,7 @@ public Long getLazyTaskSlotCount()

@Override
@Nullable
public Long getBlacklistedTaskSlotCount()
public Map<String, Long> getBlacklistedTaskSlotCount()
{
Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

/**
Expand Down Expand Up @@ -124,14 +125,14 @@ default TaskLocation getTaskLocation(String taskId)

/**
* APIs useful for emitting statistics for @TaskSlotCountStatsMonitor
*/
long getTotalTaskSlotCount();
*/
Map<String, Long> getTotalTaskSlotCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could use Object2LongMap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I'm returning ImmutableMap from methods where there will be only one entry, I can't use Object2LongMap without changing return values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean. My suggestion is changing all return type here including all their implementations. For singleton map, you can use Object2LongMaps.singleton(). But this comment is nit because this api is not performance critical.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's okay I'm not planning to address this change in this PR.


long getIdleTaskSlotCount();
Map<String, Long> getIdleTaskSlotCount();

long getUsedTaskSlotCount();
Map<String, Long> getUsedTaskSlotCount();

long getLazyTaskSlotCount();
Map<String, Long> getLazyTaskSlotCount();

long getBlacklistedTaskSlotCount();
Map<String, Long> getBlacklistedTaskSlotCount();
}
Loading