diff --git a/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java b/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java index 0e67f1fab5e2..6c4355b97d7c 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java +++ b/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java @@ -16,6 +16,7 @@ package io.confluent.ksql.metrics; import io.confluent.common.utils.Time; +import io.confluent.ksql.util.KsqlConfig; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -90,6 +91,15 @@ static String addCollector(final String id, final MetricCollector collector) { return finalId; } + public static void addConfigurableReporter(final KsqlConfig ksqlConfig) { + final List reporters = ksqlConfig.getConfiguredInstances( + KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + for (final MetricsReporter reporter: reporters) { + metrics.addReporter(reporter); + } + } + static void remove(final String id) { collectorMap.remove(id); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index c20e9cbae3b7..99cbd4b53c4e 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -53,6 +54,11 @@ public class KsqlConfig extends AbstractConfig { static final String KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX = KSQL_FUNCTIONS_PROPERTY_PREFIX + "_global_."; + public static final String METRIC_REPORTER_CLASSES_CONFIG = "ksql.metric.reporters"; + + public static final String METRIC_REPORTER_CLASSES_DOC = + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC; + public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas"; @@ -551,6 +557,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { ), ConfigDef.Importance.LOW, KSQL_ACCESS_VALIDATOR_DOC + ).define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + METRIC_REPORTER_CLASSES_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java b/ksql-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java index 89ce53a12d99..8bf6605e12ee 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java @@ -16,23 +16,29 @@ package io.confluent.ksql.metrics; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; - +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.record.TimestampType; import org.junit.After; import org.junit.Before; @@ -42,6 +48,8 @@ public class MetricCollectorsTest { private static final String TEST_TOPIC = "shared-topic"; + private KsqlConfig ksqlConfig = mock(KsqlConfig.class); + @Before public void setUp() { MetricCollectors.initialize(); @@ -60,6 +68,18 @@ public void shouldAggregateStats() { assertThat(aggregateMetrics.values().iterator().next().getValue(), equalTo(3.0)); } + @Test + public void shouldAddConfigurableReporters() { + final MetricsReporter mockReporter = mock(MetricsReporter.class); + assertThat(MetricCollectors.getMetrics().reporters().size(), equalTo(1)); + when(ksqlConfig.getConfiguredInstances(any(), any())) + .thenReturn(Collections.singletonList(mockReporter)); + + MetricCollectors.addConfigurableReporter(ksqlConfig); + final List reporters = MetricCollectors.getMetrics().reporters(); + assertThat(reporters, hasItem(mockReporter)); + } + @Test public void shouldKeepWorkingWhenDuplicateTopicConsumerIsRemoved() { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index e741a8d0ea5c..46af45bf7b69 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -34,6 +34,7 @@ import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -440,6 +441,8 @@ static KsqlRestApplication buildApplication( final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties()); + MetricCollectors.addConfigurableReporter(ksqlConfig); + final ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(restConfig.getOriginals()); final ProcessingLogContext processingLogContext @@ -451,7 +454,7 @@ static KsqlRestApplication buildApplication( Thread.setDefaultUncaughtExceptionHandler( new KsqlUncaughtExceptionHandler(LogManager::shutdown)); } - + final HybridQueryIdGenerator hybridQueryIdGenerator = new HybridQueryIdGenerator();