From ae0228e94894cc5a1c8f407378468d424e9b486c Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 31 Aug 2023 23:43:15 -0400 Subject: [PATCH] Add threadpool wait time metric --- .../OpenSearchThreadPoolExecutor.java | 7 ++++++ ...ResizableOpenSearchThreadPoolExecutor.java | 9 +++++++ ...eResizingOpenSearchThreadPoolExecutor.java | 8 +++++++ .../common/util/concurrent/TimedRunnable.java | 8 +++++++ .../org/opensearch/threadpool/ThreadPool.java | 9 ++++++- .../threadpool/ThreadPoolStats.java | 21 +++++++++++++++- .../cluster/node/stats/NodeStatsTests.java | 3 ++- .../threadpool/ThreadPoolStatsTests.java | 24 +++++++++---------- 8 files changed, 74 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..e87b92b98d672 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -205,4 +205,11 @@ protected Runnable wrapRunnable(Runnable command) { protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } + + /** + * Returns the mean thread pool wait time in nanoseconds, or -1 when this executor does not track it; tracking is supported by executors that use TimedRunnable + */ + public long getPoolWaitTime() { + return -1; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java index 7a0ce8244efe4..0dd781c6d8a2b 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java @@ -9,6 +9,7 @@ package org.opensearch.common.util.concurrent; import 
org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.MeanMetric; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch private final ResizableBlockingQueue workQueue; private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final MeanMetric poolWaitTime; /** * Create new resizable at runtime thread pool executor @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch this.workQueue = workQueue; this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0); + this.poolWaitTime = new MeanMetric(); } @Override @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); } /** @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) { capacity ); } + + @Override + public long getPoolWaitTime() { + return (long) poolWaitTime.mean(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index 684dd7c9d8de5..62024a100dba6 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.MeanMetric; import 
org.opensearch.common.unit.TimeValue; import java.util.Locale; @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT private final int maxQueueSize; private final long targetedResponseTimeNanos; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final MeanMetric poolWaitTime; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.poolWaitTime = new MeanMetric(); logger.debug( "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + @Override + public long getPoolWaitTime() { + return (long) poolWaitTime.mean(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java index f3bc50a33453b..2eb6657898008 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java @@ -107,6 +107,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + long 
getWaitTimeNanos() { + if (startTimeNanos == -1) { + // There must have been an exception thrown, the wait time is unknown (-1) + return -1; + } + return Math.max(startTimeNanos - creationTimeNanos, 1); + } + /** * If the task was failed or rejected, return true. * Otherwise, false. diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 6ddf3ff6b2f6a..8ecf00ce34c9a 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -395,7 +395,14 @@ public ThreadPoolStats stats() { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); + long waitTime; + if (holder.executor() instanceof OpenSearchThreadPoolExecutor) { + OpenSearchThreadPoolExecutor openSearchThreadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor(); + waitTime = openSearchThreadPoolExecutor.getPoolWaitTime(); + } else { + waitTime = -1; + } + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTime)); } return new ThreadPoolStats(stats); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index b4d7e4a3fbf7a..a310903505867 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -32,6 +32,8 @@ package org.opensearch.threadpool; +import org.opensearch.Version; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -43,6 +45,7 @@ import java.util.Collections; import java.util.Iterator; 
import java.util.List; +import java.util.concurrent.TimeUnit; /** * Stats for a threadpool @@ -65,8 +68,9 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L)); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, 0L)); + 
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, 0L)); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {