Skip to content

Commit

Permalink
feat: add extension for custom metrics (5.3.x) (#2997)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Jul 20, 2019
1 parent 925a7c2 commit 94a8ae7
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 79 deletions.
11 changes: 11 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public class KsqlConfig extends AbstractConfig {
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String KSQL_CUSTOM_METRICS_EXTENSION = "ksql.metrics.extension";
private static final String KSQL_CUSTOM_METRICS_EXTENSION_DOC =
"Extension for supplying custom metrics to be emitted along with "
+ "the engine's default JMX metrics";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
Expand Down Expand Up @@ -459,6 +464,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
).define(
KSQL_CUSTOM_METRICS_EXTENSION,
ConfigDef.Type.CLASS,
null,
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_EXTENSION_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
18 changes: 16 additions & 2 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@

package io.confluent.ksql;

import io.confluent.ksql.internal.KsqlMetricsExtension;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;
private final Optional<KsqlMetricsExtension> metricsExtension;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
Expand All @@ -33,16 +36,23 @@ public static ServiceInfo create(final KsqlConfig ksqlConfig) {
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);
final Optional<KsqlMetricsExtension> metricsExtension = Optional.ofNullable(
ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_CUSTOM_METRICS_EXTENSION,
KsqlMetricsExtension.class
));

return new ServiceInfo(serviceId, customMetricsTags);
return new ServiceInfo(serviceId, customMetricsTags, metricsExtension);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
this.metricsExtension = Objects.requireNonNull(metricsExtension, "metricsExtension");
}

public String serviceId() {
Expand All @@ -52,4 +62,8 @@ public String serviceId() {
public Map<String, String> customMetricsTags() {
return customMetricsTags;
}

public Optional<KsqlMetricsExtension> metricsExtension() {
return metricsExtension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public KsqlEngine(
processingLogContext,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
(engine) -> new KsqlEngineMetrics(
engine, serviceInfo.customMetricsTags(), serviceInfo.metricsExtension()));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
Expand Down Expand Up @@ -58,41 +59,52 @@ public class KsqlEngineMetrics implements Closeable {

private final String ksqlServiceId;
private final Map<String, String> customMetricsTags;
private final Optional<KsqlMetricsExtension> metricsExtension;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension) {
this(
METRIC_GROUP_PREFIX,
ksqlEngine,
MetricCollectors.getMetrics(),
customMetricsTags,
metricsExtension);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics,
final Map<String, String> customMetricsTags) {
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;
this.metricsExtension = metricsExtension;

this.metrics = metrics;

configureNumActiveQueries(metrics);
configureNumPersistentQueries(metrics);
this.messagesIn = configureMessagesIn(metrics);
this.totalMessagesIn = configureTotalMessagesIn(metrics);
this.totalBytesIn = configureTotalBytesIn(metrics);
this.messagesOut = configureMessagesOut(metrics);
this.numIdleQueries = configureIdleQueriesSensor(metrics);
this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics);
this.errorRate = configureErrorRate(metrics);
configureNumActiveQueries();
configureNumPersistentQueries();
this.messagesIn = configureMessagesIn();
this.totalMessagesIn = configureTotalMessagesIn();
this.totalBytesIn = configureTotalBytesIn();
this.messagesOut = configureMessagesOut();
this.numIdleQueries = configureIdleQueriesSensor();
this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor();
this.errorRate = configureErrorRate();
Arrays.stream(State.values())
.forEach(state -> configureNumActiveQueriesForGivenState(metrics, state));
.forEach(state -> configureNumActiveQueriesForGivenState(state));

configureCustomMetrics();
}

@Override
Expand Down Expand Up @@ -152,7 +164,7 @@ private void recordErrorRate(final double value) {
this.errorRate.record(value);
}

private Sensor configureErrorRate(final Metrics metrics) {
private Sensor configureErrorRate() {
final String metricName = "error-rate";
final String description =
"The number of messages which were consumed but not processed. "
Expand All @@ -161,40 +173,37 @@ private Sensor configureErrorRate(final Metrics metrics) {
+ "Alternately, a consumed messages may not have been produced, hence "
+ "being effectively dropped. Such messages would also be counted "
+ "toward the error rate.";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessagesOut(final Metrics metrics) {
private Sensor configureMessagesOut() {
final String metricName = "messages-produced-per-sec";
final String description = "The number of messages produced per second across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessagesIn(final Metrics metrics) {
private Sensor configureMessagesIn() {
final String metricName = "messages-consumed-per-sec";
final String description = "The number of messages consumed per second across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureTotalMessagesIn(final Metrics metrics) {
private Sensor configureTotalMessagesIn() {
final String metricName = "messages-consumed-total";
final String description = "The total number of messages consumed across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureTotalBytesIn(final Metrics metrics) {
private Sensor configureTotalBytesIn() {
final String metricName = "bytes-consumed-total";
final String description = "The total number of bytes consumed across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private void configureNumActiveQueries(final Metrics metrics) {
private void configureNumActiveQueries() {
final String metricName = "num-active-queries";
final String description = "The current number of active queries running in this engine";
createSensor(
metrics,
metricName,
description,
final Supplier<MeasurableStat> statSupplier =
() -> new MeasurableStat() {
@Override
public double measure(final MetricConfig metricConfig, final long l) {
Expand All @@ -205,17 +214,14 @@ public double measure(final MetricConfig metricConfig, final long l) {
public void record(final MetricConfig metricConfig, final double v, final long l) {
// We don't want to record anything, since the engine tracks query counts internally
}
}
);
};
createSensor(KsqlMetric.of(metricName, description, statSupplier));
}

private void configureNumPersistentQueries(final Metrics metrics) {
private void configureNumPersistentQueries() {
final String metricName = "num-persistent-queries";
final String description = "The current number of persistent queries running in this engine";
createSensor(
metrics,
metricName,
description,
final Supplier<MeasurableStat> statSupplier =
() -> new MeasurableStat() {
@Override
public double measure(final MetricConfig metricConfig, final long l) {
Expand All @@ -226,78 +232,63 @@ public double measure(final MetricConfig metricConfig, final long l) {
public void record(final MetricConfig metricConfig, final double v, final long l) {
// We don't want to record anything, since the engine tracks query counts internally
}
}
);
};
createSensor(KsqlMetric.of(metricName, description, statSupplier));
}

private Sensor configureIdleQueriesSensor(final Metrics metrics) {
private Sensor configureIdleQueriesSensor() {
final String metricName = "num-idle-queries";
final String description = "Number of inactive queries";
final Sensor sensor = createSensor(metrics, metricName, description, Value::new);
return sensor;
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessageConsumptionByQuerySensor(final Metrics metrics) {
final Sensor sensor = createSensor(metrics, "message-consumption-by-query");
private Sensor configureMessageConsumptionByQuerySensor() {
final Sensor sensor = createSensor("message-consumption-by-query");
configureMetric(
metrics,
sensor,
"messages-consumed-max",
"max msgs consumed by query",
Max::new
KsqlMetric.of("messages-consumed-max", "max msgs consumed by query", Max::new)
);
configureMetric(
metrics,
sensor,
"messages-consumed-min",
"min msgs consumed by query",
Min::new
KsqlMetric.of("messages-consumed-min", "min msgs consumed by query", Min::new)
);
configureMetric(
metrics,
sensor,
"messages-consumed-avg",
"mean msgs consumed by query",
Avg::new
KsqlMetric.of("messages-consumed-avg", "mean msgs consumed by query", Avg::new)
);
return sensor;
}

private void configureMetric(
final Metrics metrics,
final Sensor sensor,
final String metricName,
final String description,
final Supplier<MeasurableStat> statSupplier) {
final KsqlMetric metric) {
// legacy
sensor.add(
metrics.metricName(ksqlServiceId + metricName, metricGroupName, description),
statSupplier.get());
metrics.metricName(ksqlServiceId + metric.name(), metricGroupName, metric.description()),
metric.statSupplier().get());
// new
sensor.add(
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
metric.name(),
ksqlServiceId + metricGroupName,
metric.description(),
customMetricsTags),
metric.statSupplier().get());
}

private Sensor createSensor(final Metrics metrics, final String sensorName) {
private Sensor createSensor(final String sensorName) {
final Sensor sensor = metrics.sensor(metricGroupName + "-" + sensorName);
sensors.add(sensor);
return sensor;
}

private Sensor createSensor(
final Metrics metrics,
final String metricName,
final String description,
final Supplier<MeasurableStat> statSupplier) {
final Sensor sensor = createSensor(metrics, metricName);
configureMetric(metrics, sensor, metricName, description, statSupplier);
private Sensor createSensor(final KsqlMetric metric) {
final Sensor sensor = createSensor(metric.name());
configureMetric(sensor, metric);
return sensor;
}

private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
Expand All @@ -317,27 +308,33 @@ private void configureGaugeForState(
}

private void configureNumActiveQueriesForGivenState(
final Metrics metrics,
final KafkaStreams.State state) {
final String name = state + "-queries";
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}

private void configureCustomMetrics() {
if (!metricsExtension.isPresent()) {
return;
}

final List<KsqlMetric> customMetrics = metricsExtension.get().getCustomMetrics();
customMetrics.forEach(this::createSensor);
}

private static class CountMetric {
private final Gauge<Long> count;
private final MetricName metricName;
Expand Down
Loading

0 comments on commit 94a8ae7

Please sign in to comment.