From 72027260ba0e82287b232f33966124c2cc55de50 Mon Sep 17 00:00:00 2001
From: Rohan <desai.p.rohan@gmail.com>
Date: Tue, 29 Sep 2020 19:30:31 -0700
Subject: [PATCH] feat: add aggregated rocksdb metrics

This patch adds metrics that report aggregates of the per-store rocksdb
metrics added by KIP-607. Specifically, this PR adds the following metrics:

num-running-compactions-total: the total number of running compactions
estimate-num-keys-total: an estimate of the total number of rocksdb keys
block-cache-usage-total: the total memory used by all block caches
block-cache-pinned-usage-total: the total memory used by pinned blocks
estimate-table-readers-mem-total: an estimate of the total memory used by
table readers

ksqlDB registers for notifications about new rocksdb metrics by providing a
MetricsReporter implementation called RocksDBMetricsCollector. The metrics
system calls MetricsReporter.metricChange whenever a new metric is added.
RocksDBMetricsCollector watches for the rocksdb property metrics it cares
about and tracks them under the relevant aggregates. Each aggregate is
registered with the ksql metrics context on the first instantiation of
RocksDBMetricsCollector. Aggregates are computed lazily when read, and
recomputation is rate-limited to a configurable interval. The interval is
set using the property ksql.rocksdb.metrics.update.interval.seconds.
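For example, to recompute the aggregates at most once per minute (the
default is 15 seconds), set the following in ksql-server.properties:

  ksql.rocksdb.metrics.update.interval.seconds=60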
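The aggregates are registered under the metrics group
io.confluent.ksql.rocksdb. As a rough sketch (illustrative only, not part of
this patch), code with access to the shared metrics registry could read an
aggregate back like so:

  // Sketch only: assumes a collector has already been configured, so the
  // aggregate gauges below exist in the shared registry.
  final Metrics metrics = MetricCollectors.getMetrics();
  final KafkaMetric compactions = metrics.metric(
      metrics.metricName("num-running-compactions-total", "io.confluent.ksql.rocksdb"));
  System.out.println(compactions.metricValue());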
---
 .../io/confluent/ksql/query/QueryExecutor.java     |   6 +
 .../metrics/RocksDBMetricsCollector.java           | 265 ++++++++++++++++++++
 .../metrics/RocksDBMetricsCollectorTest.java       | 239 +++++++++++++++++++
 3 files changed, 510 insertions(+)
 create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector.java
 create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollectorTest.java

diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java
index 88ed6c9ca8d5..09af0e9a9330 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java
@@ -31,6 +31,7 @@
 import io.confluent.ksql.execution.streams.KSPlanBuilder;
 import io.confluent.ksql.execution.streams.materialization.KsqlMaterializationFactory;
 import io.confluent.ksql.execution.streams.materialization.ks.KsMaterializationFactory;
+import io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
 import io.confluent.ksql.logging.processing.ProcessingLogger;
@@ -288,6 +289,11 @@ private Map<String, Object> buildStreamsProperties(
         StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
         ProducerCollector.class.getCanonicalName()
     );
+    updateListProperty(
+        newStreamsProperties,
+        StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+        RocksDBMetricsCollector.class.getName()
+    );
     return newStreamsProperties;
   }
 
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector.java
new file mode 100644
index 000000000000..98cf2736db5b
--- /dev/null
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.metrics.MetricCollectors;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBMetricsCollector implements MetricsReporter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBMetricsCollector.class);
+
+  static final String KSQL_ROCKSDB_METRICS_GROUP = "io.confluent.ksql.rocksdb";
+  static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
+  static final String BLOCK_CACHE_USAGE = "block-cache-usage";
+  static final String BLOCK_CACHE_PINNED_USAGE = "block-cache-pinned-usage";
+  static final String ESTIMATE_NUM_KEYS = "estimate-num-keys";
+  static final String ESTIMATE_TABLE_READERS_MEM = "estimate-table-readers-mem";
+  static final String UPDATE_INTERVAL_CONFIG = "ksql.rocksdb.metrics.update.interval.seconds";
+  private static final int UPDATE_INTERVAL_DEFAULT = 15;
+
+  private static final ConfigDef CONFIG_DEF = new ConfigDef()
+      .define(
+          UPDATE_INTERVAL_CONFIG,
+          Type.INT,
+          UPDATE_INTERVAL_DEFAULT,
+          Importance.LOW,
+          "minimum interval between computations of a metric value"
+      );
+
+  // All collector instances in a process share one set of aggregates: the first
+  // instance to be configured registers the gauges with the metrics registry,
+  // and every instance then feeds the same aggregates.
+  private static final Object lock = new Object();
+
+  private static volatile Map<String, Collection<AggregatedMetric<?>>> registeredMetrics = null;
+  private final Metrics metrics;
+
+  public RocksDBMetricsCollector() {
+    this(MetricCollectors.getMetrics());
+  }
+
+  @VisibleForTesting
+  RocksDBMetricsCollector(final Metrics metrics) {
+    this.metrics = Objects.requireNonNull(metrics);
+  }
+
+  @Override
+  public void configure(final Map<String, ?> map) {
+    final AbstractConfig config = new AbstractConfig(CONFIG_DEF, map);
+    configureShared(config, metrics);
+  }
+
+  @Override
+  public Set<String> reconfigurableConfigs() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void init(final List<KafkaMetric> initial) {
+    initial.forEach(this::metricChange);
+  }
+
+  @Override
+  public void metricChange(final KafkaMetric metric) {
+    if (!metric.metricName().group().equals(StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP)) {
+      return;
+    }
+    metricRemoval(metric);
+    final Collection<AggregatedMetric<?>> registered
+        = registeredMetrics.get(metric.metricName().name());
+    if (registered == null) {
+      return;
+    }
+    registered.forEach(r -> r.add(metric));
+  }
+
+  @Override
+  public void metricRemoval(final KafkaMetric metric) {
+    final MetricName metricName = metric.metricName();
+    if (!metricName.group().equals(StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP)) {
+      return;
+    }
+    final Collection<AggregatedMetric<?>> registered
+        = registeredMetrics.get(metricName.name());
+    if (registered == null) {
+      return;
+    }
+    registered.forEach(r -> r.remove(metricName));
+  }
+
+  @VisibleForTesting
+  static void reset() {
+    registeredMetrics = null;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public static void update() {
+    registeredMetrics.values().stream()
+        .flatMap(Collection::stream)
+        .forEach(AggregatedMetric::update);
+  }
+
+  private static void registerBigIntTotal(
+      final Interval interval,
+      final Map<String, Collection<AggregatedMetric<?>>> registeredMetrics,
+      final String name,
+      final Metrics metrics
+  ) {
+    registeredMetrics.putIfAbsent(name, new LinkedList<>());
+    final AggregatedMetric<BigInteger> registered = new AggregatedMetric<>(
+        BigInteger.class,
+        BigInteger::add,
+        BigInteger.ZERO,
+        interval
+    );
+    registeredMetrics.get(name).add(registered);
+    final MetricName metricName = metrics.metricName(name + "-total", KSQL_ROCKSDB_METRICS_GROUP);
+    metrics.addMetric(metricName, (Gauge<BigInteger>) (c, t) -> registered.getValue());
+  }
+
+  static class AggregatedMetric<T> {
+    private final Class<T> clazz;
+    private final BinaryOperator<T> aggregator;
+    private final T identity;
+    private final Interval interval;
+    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();
+    private volatile T value;
+
+    private AggregatedMetric(
+        final Class<T> clazz,
+        final BinaryOperator<T> aggregator,
+        final T identity,
+        final Interval interval
+    ) {
+      this.clazz = Objects.requireNonNull(clazz, "clazz");
+      this.aggregator = Objects.requireNonNull(aggregator, "aggregator");
+      this.identity = Objects.requireNonNull(identity, "identity");
+      this.value = identity;
+      this.interval = interval;
+    }
+
+    private void add(final KafkaMetric metric) {
+      metrics.put(metric.metricName(), metric);
+    }
+
+    private void remove(final MetricName name) {
+      metrics.remove(name);
+    }
+
+    private T getValue() {
+      // Recompute at most once per interval; otherwise return the cached value.
+      if (interval.check()) {
+        value = update();
+      }
+      return value;
+    }
+
+    private T update() {
+      T current = identity;
+      for (final KafkaMetric metric : metrics.values()) {
+        final Object metricValue = metric.metricValue();
+        if (!clazz.isInstance(metricValue)) {
+          LOGGER.debug(
+              "Skipping metric update due to unexpected value type returned by {}",
+              metric.metricName().toString()
+          );
+          return identity;
+        }
+        current = aggregator.apply(current, clazz.cast(metricValue));
+      }
+      return current;
+    }
+  }
+
+  static class Interval {
+    private final int intervalSeconds;
+    private final AtomicReference<Instant> last;
+    private final Supplier<Instant> clock;
+
+    private Interval(final int intervalSeconds) {
+      this(intervalSeconds, Instant::now);
+    }
+
+    Interval(final int intervalSeconds, final Supplier<Instant> clock) {
+      this.intervalSeconds = intervalSeconds;
+      this.clock = Objects.requireNonNull(clock, "clock");
+      this.last = new AtomicReference<>(Instant.EPOCH);
+    }
+
+    boolean check() {
+      final Instant now = clock.get();
+      // accumulateAndGet advances 'last' to 'now' only if the interval has
+      // elapsed. Comparing the result to 'now' by reference then tells us
+      // whether this call performed the update and should recompute the value.
+      return last.accumulateAndGet(
+          now,
+          (l, n) -> n.isAfter(l.plusSeconds(intervalSeconds)) ? n : l
+      ) == now;
+    }
+  }
+
+  private static void configureShared(final AbstractConfig config, final Metrics metrics) {
+    synchronized (lock) {
+      if (registeredMetrics != null) {
+        return;
+      }
+      final Interval interval = new Interval(config.getInt(UPDATE_INTERVAL_CONFIG));
+      final Map<String, Collection<AggregatedMetric<?>>> builder = new HashMap<>();
+      registerBigIntTotal(interval, builder, NUMBER_OF_RUNNING_COMPACTIONS, metrics);
+      registerBigIntTotal(interval, builder, BLOCK_CACHE_USAGE, metrics);
+      registerBigIntTotal(interval, builder, BLOCK_CACHE_PINNED_USAGE, metrics);
+      registerBigIntTotal(interval, builder, ESTIMATE_NUM_KEYS, metrics);
+      registerBigIntTotal(interval, builder, ESTIMATE_TABLE_READERS_MEM, metrics);
+      registeredMetrics = ImmutableMap.copyOf(
+          builder.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> ImmutableList.copyOf(e.getValue())))
+      );
+    }
+  }
+}

diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollectorTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollectorTest.java
new file mode 100644
index 000000000000..2b4363c8a238
--- /dev/null
+++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollectorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector.Interval;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.function.Supplier;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class RocksDBMetricsCollectorTest {
+  private static final int UPDATE_INTERVAL = 123;
+
+  @Mock
+  private Metrics metrics;
+  @Mock
+  private Supplier<Instant> clock;
+  @Captor
+  private ArgumentCaptor<MetricValueProvider<?>> metricValueProvider;
+
+  private RocksDBMetricsCollector collector;
+
+  @Rule
+  public final MockitoRule rule = MockitoJUnit.rule();
+
+  @Before
+  public void setup() {
+    collector = new RocksDBMetricsCollector(metrics);
+    when(metrics.metricName(any(), any())).thenAnswer(
+        a -> new MetricName(a.getArgument(0), a.getArgument(1), "", Collections.emptyMap()));
+    collector.configure(
+        ImmutableMap.of(RocksDBMetricsCollector.UPDATE_INTERVAL_CONFIG, UPDATE_INTERVAL)
+    );
+  }
+
+  @After
+  public void cleanup() {
+    RocksDBMetricsCollector.reset();
+  }
+
+  private void shouldComputeSumOfAllStoreMetrics(
+      final String name,
+      final String otherName
+  ) {
+    // Given:
+    collector.metricChange(mockMetric(
+        StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, name, "a", BigInteger.valueOf(2)));
+    collector.metricChange(mockMetric(
+        StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, otherName, "a", BigInteger.valueOf(123)));
+    collector.metricChange(mockMetric(
+        StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, name, "b", BigInteger.valueOf(3)));
+
+    // When:
+    final Gauge<?> gauge = verifyAndGetRegisteredMetric(name + "-total");
+    final Object value = gauge.value(null, 0);
+
+    // Then:
+    assertThat(value, equalTo(BigInteger.valueOf(5)));
+  }
+
+  @Test
+  public void shouldComputeSumOfRunningCompactions() {
+    shouldComputeSumOfAllStoreMetrics(
+        RocksDBMetricsCollector.NUMBER_OF_RUNNING_COMPACTIONS,
+        RocksDBMetricsCollector.BLOCK_CACHE_PINNED_USAGE
+    );
+  }
+
+  @Test
+  public void shouldComputeSumOfBlockCachePinnedUsage() {
+    shouldComputeSumOfAllStoreMetrics(
+        RocksDBMetricsCollector.BLOCK_CACHE_PINNED_USAGE,
+        RocksDBMetricsCollector.NUMBER_OF_RUNNING_COMPACTIONS
+    );
+  }
+
+  @Test
+  public void shouldComputeSumOfEstimateNumKeys() {
+    shouldComputeSumOfAllStoreMetrics(
+        RocksDBMetricsCollector.ESTIMATE_NUM_KEYS,
+        RocksDBMetricsCollector.NUMBER_OF_RUNNING_COMPACTIONS
+    );
+  }
+
+  @Test
+  public void shouldComputeSumOfEstimateTableReadersMem() {
+    shouldComputeSumOfAllStoreMetrics(
+        RocksDBMetricsCollector.ESTIMATE_TABLE_READERS_MEM,
+        RocksDBMetricsCollector.NUMBER_OF_RUNNING_COMPACTIONS
+    );
+  }
+
+  @Test
+  public void shouldComputeSumOfBlockCacheUsage() {
+    shouldComputeSumOfAllStoreMetrics(
+        RocksDBMetricsCollector.BLOCK_CACHE_USAGE,
+        RocksDBMetricsCollector.NUMBER_OF_RUNNING_COMPACTIONS
+    );
+  }
+
+  @Test
+  public void shouldIgnoreMetricsFromWrongGroup() {
+    // When:
+    collector.metricChange(mockMetric(
+        "some-group",
+        RocksDBMetricsCollector.BLOCK_CACHE_USAGE,
+        "a",
+        BigInteger.valueOf(123)
+    ));
+
+    // Then:
+    final Gauge<?> gauge =
+        verifyAndGetRegisteredMetric(RocksDBMetricsCollector.BLOCK_CACHE_USAGE + "-total");
+    assertThat(gauge.value(null, 0), equalTo(BigInteger.valueOf(0)));
+  }
+
+  @Test
+  public void shouldRemoveMetric() {
+    // Given:
+    final KafkaMetric metric = mockMetric(
+        StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+        RocksDBMetricsCollector.BLOCK_CACHE_USAGE,
+        "a",
+        BigInteger.valueOf(2)
+    );
+    collector.metricChange(metric);
+
+    // When:
+    collector.metricRemoval(metric);
+
+    // Then:
+    final Gauge<?> gauge
+        = verifyAndGetRegisteredMetric(RocksDBMetricsCollector.BLOCK_CACHE_USAGE + "-total");
+    final Object value = gauge.value(null, 0);
+    assertThat(value, equalTo(BigInteger.valueOf(0)));
+  }
+
+  @Test
+  public void shouldComputeIntervalChangeCorrectly() {
+    // Given:
+    final Instant now = Instant.now();
+    when(clock.get()).thenReturn(
+        now,
+        now.plusSeconds(UPDATE_INTERVAL - 10),
+        now.plusSeconds(UPDATE_INTERVAL + 1),
+        now.plusSeconds(UPDATE_INTERVAL + 10)
+    );
+    final Interval interval = new Interval(UPDATE_INTERVAL, clock);
+
+    // When/Then:
+    assertThat(interval.check(), is(true));
+    assertThat(interval.check(), is(false));
+    assertThat(interval.check(), is(true));
+    assertThat(interval.check(), is(false));
+  }
+
+  @Test
+  public void shouldNotUpdateIfWithinInterval() {
+    // Given:
+    final RocksDBMetricsCollector collector = new RocksDBMetricsCollector(metrics);
+    final KafkaMetric metric = mockMetric(
+        StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+        RocksDBMetricsCollector.BLOCK_CACHE_USAGE,
+        "a",
+        BigInteger.valueOf(2)
+    );
+    collector.metricChange(metric);
+    collector.configure(ImmutableMap.of(RocksDBMetricsCollector.UPDATE_INTERVAL_CONFIG, 3600));
+    final Gauge<?> gauge = verifyAndGetRegisteredMetric(
+        RocksDBMetricsCollector.BLOCK_CACHE_USAGE + "-total");
+
+    // When:
+    gauge.value(null, 0);
+    gauge.value(null, 0);
+    gauge.value(null, 0);
+
+    // Then:
+    verify(metric, times(1)).metricValue();
+  }
+
+  private KafkaMetric mockMetric(
+      final String group, final String name, final String store, final Object value) {
+    final KafkaMetric metric = mock(KafkaMetric.class);
+    when(metric.metricName()).thenReturn(
+        new MetricName(name, group, "", ImmutableMap.of("store", store)));
+    when(metric.metricValue()).thenReturn(value);
+    return metric;
+  }
+
+  private Gauge<?> verifyAndGetRegisteredMetric(final String name) {
+    verify(metrics).addMetric(
+        argThat(
+            n -> n.group().equals(RocksDBMetricsCollector.KSQL_ROCKSDB_METRICS_GROUP)
+                && n.name().equals(name)
+        ),
+        metricValueProvider.capture()
+    );
+    return (Gauge<?>) metricValueProvider.getValue();
+  }
+}