Skip to content

Commit

Permalink
expose timer queue size via metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Ladicek committed Mar 25, 2024
1 parent dde4c52 commit da201cb
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 4 deletions.
21 changes: 21 additions & 0 deletions doc/modules/ROOT/pages/reference/metrics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ Those are described in the reference guides:
* xref:reference/retry.adoc#metrics[Retry]
* xref:reference/timeout.adoc#metrics[Timeout]
== Timer Metrics
For task scheduling purposes (e.g. watching timeouts or delaying retries), {smallrye-fault-tolerance} maintains one thread called the _timer thread_.
Most of the time, it is kept sleeping (parked); it only wakes up (unparks) when it needs to submit tasks to executors.

The behavior of the timer thread can be observed through the following metrics:

[cols="1,5"]
|===
| Name | `ft.timer.scheduled`
| Type | `Gauge<Integer>`
| Unit | None
| Description | The number of tasks that are currently scheduled (for future execution) on the timer.
| Tags | None
|===

== Micrometer Support

In addition to the MicroProfile Metrics support, {smallrye-fault-tolerance} also provides support for https://micrometer.io/[Micrometer].
Expand Down Expand Up @@ -115,6 +131,11 @@ Metric types are mapped as closely as possible:
| counter
| counter
| *

| `ft.timer.scheduled`
| gauge
| gauge
| *
|===

{empty}* This is a {smallrye-fault-tolerance} feature, not specified by {microprofile-fault-tolerance}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ private MetricsConstants() {

public static final String RATE_LIMIT_CALLS_TOTAL = "ft.ratelimit.calls.total";

public static final String TIMER_QUEUE = "ft.timer.queue";
public static final String TIMER_SCHEDULED = "ft.timer.scheduled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.metrics.Metadata;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.MetricUnits;
import org.eclipse.microprofile.metrics.annotation.RegistryType;

import io.smallrye.faulttolerance.ExecutorHolder;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.metrics.MetricsRecorder;
import io.smallrye.faulttolerance.core.metrics.MicroProfileMetricsRecorder;
import io.smallrye.faulttolerance.core.timer.Timer;

@Singleton
public class MicroProfileMetricsProvider implements MetricsProvider {
Expand All @@ -25,8 +31,20 @@ public class MicroProfileMetricsProvider implements MetricsProvider {
@ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true")
boolean metricsEnabled;

@Inject
ExecutorHolder executorHolder;

private final Map<Object, MetricsRecorder> cache = new ConcurrentHashMap<>();

/**
 * Registers the {@code ft.timer.scheduled} gauge in the MicroProfile Metrics registry.
 * <p>
 * The gauge value is computed on demand by applying {@code Timer::countScheduledTasks}
 * to the timer obtained from the injected {@link ExecutorHolder}.
 * <p>
 * Nothing is registered when metrics are disabled through the
 * {@code MP_Fault_Tolerance_Metrics_Enabled} configuration property, so that a disabled
 * metrics integration does not leave a stray gauge in the registry.
 */
@PostConstruct
void init() {
    // injected configuration is already available in @PostConstruct callbacks
    if (!metricsEnabled) {
        return;
    }

    Metadata metadata = Metadata.builder()
            .withName(MetricsConstants.TIMER_SCHEDULED)
            .withUnit(MetricUnits.NONE)
            .build();
    registry.gauge(metadata, executorHolder.getTimer(), Timer::countScheduledTasks);
}

@Override
public boolean isEnabled() {
return metricsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.micrometer.core.instrument.MeterRegistry;
import io.smallrye.faulttolerance.ExecutorHolder;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.metrics.MetricsRecorder;
import io.smallrye.faulttolerance.core.metrics.MicrometerRecorder;
import io.smallrye.faulttolerance.core.timer.Timer;

@Singleton
public class MicrometerProvider implements MetricsProvider {
Expand All @@ -23,8 +27,16 @@ public class MicrometerProvider implements MetricsProvider {
@ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true")
boolean metricsEnabled;

@Inject
ExecutorHolder executorHolder;

private final Map<Object, MetricsRecorder> cache = new ConcurrentHashMap<>();

/**
 * Registers the {@code ft.timer.scheduled} gauge in the Micrometer registry.
 * <p>
 * The gauge value is computed on demand by applying {@code Timer::countScheduledTasks}
 * to the timer obtained from the injected {@link ExecutorHolder}.
 * <p>
 * Nothing is registered when metrics are disabled through the
 * {@code MP_Fault_Tolerance_Metrics_Enabled} configuration property, so that a disabled
 * metrics integration does not leave a stray gauge in the registry.
 */
@PostConstruct
void init() {
    // injected configuration is already available in @PostConstruct callbacks
    if (!metricsEnabled) {
        return;
    }

    registry.gauge(MetricsConstants.TIMER_SCHEDULED, executorHolder.getTimer(), Timer::countScheduledTasks);
}

@Override
public boolean isEnabled() {
return metricsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MetricsProvider metricsProvider() {
if (metricsAdapter instanceof NoopAdapter) {
metricsProvider = ((NoopAdapter) metricsAdapter).createMetricsProvider();
} else if (metricsAdapter instanceof MicrometerAdapter) {
metricsProvider = ((MicrometerAdapter) metricsAdapter).createMetricsProvider();
metricsProvider = ((MicrometerAdapter) metricsAdapter).createMetricsProvider(timer);
} else {
throw new IllegalStateException("Invalid metrics adapter: " + metricsAdapter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

import io.micrometer.core.instrument.MeterRegistry;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.metrics.MetricsRecorder;
import io.smallrye.faulttolerance.core.metrics.MicrometerRecorder;
import io.smallrye.faulttolerance.core.timer.Timer;

public final class MicrometerAdapter implements MetricsAdapter {
private final MeterRegistry registry;
Expand All @@ -16,7 +18,9 @@ public MicrometerAdapter(MeterRegistry registry) {
this.registry = registry;
}

MetricsProvider createMetricsProvider() {
MetricsProvider createMetricsProvider(Timer timer) {
registry.gauge(MetricsConstants.TIMER_SCHEDULED, timer, Timer::countScheduledTasks);

return new MetricsProvider() {
private final Map<Object, MetricsRecorder> cache = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.smallrye.faulttolerance.standalone.test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.temporal.ChronoUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.util.barrier.Barrier;
import io.smallrye.faulttolerance.standalone.Configuration;
import io.smallrye.faulttolerance.standalone.MetricsAdapter;
import io.smallrye.faulttolerance.standalone.MicrometerAdapter;
import io.smallrye.faulttolerance.standalone.StandaloneFaultTolerance;

/**
 * Checks that the {@code ft.timer.scheduled} gauge published through the standalone
 * {@link MicrometerAdapter} reports one scheduled task while a guarded action with a timeout
 * is in flight, and zero after the action has completed.
 * <p>
 * Needs to stay in sync with {@code CdiMetricsTimerTest}.
 */
public class StandaloneMetricsTimerTest {
    static ExecutorService executor;
    static MeterRegistry metrics;

    static Barrier barrier;

    @BeforeAll
    public static void setUp() {
        barrier = Barrier.interruptible();

        executor = Executors.newCachedThreadPool();
        metrics = new SimpleMeterRegistry();

        Configuration configuration = new Configuration() {
            @Override
            public ExecutorService executor() {
                return executor;
            }

            @Override
            public MetricsAdapter metricsAdapter() {
                return new MicrometerAdapter(metrics);
            }

            @Override
            public void onShutdown() throws InterruptedException {
                metrics.close();

                executor.shutdownNow();
                executor.awaitTermination(1, TimeUnit.SECONDS);
            }
        };
        StandaloneFaultTolerance.configure(configuration);
    }

    @AfterAll
    public static void tearDown() throws InterruptedException {
        StandaloneFaultTolerance.shutdown();
    }

    @Test
    public void test() throws Exception {
        Callable<CompletionStage<String>> guardedAction = FaultTolerance.createAsyncCallable(this::action)
                .withThreadOffload(true)
                .withTimeout().duration(1, ChronoUnit.MINUTES).done()
                .withFallback().handler(this::fallback).done()
                .build();

        // the action blocks on the barrier, so the result cannot be available yet
        CompletableFuture<String> result = guardedAction.call().toCompletableFuture();
        assertThat(result).isNotCompleted();

        // while the action is blocked, exactly one task is expected on the timer
        await().atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> assertThat(scheduledTasks()).isEqualTo(1.0));

        barrier.open();

        assertThat(result).succeedsWithin(2, TimeUnit.SECONDS)
                .isEqualTo("hello");

        assertThat(scheduledTasks()).isEqualTo(0.0);
    }

    /** Reads the current value of the {@code ft.timer.scheduled} gauge from the test registry. */
    private static double scheduledTasks() {
        return metrics.get(MetricsConstants.TIMER_SCHEDULED).gauge().value();
    }

    /** Guarded action: waits until the barrier is opened, then completes with {@code "hello"}. */
    public CompletionStage<String> action() throws InterruptedException {
        barrier.await();
        return CompletableFuture.completedStage("hello");
    }

    /** Fallback handler: completes with {@code "fallback"}. */
    public CompletionStage<String> fallback() {
        return CompletableFuture.completedStage("fallback");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.smallrye.faulttolerance.programmatic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.temporal.ChronoUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import org.eclipse.microprofile.metrics.MetricID;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.annotation.RegistryType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.util.barrier.Barrier;
import io.smallrye.faulttolerance.util.FaultToleranceBasicTest;

/**
 * Checks that the {@code ft.timer.scheduled} gauge in the MicroProfile Metrics base registry
 * reports one scheduled task while a guarded action with a timeout is in flight, and zero
 * after the action has completed.
 * <p>
 * Needs to stay in sync with {@code StandaloneMetricsTimerTest}.
 */
@FaultToleranceBasicTest
public class CdiMetricsTimerTest {
    static Barrier barrier;

    @BeforeAll
    public static void setUp() {
        barrier = Barrier.interruptible();
    }

    @Test
    public void test(@RegistryType(type = MetricRegistry.Type.BASE) MetricRegistry metrics) throws Exception {
        MetricID timerScheduled = new MetricID(MetricsConstants.TIMER_SCHEDULED);

        Callable<CompletionStage<String>> guardedAction = FaultTolerance.createAsyncCallable(this::action)
                .withThreadOffload(true)
                .withTimeout().duration(1, ChronoUnit.MINUTES).done()
                .withFallback().handler(this::fallback).done()
                .build();

        // the action blocks on the barrier, so the result cannot be available yet
        CompletableFuture<String> result = guardedAction.call().toCompletableFuture();
        assertThat(result).isNotCompleted();

        // while the action is blocked, exactly one task is expected on the timer
        await().atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> assertThat(metrics.getGauge(timerScheduled).getValue()).isEqualTo(1));

        barrier.open();

        assertThat(result).succeedsWithin(2, TimeUnit.SECONDS)
                .isEqualTo("hello");

        assertThat(metrics.getGauge(timerScheduled).getValue()).isEqualTo(0);
    }

    /** Guarded action: waits until the barrier is opened, then completes with {@code "hello"}. */
    public CompletionStage<String> action() throws InterruptedException {
        barrier.await();
        return CompletableFuture.completedStage("hello");
    }

    /** Fallback handler: completes with {@code "fallback"}. */
    public CompletionStage<String> fallback() {
        return CompletableFuture.completedStage("fallback");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

/**
 * Composed test annotation, retained at runtime and applicable to types and methods.
 * It registers the JUnit extensions named in {@code @ExtendWith} and the CDI extensions
 * listed in {@code @AddExtensions} ({@code FaultToleranceExtension}, {@code ConfigExtension}
 * and {@code MetricCdiInjectionExtension}).
 */
// NOTE(review): two @ExtendWith lines are present below; this looks like the old and the new
// line of a diff shown together -- confirm that only the second one is meant to remain.
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@ExtendWith(WeldWithFaultToleranceExtension.class)
@ExtendWith({ WeldWithFaultToleranceExtension.class, ResetSmallRyeMetricsExtension.class })
@AddExtensions({ FaultToleranceExtension.class, ConfigExtension.class, MetricCdiInjectionExtension.class })
public @interface FaultToleranceBasicTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.smallrye.faulttolerance.util;

import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

import io.smallrye.metrics.MetricRegistries;

/**
 * JUnit extension that drops all SmallRye Metrics registries before the tests of a class run.
 * <p>
 * Since MP FT 3.0, metrics are added to the "base" scope, which persists across application
 * undeployments (see https://github.com/smallrye/smallrye-metrics/issues/12). Dropping all
 * metric registries up front gives each test its own set of registries, so there is no
 * cross-test pollution.
 */
public class ResetSmallRyeMetricsExtension implements BeforeAllCallback {
    /**
     * Drops all metric registries; the extension context is not consulted.
     */
    @Override
    public void beforeAll(ExtensionContext context) {
        MetricRegistries.dropAll();
    }
}

0 comments on commit da201cb

Please sign in to comment.