From 94a8ae711db66dc100a9bd4900f5c2c69bb24e05 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Fri, 19 Jul 2019 20:46:52 -0700 Subject: [PATCH] feat: add extension for custom metrics (5.3.x) (#2997) --- .../io/confluent/ksql/util/KsqlConfig.java | 11 ++ .../java/io/confluent/ksql/ServiceInfo.java | 18 ++- .../io/confluent/ksql/engine/KsqlEngine.java | 3 +- .../ksql/internal/KsqlEngineMetrics.java | 145 +++++++++--------- .../confluent/ksql/internal/KsqlMetric.java | 54 +++++++ .../ksql/internal/KsqlMetricsExtension.java | 37 +++++ .../ksql/engine/KsqlEngineTestUtil.java | 2 +- .../ksql/internal/KsqlEngineMetricsTest.java | 47 +++++- 8 files changed, 238 insertions(+), 79 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetric.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetricsExtension.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index ecb27616b98d..210dbb0285bb 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -149,6 +149,11 @@ public class KsqlConfig extends AbstractConfig { "A list of tags to be included with emitted JMX metrics, formatted as a string of key:value " + "pairs separated by commas. For example, 'key1:value1,key2:value2'."; + public static final String KSQL_CUSTOM_METRICS_EXTENSION = "ksql.metrics.extension"; + private static final String KSQL_CUSTOM_METRICS_EXTENSION_DOC = + "Extension for supplying custom metrics to be emitted along with " + + "the engine's default JMX metrics"; + public static final String KSQL_STREAMS_PREFIX = "ksql.streams."; public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics"; @@ -459,6 +464,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { "", ConfigDef.Importance.LOW, KSQL_CUSTOM_METRICS_TAGS_DOC + ).define( + KSQL_CUSTOM_METRICS_EXTENSION, + ConfigDef.Type.CLASS, + null, + ConfigDef.Importance.LOW, + KSQL_CUSTOM_METRICS_EXTENSION_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java b/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java index 45bf15d34d4c..dce77c4a66f0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java @@ -15,14 +15,17 @@ package io.confluent.ksql; +import io.confluent.ksql.internal.KsqlMetricsExtension; import io.confluent.ksql.util.KsqlConfig; import java.util.Map; import java.util.Objects; +import java.util.Optional; public final class ServiceInfo { private final String serviceId; private final Map customMetricsTags; + private final Optional metricsExtension; /** * Create an object to be passed from the KSQL context down to the KSQL engine. @@ -33,16 +36,23 @@ public static ServiceInfo create(final KsqlConfig ksqlConfig) { final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); final Map customMetricsTags = ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS); + final Optional metricsExtension = Optional.ofNullable( + ksqlConfig.getConfiguredInstance( + KsqlConfig.KSQL_CUSTOM_METRICS_EXTENSION, + KsqlMetricsExtension.class + )); - return new ServiceInfo(serviceId, customMetricsTags); + return new ServiceInfo(serviceId, customMetricsTags, metricsExtension); } private ServiceInfo( final String serviceId, - final Map customMetricsTags + final Map customMetricsTags, + final Optional metricsExtension ) { this.serviceId = Objects.requireNonNull(serviceId, "serviceId"); this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags"); + this.metricsExtension = Objects.requireNonNull(metricsExtension, "metricsExtension"); } public String serviceId() { @@ -52,4 +62,8 @@ public String serviceId() { public Map customMetricsTags() { return customMetricsTags; } + + public Optional metricsExtension() { + return metricsExtension; + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 282896ef4572..1ed9f048aeae 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -75,7 +75,8 @@ public KsqlEngine( processingLogContext, serviceInfo.serviceId(), new MetaStoreImpl(functionRegistry), - (engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags())); + (engine) -> new KsqlEngineMetrics( + engine, serviceInfo.customMetricsTags(), serviceInfo.metricsExtension())); } KsqlEngine( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java index 227499a9a4fc..322011967aec 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; @@ -58,41 +59,52 @@ public class KsqlEngineMetrics implements Closeable { private final String ksqlServiceId; private final Map customMetricsTags; + private final Optional metricsExtension; private final KsqlEngine ksqlEngine; private final Metrics metrics; public KsqlEngineMetrics( final KsqlEngine ksqlEngine, - final Map customMetricsTags) { - this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags); + final Map customMetricsTags, + final Optional metricsExtension) { + this( + METRIC_GROUP_PREFIX, + ksqlEngine, + MetricCollectors.getMetrics(), + customMetricsTags, + metricsExtension); } KsqlEngineMetrics( final String metricGroupPrefix, final KsqlEngine ksqlEngine, final Metrics metrics, - final Map customMetricsTags) { + final Map customMetricsTags, + final Optional metricsExtension) { this.ksqlEngine = ksqlEngine; this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId(); this.sensors = new ArrayList<>(); this.countMetrics = new ArrayList<>(); this.metricGroupName = metricGroupPrefix + "-query-stats"; this.customMetricsTags = customMetricsTags; + this.metricsExtension = metricsExtension; this.metrics = metrics; - configureNumActiveQueries(metrics); - configureNumPersistentQueries(metrics); - this.messagesIn = configureMessagesIn(metrics); - this.totalMessagesIn = configureTotalMessagesIn(metrics); - this.totalBytesIn = configureTotalBytesIn(metrics); - this.messagesOut = configureMessagesOut(metrics); - this.numIdleQueries = configureIdleQueriesSensor(metrics); - this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics); - this.errorRate = configureErrorRate(metrics); + configureNumActiveQueries(); + configureNumPersistentQueries(); + this.messagesIn = configureMessagesIn(); + this.totalMessagesIn = configureTotalMessagesIn(); + this.totalBytesIn = configureTotalBytesIn(); + this.messagesOut = configureMessagesOut(); + this.numIdleQueries = configureIdleQueriesSensor(); + this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(); + this.errorRate = configureErrorRate(); Arrays.stream(State.values()) - .forEach(state -> configureNumActiveQueriesForGivenState(metrics, state)); + .forEach(state -> configureNumActiveQueriesForGivenState(state)); + + configureCustomMetrics(); } @Override @@ -152,7 +164,7 @@ private void recordErrorRate(final double value) { this.errorRate.record(value); } - private Sensor configureErrorRate(final Metrics metrics) { + private Sensor configureErrorRate() { final String metricName = "error-rate"; final String description = "The number of messages which were consumed but not processed. " @@ -161,40 +173,37 @@ private Sensor configureErrorRate(final Metrics metrics) { + "Alternately, a consumed messages may not have been produced, hence " + "being effectively dropped. Such messages would also be counted " + "toward the error rate."; - return createSensor(metrics, metricName, description, Value::new); + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private Sensor configureMessagesOut(final Metrics metrics) { + private Sensor configureMessagesOut() { final String metricName = "messages-produced-per-sec"; final String description = "The number of messages produced per second across all queries"; - return createSensor(metrics, metricName, description, Value::new); + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private Sensor configureMessagesIn(final Metrics metrics) { + private Sensor configureMessagesIn() { final String metricName = "messages-consumed-per-sec"; final String description = "The number of messages consumed per second across all queries"; - return createSensor(metrics, metricName, description, Value::new); + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private Sensor configureTotalMessagesIn(final Metrics metrics) { + private Sensor configureTotalMessagesIn() { final String metricName = "messages-consumed-total"; final String description = "The total number of messages consumed across all queries"; - return createSensor(metrics, metricName, description, Value::new); + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private Sensor configureTotalBytesIn(final Metrics metrics) { + private Sensor configureTotalBytesIn() { final String metricName = "bytes-consumed-total"; final String description = "The total number of bytes consumed across all queries"; - return createSensor(metrics, metricName, description, Value::new); + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private void configureNumActiveQueries(final Metrics metrics) { + private void configureNumActiveQueries() { final String metricName = "num-active-queries"; final String description = "The current number of active queries running in this engine"; - createSensor( - metrics, - metricName, - description, + final Supplier statSupplier = () -> new MeasurableStat() { @Override public double measure(final MetricConfig metricConfig, final long l) { @@ -205,17 +214,14 @@ public double measure(final MetricConfig metricConfig, final long l) { public void record(final MetricConfig metricConfig, final double v, final long l) { // We don't want to record anything, since the engine tracks query counts internally } - } - ); + }; + createSensor(KsqlMetric.of(metricName, description, statSupplier)); } - private void configureNumPersistentQueries(final Metrics metrics) { + private void configureNumPersistentQueries() { final String metricName = "num-persistent-queries"; final String description = "The current number of persistent queries running in this engine"; - createSensor( - metrics, - metricName, - description, + final Supplier statSupplier = () -> new MeasurableStat() { @Override public double measure(final MetricConfig metricConfig, final long l) { @@ -226,78 +232,63 @@ public double measure(final MetricConfig metricConfig, final long l) { public void record(final MetricConfig metricConfig, final double v, final long l) { // We don't want to record anything, since the engine tracks query counts internally } - } - ); + }; + createSensor(KsqlMetric.of(metricName, description, statSupplier)); } - private Sensor configureIdleQueriesSensor(final Metrics metrics) { + private Sensor configureIdleQueriesSensor() { final String metricName = "num-idle-queries"; final String description = "Number of inactive queries"; - final Sensor sensor = createSensor(metrics, metricName, description, Value::new); - return sensor; + return createSensor(KsqlMetric.of(metricName, description, Value::new)); } - private Sensor configureMessageConsumptionByQuerySensor(final Metrics metrics) { - final Sensor sensor = createSensor(metrics, "message-consumption-by-query"); + private Sensor configureMessageConsumptionByQuerySensor() { + final Sensor sensor = createSensor("message-consumption-by-query"); configureMetric( - metrics, sensor, - "messages-consumed-max", - "max msgs consumed by query", - Max::new + KsqlMetric.of("messages-consumed-max", "max msgs consumed by query", Max::new) ); configureMetric( - metrics, sensor, - "messages-consumed-min", - "min msgs consumed by query", - Min::new + KsqlMetric.of("messages-consumed-min", "min msgs consumed by query", Min::new) ); configureMetric( - metrics, sensor, - "messages-consumed-avg", - "mean msgs consumed by query", - Avg::new + KsqlMetric.of("messages-consumed-avg", "mean msgs consumed by query", Avg::new) ); return sensor; } private void configureMetric( - final Metrics metrics, final Sensor sensor, - final String metricName, - final String description, - final Supplier statSupplier) { + final KsqlMetric metric) { // legacy sensor.add( - metrics.metricName(ksqlServiceId + metricName, metricGroupName, description), - statSupplier.get()); + metrics.metricName(ksqlServiceId + metric.name(), metricGroupName, metric.description()), + metric.statSupplier().get()); // new sensor.add( metrics.metricName( - metricName, ksqlServiceId + metricGroupName, description, customMetricsTags), - statSupplier.get()); + metric.name(), + ksqlServiceId + metricGroupName, + metric.description(), + customMetricsTags), + metric.statSupplier().get()); } - private Sensor createSensor(final Metrics metrics, final String sensorName) { + private Sensor createSensor(final String sensorName) { final Sensor sensor = metrics.sensor(metricGroupName + "-" + sensorName); sensors.add(sensor); return sensor; } - private Sensor createSensor( - final Metrics metrics, - final String metricName, - final String description, - final Supplier statSupplier) { - final Sensor sensor = createSensor(metrics, metricName); - configureMetric(metrics, sensor, metricName, description, statSupplier); + private Sensor createSensor(final KsqlMetric metric) { + final Sensor sensor = createSensor(metric.name()); + configureMetric(sensor, metric); return sensor; } private void configureGaugeForState( - final Metrics metrics, final String name, final String group, final Map tags, @@ -317,12 +308,10 @@ private void configureGaugeForState( } private void configureNumActiveQueriesForGivenState( - final Metrics metrics, final KafkaStreams.State state) { final String name = state + "-queries"; // legacy configureGaugeForState( - metrics, ksqlServiceId + metricGroupName + "-" + name, metricGroupName, Collections.emptyMap(), @@ -330,7 +319,6 @@ private void configureNumActiveQueriesForGivenState( ); // new configureGaugeForState( - metrics, name, ksqlServiceId + metricGroupName, customMetricsTags, @@ -338,6 +326,15 @@ private void configureNumActiveQueriesForGivenState( ); } + private void configureCustomMetrics() { + if (!metricsExtension.isPresent()) { + return; + } + + final List customMetrics = metricsExtension.get().getCustomMetrics(); + customMetrics.forEach(this::createSensor); + } + private static class CountMetric { private final Gauge count; private final MetricName metricName; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetric.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetric.java new file mode 100644 index 000000000000..7605589f594a --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetric.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.internal; + +import java.util.Objects; +import java.util.function.Supplier; +import org.apache.kafka.common.metrics.MeasurableStat; + +public final class KsqlMetric { + private final String name; + private final String description; + private final Supplier statSupplier; + + public static KsqlMetric of( + final String name, + final String description, + final Supplier statSupplier) { + return new KsqlMetric(name, description, statSupplier); + } + + private KsqlMetric( + final String name, + final String description, + final Supplier statSupplier) { + this.name = Objects.requireNonNull(name, "name cannot be null"); + this.description = Objects.requireNonNull(description, "description cannot be null"); + this.statSupplier = Objects.requireNonNull(statSupplier, "statSupplier cannot be null"); + } + + public String name() { + return name; + } + + public String description() { + return description; + } + + public Supplier statSupplier() { + return statSupplier; + } +} \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetricsExtension.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetricsExtension.java new file mode 100644 index 000000000000..90e94205049f --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlMetricsExtension.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.internal; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.Configurable; + +/** + * This interface provides a way for users to provide custom metrics that will be emitted alongside + * the default KSQL engine JMX metrics. + */ +public interface KsqlMetricsExtension extends Configurable { + + @Override + void configure(Map config); + + /** + * Returns custom metrics to be emitted alongside the default KSQL engine JMX metrics. + * + * @return list of metrics + */ + List getCustomMetrics(); +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index 045746712984..9a905cfc7597 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -51,7 +51,7 @@ public static KsqlEngine createKsqlEngine( ProcessingLogContext.create(), "test_instance_", metaStore, - (engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap()) + (engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap(), Optional.empty()) ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java index 5fe2f82a9b58..8816d20c81d5 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metrics.ConsumerCollector; @@ -38,12 +39,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.streams.KafkaStreams; @@ -76,7 +81,12 @@ public void setUp() { when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID); when(query1.getQueryApplicationId()).thenReturn("app-1"); - engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics(), CUSTOM_TAGS); + engineMetrics = new KsqlEngineMetrics( + METRIC_GROUP, + ksqlEngine, + MetricCollectors.getMetrics(), + CUSTOM_TAGS, + Optional.of(new TestKsqlMetricsExtension())); } @After @@ -242,6 +252,15 @@ public void shouldRecordMinMessagesConsumedByQuery() { assertThat(Math.floor(legacyValue), closeTo(numMessagesConsumed / 100, 0.01)); } + @Test + public void shouldRecordCustomMetric() { + final double value = getMetricValue("my-custom-metric"); + final double legacyValue = getMetricValueLegacy("my-custom-metric"); + + assertThat(value, equalTo(123.0)); + assertThat(legacyValue, equalTo(123.0)); + } + @Test public void shouldRegisterQueries() { // When: @@ -327,4 +346,30 @@ private static Answer> returnQueriesInState( return queryMetadataList; }; } + + private static class TestKsqlMetricsExtension implements KsqlMetricsExtension { + + @Override + public void configure(Map config) { + } + + @Override + public List getCustomMetrics() { + final String name = "my-custom-metric"; + final String description = ""; + final Supplier statSupplier = + () -> new MeasurableStat() { + @Override + public double measure(final MetricConfig metricConfig, final long l) { + return 123; + } + + @Override + public void record(final MetricConfig metricConfig, final double v, final long l) { + // Nothing to record + } + }; + return ImmutableList.of(KsqlMetric.of(name, description, statSupplier)); + } + } }