From 556ca7838604e84ec3442173836551a5e8e6b807 Mon Sep 17 00:00:00 2001 From: Henry Cai Date: Mon, 25 May 2020 15:45:28 -0700 Subject: [PATCH] Switch to kafka-2.0.0 profile as the default And add the missing MicrometerMetricsCollector from https://github.com/pinterest/secor/pull/1342 --- pom.xml | 6 +- .../monitoring/MicroMeterMetricCollector.java | 75 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java diff --git a/pom.xml b/pom.xml index 7517e858c..819870f43 100644 --- a/pom.xml +++ b/pom.xml @@ -804,6 +804,9 @@ kafka-2.0.0 + + true + @@ -862,9 +865,6 @@ - - true - diff --git a/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java new file mode 100644 index 000000000..78884687d --- /dev/null +++ b/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.pinterest.secor.monitoring; + +import com.pinterest.secor.common.SecorConfig; + +import com.google.common.util.concurrent.AtomicDouble; +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; +import io.micrometer.jmx.JmxConfig; +import io.micrometer.jmx.JmxMeterRegistry; +import io.micrometer.statsd.StatsdConfig; +import io.micrometer.statsd.StatsdMeterRegistry; + +import java.util.Collections; + +/** + * MicorMeter meters can integrate with many different metrics backend + * (StatsD/Promethus/Graphite/JMX etc, see https://micrometer.io/docs) + */ +public class MicroMeterMetricCollector implements MetricCollector { + @Override + public void initialize(SecorConfig config) { + if (config.getMicroMeterCollectorStatsdEnabled()) { + MeterRegistry statsdRegistry = + new StatsdMeterRegistry(StatsdConfig.DEFAULT, Clock.SYSTEM); + Metrics.addRegistry(statsdRegistry); + } + + if (config.getMicroMeterCollectorJmxEnabled()) { + MeterRegistry jmxRegistry = new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM); + Metrics.addRegistry(jmxRegistry); + } + } + + @Override + public void increment(String label, String topic) { + Metrics.counter(label, Collections.singletonList(Tag.of("topic", topic))).increment(); + } + + @Override + public void increment(String label, int delta, String topic) { + Metrics.counter(label, Collections.singletonList(Tag.of("topic", topic))).increment(delta); + } + + @Override + public void metric(String label, double value, String topic) { + Metrics.gauge(label, Collections.singletonList( + Tag.of("topic", topic)), new AtomicDouble(0)).set(value); + } + + @Override + public void gauge(String label, double value, String topic) { + Metrics.gauge(label, Collections.singletonList( + Tag.of("topic", topic)), new AtomicDouble(0)).set(value); + } +}