From da201cb7055857acaac09b946dffd6dd1e8d9a38 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 22 Mar 2024 18:54:11 +0100 Subject: [PATCH] expose timer queue size via metrics --- doc/modules/ROOT/pages/reference/metrics.adoc | 21 ++++ .../core/metrics/MetricsConstants.java | 2 +- .../metrics/MicroProfileMetricsProvider.java | 18 ++++ .../metrics/MicrometerProvider.java | 12 +++ .../standalone/LazyDependencies.java | 2 +- .../standalone/MicrometerAdapter.java | 6 +- .../test/StandaloneMetricsTimerTest.java | 100 ++++++++++++++++++ .../programmatic/CdiMetricsTimerTest.java | 65 ++++++++++++ .../util/FaultToleranceBasicTest.java | 2 +- .../util/ResetSmallRyeMetricsExtension.java | 17 +++ 10 files changed, 241 insertions(+), 4 deletions(-) create mode 100644 implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneMetricsTimerTest.java create mode 100644 testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java create mode 100644 testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/ResetSmallRyeMetricsExtension.java diff --git a/doc/modules/ROOT/pages/reference/metrics.adoc b/doc/modules/ROOT/pages/reference/metrics.adoc index 815645ff2..af4904527 100644 --- a/doc/modules/ROOT/pages/reference/metrics.adoc +++ b/doc/modules/ROOT/pages/reference/metrics.adoc @@ -32,6 +32,22 @@ Those are described in the reference guides: * xref:reference/retry.adoc#metrics[Retry] * xref:reference/timeout.adoc#metrics[Timeout] +== Timer Metrics + +For task scheduling purposes (e.g. watching timeouts or delaying retries), {smallrye-fault-tolerance} maintains one thread called the _timer thread_. +Most of the time, it is kept sleeping (parked), it only wakes up (unparks) when necessary to submit tasks to executors. + +The behavior of the timer thread can be observed through the following metrics: + +[cols="1,5"] +|=== +| Name | `ft.timer.scheduled` +| Type | `Gauge` +| Unit | None +| Description | The number of tasks that are currently scheduled (for future execution) on the timer. +| Tags | None +|=== + == Micrometer Support In addition to the MicroProfile Metrics support, {smallrye-fault-tolerance} also provides support for https://micrometer.io/[Micrometer]. @@ -115,6 +131,11 @@ Metric types are mapped as closely as possible: | counter | counter | * + +| `ft.timer.scheduled` +| gauge +| gauge +| * |=== {empty}* This is a {smallrye-fault-tolerance} feature, not specified by {microprofile-fault-tolerance}. diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/MetricsConstants.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/MetricsConstants.java index 23d8bd162..7446338d6 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/MetricsConstants.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/MetricsConstants.java @@ -26,5 +26,5 @@ private MetricsConstants() { public static final String RATE_LIMIT_CALLS_TOTAL = "ft.ratelimit.calls.total"; - public static final String TIMER_QUEUE = "ft.timer.queue"; + public static final String TIMER_SCHEDULED = "ft.timer.scheduled"; } diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java index fa7c159b2..f55c8df5c 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java @@ -3,17 +3,23 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.metrics.Metadata; import org.eclipse.microprofile.metrics.MetricRegistry; +import org.eclipse.microprofile.metrics.MetricUnits; import org.eclipse.microprofile.metrics.annotation.RegistryType; +import io.smallrye.faulttolerance.ExecutorHolder; import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; import io.smallrye.faulttolerance.core.metrics.MetricsProvider; import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; import io.smallrye.faulttolerance.core.metrics.MicroProfileMetricsRecorder; +import io.smallrye.faulttolerance.core.timer.Timer; @Singleton public class MicroProfileMetricsProvider implements MetricsProvider { @@ -25,8 +31,20 @@ public class MicroProfileMetricsProvider implements MetricsProvider { @ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true") boolean metricsEnabled; + @Inject + ExecutorHolder executorHolder; + private final Map cache = new ConcurrentHashMap<>(); + @PostConstruct + void init() { + Metadata metadata = Metadata.builder() + .withName(MetricsConstants.TIMER_SCHEDULED) + .withUnit(MetricUnits.NONE) + .build(); + registry.gauge(metadata, executorHolder.getTimer(), Timer::countScheduledTasks); + } + @Override public boolean isEnabled() { return metricsEnabled; diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java index a8ed7a6ff..f81e35d70 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java @@ -3,16 +3,20 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.eclipse.microprofile.config.inject.ConfigProperty; import io.micrometer.core.instrument.MeterRegistry; +import io.smallrye.faulttolerance.ExecutorHolder; import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; import io.smallrye.faulttolerance.core.metrics.MetricsProvider; import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; import io.smallrye.faulttolerance.core.metrics.MicrometerRecorder; +import io.smallrye.faulttolerance.core.timer.Timer; @Singleton public class MicrometerProvider implements MetricsProvider { @@ -23,8 +27,16 @@ public class MicrometerProvider implements MetricsProvider { @ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true") boolean metricsEnabled; + @Inject + ExecutorHolder executorHolder; + private final Map cache = new ConcurrentHashMap<>(); + @PostConstruct + void init() { + registry.gauge(MetricsConstants.TIMER_SCHEDULED, executorHolder.getTimer(), Timer::countScheduledTasks); + } + @Override public boolean isEnabled() { return metricsEnabled; diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java index 656bbc1cc..fc84ec48c 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java @@ -55,7 +55,7 @@ public MetricsProvider metricsProvider() { if (metricsAdapter instanceof NoopAdapter) { metricsProvider = ((NoopAdapter) metricsAdapter).createMetricsProvider(); } else if (metricsAdapter instanceof MicrometerAdapter) { - metricsProvider = ((MicrometerAdapter) metricsAdapter).createMetricsProvider(); + metricsProvider = ((MicrometerAdapter) metricsAdapter).createMetricsProvider(timer); } else { throw new IllegalStateException("Invalid metrics adapter: " + metricsAdapter); } diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java index e06aa0e47..401e67258 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java @@ -5,9 +5,11 @@ import io.micrometer.core.instrument.MeterRegistry; import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; import io.smallrye.faulttolerance.core.metrics.MetricsProvider; import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; import io.smallrye.faulttolerance.core.metrics.MicrometerRecorder; +import io.smallrye.faulttolerance.core.timer.Timer; public final class MicrometerAdapter implements MetricsAdapter { private final MeterRegistry registry; @@ -16,7 +18,9 @@ public MicrometerAdapter(MeterRegistry registry) { this.registry = registry; } - MetricsProvider createMetricsProvider() { + MetricsProvider createMetricsProvider(Timer timer) { + registry.gauge(MetricsConstants.TIMER_SCHEDULED, timer, Timer::countScheduledTasks); + return new MetricsProvider() { private final Map cache = new ConcurrentHashMap<>(); diff --git a/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneMetricsTimerTest.java b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneMetricsTimerTest.java new file mode 100644 index 000000000..b292e6b17 --- /dev/null +++ b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneMetricsTimerTest.java @@ -0,0 +1,100 @@ +package io.smallrye.faulttolerance.standalone.test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.temporal.ChronoUnit; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.faulttolerance.standalone.Configuration; +import io.smallrye.faulttolerance.standalone.MetricsAdapter; +import io.smallrye.faulttolerance.standalone.MicrometerAdapter; +import io.smallrye.faulttolerance.standalone.StandaloneFaultTolerance; + +// needs to stay in sync with `CdiMetricsTimerTest` +public class StandaloneMetricsTimerTest { + static ExecutorService executor; + static MeterRegistry metrics; + + static Barrier barrier; + + @BeforeAll + public static void setUp() { + executor = Executors.newCachedThreadPool(); + metrics = new SimpleMeterRegistry(); + + StandaloneFaultTolerance.configure(new Configuration() { + @Override + public ExecutorService executor() { + return executor; + } + + @Override + public MetricsAdapter metricsAdapter() { + return new MicrometerAdapter(metrics); + } + + @Override + public void onShutdown() throws InterruptedException { + metrics.close(); + + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + }); + + barrier = Barrier.interruptible(); + } + + @AfterAll + public static void tearDown() throws InterruptedException { + StandaloneFaultTolerance.shutdown(); + } + + @Test + public void test() throws Exception { + Callable> guarded = FaultTolerance.createAsyncCallable(this::action) + .withThreadOffload(true) + .withTimeout().duration(1, ChronoUnit.MINUTES).done() + .withFallback().handler(this::fallback).done() + .build(); + + CompletableFuture future = guarded.call().toCompletableFuture(); + + assertThat(future).isNotCompleted(); + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(metrics.get(MetricsConstants.TIMER_SCHEDULED).gauge().value()).isEqualTo(1.0); + }); + + barrier.open(); + + assertThat(future).succeedsWithin(2, TimeUnit.SECONDS) + .isEqualTo("hello"); + + assertThat(metrics.get(MetricsConstants.TIMER_SCHEDULED).gauge().value()).isEqualTo(0.0); + } + + public CompletionStage action() throws InterruptedException { + barrier.await(); + return CompletableFuture.completedStage("hello"); + } + + public CompletionStage fallback() { + return CompletableFuture.completedStage("fallback"); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java new file mode 100644 index 000000000..d8bffe196 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java @@ -0,0 +1,65 @@ +package io.smallrye.faulttolerance.programmatic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.temporal.ChronoUnit; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import org.eclipse.microprofile.metrics.MetricID; +import org.eclipse.microprofile.metrics.MetricRegistry; +import org.eclipse.microprofile.metrics.annotation.RegistryType; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +// needs to stay in sync with `StandaloneMetricsTimerTest` +@FaultToleranceBasicTest +public class CdiMetricsTimerTest { + static Barrier barrier; + + @BeforeAll + public static void setUp() { + barrier = Barrier.interruptible(); + } + + @Test + public void test(@RegistryType(type = MetricRegistry.Type.BASE) MetricRegistry metrics) throws Exception { + Callable> guarded = FaultTolerance.createAsyncCallable(this::action) + .withThreadOffload(true) + .withTimeout().duration(1, ChronoUnit.MINUTES).done() + .withFallback().handler(this::fallback).done() + .build(); + + CompletableFuture future = guarded.call().toCompletableFuture(); + + assertThat(future).isNotCompleted(); + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(1); + }); + + barrier.open(); + + assertThat(future).succeedsWithin(2, TimeUnit.SECONDS) + .isEqualTo("hello"); + + assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(0); + } + + public CompletionStage action() throws InterruptedException { + barrier.await(); + return CompletableFuture.completedStage("hello"); + } + + public CompletionStage fallback() { + return CompletableFuture.completedStage("fallback"); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/FaultToleranceBasicTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/FaultToleranceBasicTest.java index e78b9fb29..97ceb67e1 100644 --- a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/FaultToleranceBasicTest.java +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/FaultToleranceBasicTest.java @@ -14,7 +14,7 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.TYPE, ElementType.METHOD }) -@ExtendWith(WeldWithFaultToleranceExtension.class) +@ExtendWith({ WeldWithFaultToleranceExtension.class, ResetSmallRyeMetricsExtension.class }) @AddExtensions({ FaultToleranceExtension.class, ConfigExtension.class, MetricCdiInjectionExtension.class }) public @interface FaultToleranceBasicTest { } diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/ResetSmallRyeMetricsExtension.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/ResetSmallRyeMetricsExtension.java new file mode 100644 index 000000000..fb1176dd4 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/util/ResetSmallRyeMetricsExtension.java @@ -0,0 +1,17 @@ +package io.smallrye.faulttolerance.util; + +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import io.smallrye.metrics.MetricRegistries; + +public class ResetSmallRyeMetricsExtension implements BeforeAllCallback { + @Override + public void beforeAll(ExtensionContext extensionContext) { + // Since MP FT 3.0, metrics are added to the "base" scope, which persists across + // application undeployments (see https://github.com/smallrye/smallrye-metrics/issues/12). + // We drop all metric registries before tests, so that each test has its own set + // of metric registries and there's no cross-test pollution. + MetricRegistries.dropAll(); + } +}