From 7bab5a599994ec86d330bf4237a30652b3cd9965 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Wed, 14 Jul 2021 14:11:50 -0700 Subject: [PATCH 01/10] Add worker category as dimension in TaskSlotCountStatsMonitor --- .../indexing/overlord/ForkingTaskRunner.java | 25 +++++----- .../indexing/overlord/RemoteTaskRunner.java | 41 +++++++++++------ .../overlord/SingleTaskBackgroundRunner.java | 23 ++++++---- .../druid/indexing/overlord/TaskMaster.java | 10 ++-- .../druid/indexing/overlord/TaskRunner.java | 13 +++--- .../overlord/ThreadingTaskRunner.java | 24 ++++++---- .../overlord/hrtr/HttpRemoteTaskRunner.java | 41 +++++++++++------ .../common/task/IngestionTestBase.java | 11 +++-- .../overlord/RemoteTaskRunnerTest.java | 35 +++++++------- .../indexing/overlord/TaskQueueTest.java | 21 +++++---- .../indexing/overlord/TestTaskRunner.java | 10 ++-- .../hrtr/HttpRemoteTaskRunnerTest.java | 46 +++++++++---------- .../indexing/overlord/http/OverlordTest.java | 12 +++-- .../metrics/TaskSlotCountStatsMonitor.java | 11 +++-- .../metrics/TaskSlotCountStatsProvider.java | 12 +++-- .../TaskSlotCountStatsMonitorTest.java | 23 ++++++---- 16 files changed, 203 insertions(+), 155 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 49cae238f28a..9e458aa1cc7d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -669,36 +670,38 @@ String getMaskedCommand(List maskedProperties, List command) } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { if (config.getPorts() != null && !config.getPorts().isEmpty()) { - return config.getPorts().size(); + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(config.getPorts().size())); } - return config.getEndPort() - config.getStartPort() + 1; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); + Map totalTaskSlots = getTotalTaskSlotCount(); + Map usedTaskSlots = getUsedTaskSlotCount(); + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Math.max(totalTaskSlots.get(WorkerConfig.DEFAULT_CATEGORY) - usedTaskSlots.get(WorkerConfig.DEFAULT_CATEGORY), 0)); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return portFinder.findUsedPortCount(); + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(portFinder.findUsedPortCount())); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index de0713bc5062..8443db815988 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -95,6 +95,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -1514,55 +1515,65 @@ Map getWorkersWithUnacknowledgedTask() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - long totalPeons = 0; + Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerCapacity = worker.getWorker().getCapacity(); + totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); } return totalPeons; } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - long totalIdlePeons = 0; + Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { - totalIdlePeons += worker.getAvailableCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerAvailableCapacity = worker.getAvailableCapacity(); + totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); } return totalIdlePeons; } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - long totalUsedPeons = 0; + Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalUsedPeons += worker.getCurrCapacityUsed(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerUsedCapacity = worker.getCurrCapacityUsed(); + totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); } return totalUsedPeons; } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - long totalLazyPeons = 0; + Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { - totalLazyPeons += worker.getCapacity(); + String workerCategory = worker.getCategory(); + Integer workerLazyPeons = worker.getCapacity(); + totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); } return totalLazyPeons; } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - long totalBlacklistedPeons = 0; + Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { - totalBlacklistedPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerBlacklistedPeons = worker.getWorker().getCapacity(); + totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); } return totalBlacklistedPeons; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index 24dba4f65204..0328de161c55 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -22,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -35,6 +36,7 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Numbers; @@ -59,6 +61,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -335,33 +338,33 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 1; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 1L); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return runningItem == null ? 1 : 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, runningItem == null ? 1L : 0L); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return runningItem == null ? 0 : 1; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, runningItem == null ? 0L : 1L); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 54287ed14227..5825a2fb03f3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -346,7 +346,7 @@ private void gracefulStopLeaderLifecycle() @Override @Nullable - public Long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -358,7 +358,7 @@ public Long getTotalTaskSlotCount() @Override @Nullable - public Long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -370,7 +370,7 @@ public Long getIdleTaskSlotCount() @Override @Nullable - public Long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -382,7 +382,7 @@ public Long getUsedTaskSlotCount() @Override @Nullable - public Long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -394,7 +394,7 @@ public Long getLazyTaskSlotCount() @Override @Nullable - public Long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 8623e820b2be..5cbdabb3edad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; /** @@ -124,14 +125,14 @@ default TaskLocation getTaskLocation(String taskId) /** * APIs useful for emitting statistics for @TaskSlotCountStatsMonitor - */ - long getTotalTaskSlotCount(); + */ + Map getTotalTaskSlotCount(); - long getIdleTaskSlotCount(); + Map getIdleTaskSlotCount(); - long getUsedTaskSlotCount(); + Map getUsedTaskSlotCount(); - long getLazyTaskSlotCount(); + Map getLazyTaskSlotCount(); - long getBlacklistedTaskSlotCount(); + Map getBlacklistedTaskSlotCount(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java index 84a414ce6a32..39bba7059a48 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Futures; @@ -64,6 +65,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -455,33 +457,35 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return workerConfig.getCapacity(); + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity())); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); + Map totalTaskSlots = getTotalTaskSlotCount(); + Map usedTaskSlots = getTotalTaskSlotCount(); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(totalTaskSlots.get(workerConfig.getCategory()) - usedTaskSlots.get(workerConfig.getCategory()), 0)); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return getRunningTasks().size(); + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(getRunningTasks().size())); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 59085a27afe7..2487b68fc8fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -90,6 +90,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -1628,55 +1629,65 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - long totalPeons = 0; + Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerCapacity = worker.getWorker().getCapacity(); + totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); } return totalPeons; } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - long totalIdlePeons = 0; + Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { - totalIdlePeons += worker.getAvailableCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerAvailableCapacity = worker.getAvailableCapacity(); + totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); } return totalIdlePeons; } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - long totalUsedPeons = 0; + Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalUsedPeons += worker.getCurrCapacityUsed(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerUsedCapacity = worker.getCurrCapacityUsed(); + totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); } return totalUsedPeons; } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - long totalLazyPeons = 0; + Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { - totalLazyPeons += worker.getCapacity(); + String workerCategory = worker.getCategory(); + Integer workerLazyPeons = worker.getCapacity(); + totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); } return totalLazyPeons; } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - long totalBlacklistedPeons = 0; + Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { - totalBlacklistedPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + Integer workerBlacklistedPeons = worker.getWorker().getCapacity(); + totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); } return totalBlacklistedPeons; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 81a6bcc6e9a5..f2cdf5c320e4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -82,6 +82,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -405,31 +406,31 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 589c6e1f62f3..845666966ef7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -107,9 +108,9 @@ public void testRun() throws Exception { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); ListenableFuture result = remoteTaskRunner.run(task); @@ -124,9 +125,9 @@ public void testRun() throws Exception cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId())); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); } @Test @@ -437,8 +438,8 @@ public void testRunWithTaskComplete() throws Exception public void testWorkerRemoved() throws Exception { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); Future future = remoteTaskRunner.run(task); @@ -471,8 +472,8 @@ public boolean isValid() ); Assert.assertNull(cf.checkExists().forPath(STATUS_PATH)); - Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); + Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); } @Test @@ -677,9 +678,9 @@ public boolean apply(ImmutableWorkerInfo input) ); Assert.assertEquals(1, lazyworkers.size()); Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); } @Test @@ -990,12 +991,12 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception mockWorkerCompleteFailedTask(task1); Assert.assertTrue(taskFuture1.get().isFailure()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); Future taskFuture2 = remoteTaskRunner.run(task2); Assert.assertTrue(taskAnnounced(task2.getId())); mockWorkerRunningTask(task2); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); Future taskFuture3 = remoteTaskRunner.run(task3); Assert.assertTrue(taskAnnounced(task3.getId())); @@ -1003,12 +1004,12 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception mockWorkerCompleteFailedTask(task3); Assert.assertTrue(taskFuture3.get().isFailure()); Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); mockWorkerCompleteSuccessfulTask(task2); Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 0b3d8728a61e..dc5fc0e1c6a1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -493,33 +494,33 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index df293f0bcda6..3a5df86a294f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -273,31 +273,31 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 7eeb50ee0605..dd87e6f5fb86 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -864,9 +864,9 @@ protected WorkerHolder createWorkerHolder( taskRunner.start(); - Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); AtomicInteger ticks = new AtomicInteger(); @@ -910,9 +910,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1)); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); taskRunner.run(task1); @@ -920,9 +920,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo Thread.sleep(100); } - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", false, 8080, null, true, false), @@ -952,9 +952,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); taskRunner.run(task2); @@ -962,9 +962,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo Thread.sleep(100); } - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), @@ -994,10 +994,10 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); @@ -1008,10 +1008,10 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo .getHost() ); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); } /* diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 108e2c88b875..e79c3b79f3dc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -80,9 +80,11 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; + import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -455,31 +457,31 @@ public Optional getScalingStats() } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java index d8accc8892d5..3431e1e143b2 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; +import java.util.Map; + public class TaskSlotCountStatsMonitor extends AbstractMonitor { private final TaskSlotCountStatsProvider statsProvider; @@ -47,11 +49,14 @@ public boolean doMonitor(ServiceEmitter emitter) return true; } - private void emit(ServiceEmitter emitter, String key, Long count) + private void emit(ServiceEmitter emitter, String key, Map counts) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - if (count != null) { - emitter.emit(builder.build(key, count.longValue())); + if (counts != null) { + counts.forEach((k, v) -> { + builder.setDimension("category", k); + emitter.emit(builder.build(key, v)); + }); } } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java index eb46fa5c2f1a..e7a7249b10ad 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java @@ -21,35 +21,37 @@ import javax.annotation.Nullable; +import java.util.Map; + public interface TaskSlotCountStatsProvider { /** * Return the number of total task slots during emission period. */ @Nullable - Long getTotalTaskSlotCount(); + Map getTotalTaskSlotCount(); /** * Return the number of idle task slots during emission period. */ @Nullable - Long getIdleTaskSlotCount(); + Map getIdleTaskSlotCount(); /** * Return the number of used task slots during emission period. */ @Nullable - Long getUsedTaskSlotCount(); + Map getUsedTaskSlotCount(); /** * Return the total number of task slots in lazy marked middlemanagers and indexers during emission period. */ @Nullable - Long getLazyTaskSlotCount(); + Map getLazyTaskSlotCount(); /** * Return the total number of task slots in blacklisted middlemanagers and indexers during emission period. */ @Nullable - Long getBlacklistedTaskSlotCount(); + Map getBlacklistedTaskSlotCount(); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java index 2c5c52b99e56..be33a4c98b68 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java @@ -19,11 +19,14 @@ package org.apache.druid.server.metrics; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Map; + public class TaskSlotCountStatsMonitorTest { private TaskSlotCountStatsProvider statsProvider; @@ -34,33 +37,33 @@ public void setUp() statsProvider = new TaskSlotCountStatsProvider() { @Override - public Long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } }; } From 78de1b47ff74ee54c2f6d623e31dc62a64011c25 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Wed, 14 Jul 2021 14:19:37 -0700 Subject: [PATCH 02/10] Change description --- docs/operations/metrics.md | 10 +++++----- .../src/main/resources/defaultMetricDimensions.json | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2d863d4e1442..916c913ae8c4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -203,11 +203,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| -|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| +|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 298b440e5209..8017725d82ed 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -63,11 +63,11 @@ "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, - "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" }, + "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" }, "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" }, "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" }, From 1a78f60b74b20bf6c00a92c8fadddffb4f3416b5 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Tue, 27 Jul 2021 14:17:41 -0700 Subject: [PATCH 03/10] Add workerConfig as field --- .../druid/indexing/overlord/ForkingTaskRunner.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 9e458aa1cc7d..6a74833620ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -100,6 +100,7 @@ public class ForkingTaskRunner private final ListeningExecutorService exec; private final PortFinder portFinder; private final StartupLoggingConfig startupLoggingConfig; + private final WorkerConfig workerConfig; private volatile boolean stopping = false; @@ -122,6 +123,7 @@ public ForkingTaskRunner( this.node = node; this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); this.startupLoggingConfig = startupLoggingConfig; + this.workerConfig = workerConfig; this.exec = MoreExecutors.listeningDecorator( Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") ); @@ -673,9 +675,9 @@ String getMaskedCommand(List maskedProperties, List command) public Map getTotalTaskSlotCount() { if (config.getPorts() != null && !config.getPorts().isEmpty()) { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(config.getPorts().size())); + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size())); } - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); } @Override @@ -683,25 +685,25 @@ public Map getIdleTaskSlotCount() { Map totalTaskSlots = getTotalTaskSlotCount(); Map usedTaskSlots = getUsedTaskSlotCount(); - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Math.max(totalTaskSlots.get(WorkerConfig.DEFAULT_CATEGORY) - usedTaskSlots.get(WorkerConfig.DEFAULT_CATEGORY), 0)); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(totalTaskSlots.get(workerConfig.getCategory()) - usedTaskSlots.get(workerConfig.getCategory()), 0)); } @Override public Map getUsedTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, Long.valueOf(portFinder.findUsedPortCount())); + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount())); } @Override public Map getLazyTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override public Map getBlacklistedTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); + return ImmutableMap.of(workerConfig.getCategory(), 0L); } protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem From 6c80b1fb9c5fccb39733ce1bc3637849019e6595 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Tue, 27 Jul 2021 16:01:28 -0700 Subject: [PATCH 04/10] Modify HttpRemoteTaskRunnerTest to test worker category in taskslot metrics --- .../hrtr/HttpRemoteTaskRunnerTest.java | 66 +++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index dd87e6f5fb86..1dae0d1da4a0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -864,9 +864,9 @@ protected WorkerHolder createWorkerHolder( taskRunner.start(); - Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); AtomicInteger ticks = new AtomicInteger(); @@ -874,7 +874,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host1", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY) + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c1") ) ); @@ -910,9 +910,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1)); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); taskRunner.run(task1); @@ -920,16 +920,16 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo Thread.sleep(100); } - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY) + new WorkerNodeService("ip2", 1, "0", "c2") ) ); @@ -952,9 +952,12 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); taskRunner.run(task2); @@ -962,16 +965,19 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo Thread.sleep(100); } - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY) + new WorkerNodeService("ip2", 1, "0", "c1") ) ); @@ -994,10 +1000,14 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c2", 0L).longValue()); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); @@ -1008,10 +1018,14 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo .getHost() ); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c2", 0L).longValue()); } /* From 4514a803ab94853a6defb923a6f39edff02ea668 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Tue, 3 Aug 2021 00:54:43 -0700 Subject: [PATCH 05/10] Fixing tests --- .../overlord/RemoteTaskRunnerTest.java | 34 ++++---- .../hrtr/HttpRemoteTaskRunnerTest.java | 81 ++++++++++--------- 2 files changed, 58 insertions(+), 57 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 845666966ef7..edb6f642b115 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -108,9 +108,9 @@ public void testRun() throws Exception { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); ListenableFuture result = remoteTaskRunner.run(task); @@ -125,9 +125,9 @@ public void testRun() throws Exception cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId())); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); } @Test @@ -438,8 +438,8 @@ public void testRunWithTaskComplete() throws Exception public void testWorkerRemoved() throws Exception { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Future future = remoteTaskRunner.run(task); @@ -472,8 +472,8 @@ public boolean isValid() ); Assert.assertNull(cf.checkExists().forPath(STATUS_PATH)); - Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertFalse(remoteTaskRunner.getTotalTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); } @Test @@ -678,9 +678,9 @@ public boolean apply(ImmutableWorkerInfo input) ); Assert.assertEquals(1, lazyworkers.size()); Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); - Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); } @Test @@ -991,12 +991,12 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception mockWorkerCompleteFailedTask(task1); Assert.assertTrue(taskFuture1.get().isFailure()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); Future taskFuture2 = remoteTaskRunner.run(task2); Assert.assertTrue(taskAnnounced(task2.getId())); mockWorkerRunningTask(task2); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); Future taskFuture3 = remoteTaskRunner.run(task3); Assert.assertTrue(taskAnnounced(task3.getId())); @@ -1004,12 +1004,12 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception mockWorkerCompleteFailedTask(task3); Assert.assertTrue(taskFuture3.get().isFailure()); Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); mockWorkerCompleteSuccessfulTask(task2); Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount().getOrDefault(WorkerConfig.DEFAULT_CATEGORY, 0L).longValue()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 1dae0d1da4a0..fe7703cdfdc6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -813,6 +813,7 @@ public void testMarkWorkersLazy() throws Exception Task task1 = NoopTask.create("task-id-1", 0); Task task2 = NoopTask.create("task-id-2", 0); + String ADDITIONAL_WORKER_CATEGORY = "category2"; ConcurrentMap workerHolders = new ConcurrentHashMap<>(); @@ -864,9 +865,9 @@ protected WorkerHolder createWorkerHolder( taskRunner.start(); - Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertTrue(taskRunner.getTotalTaskSlotCount().isEmpty()); + Assert.assertTrue(taskRunner.getIdleTaskSlotCount().isEmpty()); + Assert.assertTrue(taskRunner.getUsedTaskSlotCount().isEmpty()); AtomicInteger ticks = new AtomicInteger(); @@ -874,7 +875,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host1", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c1") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -910,9 +911,9 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1)); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); taskRunner.run(task1); @@ -920,16 +921,16 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c Thread.sleep(100); } - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", "c2") + new WorkerNodeService("ip2", 1, "0", ADDITIONAL_WORKER_CATEGORY) ) ); @@ -952,12 +953,12 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); taskRunner.run(task2); @@ -965,19 +966,19 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c Thread.sleep(100); } - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", "c1") + new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -1000,14 +1001,14 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); @@ -1018,14 +1019,14 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", "c .getHost() ); - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().getOrDefault("c2", 0L).longValue()); - Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().getOrDefault("c1", 0L).longValue()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount().getOrDefault("c2", 0L).longValue()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); } /* From 8d6bd16e81f4db2622f3bf97694e63ffab62c0a6 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Fri, 6 Aug 2021 15:25:20 -0700 Subject: [PATCH 06/10] Fixing alerts --- .../indexing/overlord/RemoteTaskRunner.java | 10 +++--- .../overlord/hrtr/HttpRemoteTaskRunner.java | 10 +++--- .../hrtr/HttpRemoteTaskRunnerTest.java | 32 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 8443db815988..b8d1aee686ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1520,7 +1520,7 @@ public Map getTotalTaskSlotCount() Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerCapacity = worker.getWorker().getCapacity(); + int workerCapacity = worker.getWorker().getCapacity(); totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); } @@ -1533,7 +1533,7 @@ public Map getIdleTaskSlotCount() Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerAvailableCapacity = worker.getAvailableCapacity(); + int workerAvailableCapacity = worker.getAvailableCapacity(); totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); } @@ -1546,7 +1546,7 @@ public Map getUsedTaskSlotCount() Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerUsedCapacity = worker.getCurrCapacityUsed(); + int workerUsedCapacity = worker.getCurrCapacityUsed(); totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); } @@ -1559,7 +1559,7 @@ public Map getLazyTaskSlotCount() Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { String workerCategory = worker.getCategory(); - Integer workerLazyPeons = worker.getCapacity(); + int workerLazyPeons = worker.getCapacity(); totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); } @@ -1572,7 +1572,7 @@ public Map getBlacklistedTaskSlotCount() Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerBlacklistedPeons = worker.getWorker().getCapacity(); + int workerBlacklistedPeons = worker.getWorker().getCapacity(); totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 2487b68fc8fb..063fbb729515 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1634,7 +1634,7 @@ public Map getTotalTaskSlotCount() Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerCapacity = worker.getWorker().getCapacity(); + int workerCapacity = worker.getWorker().getCapacity(); totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); } @@ -1647,7 +1647,7 @@ public Map getIdleTaskSlotCount() Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerAvailableCapacity = worker.getAvailableCapacity(); + int workerAvailableCapacity = worker.getAvailableCapacity(); totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); } @@ -1660,7 +1660,7 @@ public Map getUsedTaskSlotCount() Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerUsedCapacity = worker.getCurrCapacityUsed(); + int workerUsedCapacity = worker.getCurrCapacityUsed(); totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); } @@ -1673,7 +1673,7 @@ public Map getLazyTaskSlotCount() Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { String workerCategory = worker.getCategory(); - Integer workerLazyPeons = worker.getCapacity(); + int workerLazyPeons = worker.getCapacity(); totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); } @@ -1686,7 +1686,7 @@ public Map getBlacklistedTaskSlotCount() Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { String workerCategory = worker.getWorker().getCategory(); - Integer workerBlacklistedPeons = worker.getWorker().getCapacity(); + int workerBlacklistedPeons = worker.getWorker().getCapacity(); totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index fe7703cdfdc6..5a676113eda4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -813,7 +813,7 @@ public void testMarkWorkersLazy() throws Exception Task task1 = NoopTask.create("task-id-1", 0); Task task2 = NoopTask.create("task-id-2", 0); - String ADDITIONAL_WORKER_CATEGORY = "category2"; + String additionalWorkerCategory = "category2"; ConcurrentMap workerHolders = new ConcurrentHashMap<>(); @@ -930,7 +930,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", ADDITIONAL_WORKER_CATEGORY) + new WorkerNodeService("ip2", 1, "0", additionalWorkerCategory) ) ); @@ -954,11 +954,11 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); taskRunner.run(task2); @@ -967,11 +967,11 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo } Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), @@ -1002,13 +1002,13 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); - Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory)); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); @@ -1020,13 +1020,13 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo ); Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(ADDITIONAL_WORKER_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(ADDITIONAL_WORKER_CATEGORY)); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory)); } /* From 269b4369d2cbf93a287237d74ac50451fca56092 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Tue, 10 Aug 2021 11:50:01 -0700 Subject: [PATCH 07/10] Adding unit test in SingleTaskBackgroundRunnerTest for task slot metrics APIs --- .../SingleTaskBackgroundRunnerTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 06a4068cdd66..1ae81e13c348 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -163,6 +164,22 @@ public void testRun() throws ExecutionException, InterruptedException ); } + @Test + public void testRunWithTaskSlotMetrics() throws ExecutionException, InterruptedException + { + Assert.assertEquals(1, runner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, runner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, runner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, runner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, runner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals( + TaskState.SUCCESS, + runner.run(new NoopTask(null, null, null, 500L, 0, null, null, null)).get().getStatusCode() + ); + Assert.assertEquals(0, runner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, runner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + } + @Test public void testGetQueryRunner() throws ExecutionException, InterruptedException { From 28b36858eba4cba89a7ca79ff4eb78af35a144f7 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Tue, 10 Aug 2021 20:05:38 -0700 Subject: [PATCH 08/10] Resolving false positive spell check --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling b/website/.spelling index 3f7690fc25ee..8b97ca359bbf 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1125,6 +1125,7 @@ P3M PT12H STRING_ARRAY String.format +UNNEST acos args arr1 From f7c16cef4d1388726abe961fcc56e7332fdb3071 Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Fri, 10 Sep 2021 15:44:28 -0700 Subject: [PATCH 09/10] addressing comments --- .../indexing/overlord/ForkingTaskRunner.java | 17 ++++++++++--- .../indexing/overlord/RemoteTaskRunner.java | 25 +++++++++++++++---- .../overlord/ThreadingTaskRunner.java | 14 ++++++++--- .../overlord/hrtr/HttpRemoteTaskRunner.java | 25 +++++++++++++++---- website/.spelling | 1 - 5 files changed, 65 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 6a74833620ae..ea20797941e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -680,12 +680,18 @@ public Map getTotalTaskSlotCount() return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); } + public long getTotalTaskSlotCountLong() + { + if (config.getPorts() != null && !config.getPorts().isEmpty()) { + return config.getPorts().size(); + } + return config.getEndPort() - config.getStartPort() + 1; + } + @Override public Map getIdleTaskSlotCount() { - Map totalTaskSlots = getTotalTaskSlotCount(); - Map usedTaskSlots = getUsedTaskSlotCount(); - return ImmutableMap.of(workerConfig.getCategory(), Math.max(totalTaskSlots.get(workerConfig.getCategory()) - usedTaskSlots.get(workerConfig.getCategory()), 0)); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0)); } @Override @@ -694,6 +700,11 @@ public Map getUsedTaskSlotCount() return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount())); } + public long getUsedTaskSlotCountLong() + { + return portFinder.findUsedPortCount(); + } + @Override public Map getLazyTaskSlotCount() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index b8d1aee686ef..3c1cbb608e48 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1521,7 +1521,10 @@ public Map getTotalTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerCapacity = worker.getWorker().getCapacity(); - totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); + totalPeons.compute( + workerCategory, + (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity + ); } return totalPeons; @@ -1534,7 +1537,10 @@ public Map getIdleTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { String workerCategory = worker.getWorker().getCategory(); int workerAvailableCapacity = worker.getAvailableCapacity(); - totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); + totalIdlePeons.compute( + workerCategory, + (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity + ); } return totalIdlePeons; @@ -1547,7 +1553,10 @@ public Map getUsedTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerUsedCapacity = worker.getCurrCapacityUsed(); - totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); + totalUsedPeons.compute( + workerCategory, + (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity + ); } return totalUsedPeons; @@ -1560,7 +1569,10 @@ public Map getLazyTaskSlotCount() for (Worker worker : getLazyWorkers()) { String workerCategory = worker.getCategory(); int workerLazyPeons = worker.getCapacity(); - totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); + totalLazyPeons.compute( + workerCategory, + (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons + ); } return totalLazyPeons; @@ -1573,7 +1585,10 @@ public Map getBlacklistedTaskSlotCount() for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerBlacklistedPeons = worker.getWorker().getCapacity(); - totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); + totalBlacklistedPeons.compute( + workerCategory, + (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons + ); } return totalBlacklistedPeons; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java index 39bba7059a48..60733ef95838 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java @@ -462,12 +462,15 @@ public Map getTotalTaskSlotCount() return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity())); } + public long getTotalTaskSlotCountLong() + { + return workerConfig.getCapacity(); + } + @Override public Map getIdleTaskSlotCount() { - Map totalTaskSlots = getTotalTaskSlotCount(); - Map usedTaskSlots = getTotalTaskSlotCount(); - return ImmutableMap.of(workerConfig.getCategory(), Math.max(totalTaskSlots.get(workerConfig.getCategory()) - usedTaskSlots.get(workerConfig.getCategory()), 0)); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0)); } @Override @@ -476,6 +479,11 @@ public Map getUsedTaskSlotCount() return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(getRunningTasks().size())); } + public long getUsedTaskSlotCountLong() + { + return getRunningTasks().size(); + } + @Override public Map getLazyTaskSlotCount() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 063fbb729515..45af2157282d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1635,7 +1635,10 @@ public Map getTotalTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerCapacity = worker.getWorker().getCapacity(); - totalPeons.put(workerCategory, totalPeons.getOrDefault(workerCategory, 0L) + workerCapacity); + totalPeons.compute( + workerCategory, + (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity + ); } return totalPeons; @@ -1648,7 +1651,10 @@ public Map getIdleTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { String workerCategory = worker.getWorker().getCategory(); int workerAvailableCapacity = worker.getAvailableCapacity(); - totalIdlePeons.put(workerCategory, totalIdlePeons.getOrDefault(workerCategory, 0L) + workerAvailableCapacity); + totalIdlePeons.compute( + workerCategory, + (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity + ); } return totalIdlePeons; @@ -1661,7 +1667,10 @@ public Map getUsedTaskSlotCount() for (ImmutableWorkerInfo worker : getWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerUsedCapacity = worker.getCurrCapacityUsed(); - totalUsedPeons.put(workerCategory, totalUsedPeons.getOrDefault(workerCategory, 0L) + workerUsedCapacity); + totalUsedPeons.compute( + workerCategory, + (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity + ); } return totalUsedPeons; @@ -1674,7 +1683,10 @@ public Map getLazyTaskSlotCount() for (Worker worker : getLazyWorkers()) { String workerCategory = worker.getCategory(); int workerLazyPeons = worker.getCapacity(); - totalLazyPeons.put(workerCategory, totalLazyPeons.getOrDefault(workerCategory, 0L) + workerLazyPeons); + totalLazyPeons.compute( + workerCategory, + (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons + ); } return totalLazyPeons; @@ -1687,7 +1699,10 @@ public Map getBlacklistedTaskSlotCount() for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { String workerCategory = worker.getWorker().getCategory(); int workerBlacklistedPeons = worker.getWorker().getCapacity(); - totalBlacklistedPeons.put(workerCategory, totalBlacklistedPeons.getOrDefault(workerCategory, 0L) + workerBlacklistedPeons); + totalBlacklistedPeons.compute( + workerCategory, + (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons + ); } return totalBlacklistedPeons; diff --git a/website/.spelling b/website/.spelling index 8b97ca359bbf..3f7690fc25ee 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1125,7 +1125,6 @@ P3M PT12H STRING_ARRAY String.format -UNNEST acos args arr1 From 0420a78e31bea88c0b4ff5cdc0b199591bccf2dc Mon Sep 17 00:00:00 2001 From: Nikhil Navadiya Date: Fri, 8 Oct 2021 00:34:50 -0700 Subject: [PATCH 10/10] throw UnsupportedOperationException for tasklotmetrics APIs in SingleTaskBackgroundRunner --- .../overlord/SingleTaskBackgroundRunner.java | 17 ++++++++++------- .../SingleTaskBackgroundRunnerTest.java | 17 ----------------- 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index 0328de161c55..b421f97519dc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -36,7 +35,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; -import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Numbers; @@ -337,34 +335,39 @@ public Optional getScalingStats() return Optional.absent(); } + /* This method should be never called in peons */ @Override public Map getTotalTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 1L); + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override public Map getIdleTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, runningItem == null ? 1L : 0L); + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override public Map getUsedTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, runningItem == null ? 0L : 1L); + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override public Map getLazyTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override public Map getBlacklistedTaskSlotCount() { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); + throw new UnsupportedOperationException(); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 1ae81e13c348..06a4068cdd66 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -35,7 +35,6 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; -import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -164,22 +163,6 @@ public void testRun() throws ExecutionException, InterruptedException ); } - @Test - public void testRunWithTaskSlotMetrics() throws ExecutionException, InterruptedException - { - Assert.assertEquals(1, runner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, runner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, runner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, runner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(0, runner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals( - TaskState.SUCCESS, - runner.run(new NoopTask(null, null, null, 500L, 0, null, null, null)).get().getStatusCode() - ); - Assert.assertEquals(0, runner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - Assert.assertEquals(1, runner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); - } - @Test public void testGetQueryRunner() throws ExecutionException, InterruptedException {