From 5dd63e265a5344bc6e1d5b6c048173af59d174ac Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Tue, 10 Jan 2017 18:56:48 +0900 Subject: [PATCH 01/28] Initial commit --- extensions-contrib/kafka-emitter/pom.xml | 91 ++++++++++++++++++ .../io/druid/emitter/kafka/KafkaEmitter.java | 96 +++++++++++++++++++ .../emitter/kafka/KafkaEmitterConfig.java | 62 ++++++++++++ .../emitter/kafka/KafkaEmitterModule.java | 54 +++++++++++ .../io.druid.initialization.DruidModule | 1 + .../emitter/kafka/KafkaEmitterConfigTest.java | 47 +++++++++ pom.xml | 1 + 7 files changed, 352 insertions(+) create mode 100644 extensions-contrib/kafka-emitter/pom.xml create mode 100644 extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java create mode 100644 extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java create mode 100644 extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java create mode 100644 extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml new file mode 100644 index 000000000000..94f679ff143c --- /dev/null +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -0,0 +1,91 @@ + + + + + 4.0.0 + + + io.druid + druid + 0.9.3-SNAPSHOT + ../../pom.xml + + + io.druid.extensions.contrib + kafka-emitter + kafka-emitter + Druid emitter extension to support kafka + + + + org.apache.kafka + kafka-clients + 0.10.1.0 + + + io.druid + druid-common + ${project.parent.version} + provided + + + io.druid + druid-api + ${project.parent.version} + provided + + + com.metamx + emitter + provided + + + junit + junit + test + + + org.easymock + easymock + test + + + pl.pragmatists + JUnitParams + 1.0.4 + test + + + io.druid + druid-server + ${project.parent.version} + test-jar + test + + + io.druid + druid-processing + ${project.parent.version} + test-jar + test + + + diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java new file mode 100644 index 000000000000..67744c32559c --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -0,0 +1,96 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.Event; + +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.IOException; +import java.util.Properties; + +public class KafkaEmitter implements Emitter { + private static Logger log = new Logger(KafkaEmitter.class); + + private final static String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private final static String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private final KafkaEmitterConfig config; + private final Producer producer; + private final ObjectMapper jsonMapper; + + + public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { + this.config = config; + this.jsonMapper = jsonMapper; + this.producer = getKafkaProducer(config); + + } + + private Producer getKafkaProducer(KafkaEmitterConfig config) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIALIZER); + + return new KafkaProducer<>(props); + } + + @Override + @LifecycleStart + public void start() { + log.info("Starting Kafka Emitter."); + } + + @Override + public void emit(Event event) { + if(event instanceof ServiceMetricEvent) { + if(event == null) { + return; + } + } + try { + producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(event))); + } catch (JsonProcessingException e) { + log.warn(e, "Failed to generate json"); + } + } + + @Override + public void flush() throws IOException { + producer.flush(); + } + + @Override + @LifecycleStop + public void close() throws IOException { + producer.close(); + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java new file mode 100644 index 000000000000..f413a2418398 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -0,0 +1,62 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.kafka.clients.producer.ProducerConfig; + +public class KafkaEmitterConfig { + + @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + final private String bootstrapServers; + @JsonProperty + final private String topic; + + @Override + public boolean equals(Object o) { + if(this == o) { + return true; + } + if(!(o instanceof KafkaEmitterConfig)) { + return false; + } + + KafkaEmitterConfig that = (KafkaEmitterConfig) o; + + if(!getBootstrapServers().equals(that.getBootstrapServers())) { + return false; + } + return getTopic().equals(that.getTopic()); + } + @JsonCreator + public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, + @JsonProperty("topic") String topic) { + this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); + this.topic = Preconditions.checkNotNull(topic, "topic can not be null"); + } + + @JsonProperty + public String getBootstrapServers() { return bootstrapServers; } + + @JsonProperty + public String getTopic() { return topic; } +} diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java new file mode 100644 index 000000000000..42508b01be8f --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java @@ -0,0 +1,54 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import com.metamx.emitter.core.Emitter; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class KafkaEmitterModule implements DruidModule { + private static final String EMITTER_TYPE = "kafka"; + + @Override + public List getJacksonModules() { + return Collections.EMPTY_LIST; + } + + @Override + public void configure(Binder binder) { + JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, KafkaEmitterConfig.class); + } + + @Provides + @ManageLifecycle + @Named(EMITTER_TYPE) + public Emitter getEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper mapper) { + return new KafkaEmitter(kafkaEmitterConfig, mapper); + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..7fccf54ec6c8 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.emitter.kafka.KafkaEmitterModule \ No newline at end of file diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java new file mode 100644 index 000000000000..99304aa01104 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaEmitterConfigTest { + private ObjectMapper mapper = new DefaultObjectMapper(); + + @Before + public void setUp() { + mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + } + + @Test + public void testSerDeserKafkaEmitterConfig() throws IOException { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "test"); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } +} diff --git a/pom.xml b/pom.xml index 4be69663c1e9..01b01434daf5 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ extensions-contrib/ambari-metrics-emitter extensions-contrib/scan-query extensions-contrib/sqlserver-metadata-storage + extensions-contrib/kafka-emitter From a303a1fd0713303a5ca6e168e48240cd61bf7ab0 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Thu, 19 Jan 2017 18:19:59 +0900 Subject: [PATCH 02/28] Apply another config: clustername --- .../java/io/druid/emitter/kafka/KafkaEmitter.java | 11 ++++++++--- .../druid/emitter/kafka/KafkaEmitterConfig.java | 15 ++++++++++++--- .../emitter/kafka/KafkaEmitterConfigTest.java | 2 +- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 67744c32559c..b3e4791d49f4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -19,7 +19,7 @@ package io.druid.emitter.kafka; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.Event; @@ -34,6 +34,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; public class KafkaEmitter implements Emitter { @@ -77,8 +79,11 @@ public void emit(Event event) { } } try { - producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(event))); - } catch (JsonProcessingException e) { + TypeReference> typeRef = new TypeReference>() {}; + HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); + result.put("clustername", config.getClusterName()); + producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result))); + } catch (Exception e) { log.warn(e, "Failed to generate json"); } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index f413a2418398..e12eab04fda6 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -30,6 +30,8 @@ public class KafkaEmitterConfig { final private String bootstrapServers; @JsonProperty final private String topic; + @JsonProperty("clustername") + final private String clusterName; @Override public boolean equals(Object o) { @@ -42,16 +44,20 @@ public boolean equals(Object o) { KafkaEmitterConfig that = (KafkaEmitterConfig) o; - if(!getBootstrapServers().equals(that.getBootstrapServers())) { + if(!getBootstrapServers().equals(that.getBootstrapServers()) + || !getTopic().equals(that.getTopic()) + || !getClusterName().equals(that.getClusterName())) { return false; + } else { + return true; } - return getTopic().equals(that.getTopic()); } @JsonCreator public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("topic") String topic) { + @JsonProperty("topic") String topic, @JsonProperty("clustername") String clusterName) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.topic = Preconditions.checkNotNull(topic, "topic can not be null"); + this.clusterName = (clusterName == null || clusterName.isEmpty()) ? "NONAME" : clusterName; } @JsonProperty @@ -59,4 +65,7 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) @JsonProperty public String getTopic() { return topic; } + + @JsonProperty + public String getClusterName() { return clusterName; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java index 99304aa01104..3b36d1204db4 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -38,7 +38,7 @@ public void setUp() { @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "test"); + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "test", "test"); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); From e911b35070f43c522b6c4792d4287b57ffc8bb7e Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Fri, 20 Jan 2017 11:05:27 +0900 Subject: [PATCH 03/28] Rename variable --- .../src/main/java/io/druid/emitter/kafka/KafkaEmitter.java | 2 +- .../main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index b3e4791d49f4..8bd6a5dfca76 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -81,7 +81,7 @@ public void emit(Event event) { try { TypeReference> typeRef = new TypeReference>() {}; HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); - result.put("clustername", config.getClusterName()); + result.put("clusterName", config.getClusterName()); producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result))); } catch (Exception e) { log.warn(e, "Failed to generate json"); diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index e12eab04fda6..d60bb8083281 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -30,7 +30,7 @@ public class KafkaEmitterConfig { final private String bootstrapServers; @JsonProperty final private String topic; - @JsonProperty("clustername") + @JsonProperty final private String clusterName; @Override @@ -54,7 +54,7 @@ public boolean equals(Object o) { } @JsonCreator public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("topic") String topic, @JsonProperty("clustername") String clusterName) { + @JsonProperty("topic") String topic, @JsonProperty("clusterName") String clusterName) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.topic = Preconditions.checkNotNull(topic, "topic can not be null"); this.clusterName = (clusterName == null || clusterName.isEmpty()) ? "NONAME" : clusterName; From 18126b9cb311bf378358aed0f55d655656d923ff Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Fri, 20 Jan 2017 11:57:07 +0900 Subject: [PATCH 04/28] Fix bug --- .../src/main/java/io/druid/emitter/kafka/KafkaEmitter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 8bd6a5dfca76..e4a295c97da9 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -79,8 +79,8 @@ public void emit(Event event) { } } try { - TypeReference> typeRef = new TypeReference>() {}; - HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); + TypeReference> typeRef = new TypeReference>() {}; + HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); result.put("clusterName", config.getClusterName()); producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result))); } catch (Exception e) { From 5ddbdb70d9061b4a9ea63b7956dae552fd24efce Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Thu, 2 Feb 2017 13:59:24 +0900 Subject: [PATCH 05/28] Add retry logic --- .../io/druid/emitter/kafka/KafkaEmitter.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index e4a295c97da9..d8a7f81682d9 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -28,10 +28,12 @@ import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import java.io.IOException; import java.util.HashMap; @@ -48,12 +50,10 @@ public class KafkaEmitter implements Emitter { private final Producer producer; private final ObjectMapper jsonMapper; - public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; this.producer = getKafkaProducer(config); - } private Producer getKafkaProducer(KafkaEmitterConfig config) { @@ -72,7 +72,7 @@ public void start() { } @Override - public void emit(Event event) { + public void emit(final Event event) { if(event instanceof ServiceMetricEvent) { if(event == null) { return; @@ -82,7 +82,16 @@ public void emit(Event event) { TypeReference> typeRef = new TypeReference>() {}; HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); result.put("clusterName", config.getClusterName()); - producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result))); + producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result)), + new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if(exception != null) { + log.warn(exception, "Exception is occured! Retry."); + emit(event); + } + } + }); } catch (Exception e) { log.warn(e, "Failed to generate json"); } From 5e634d3a52b728b013c0848c9ae30b97ccbbe784 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sat, 4 Feb 2017 10:55:44 +0900 Subject: [PATCH 06/28] Edit retry logic --- .../src/main/java/io/druid/emitter/kafka/KafkaEmitter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index d8a7f81682d9..20dd9b84465f 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -45,6 +45,7 @@ public class KafkaEmitter implements Emitter { private final static String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private final static String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private final static int DEFAULT_RETRIES = 3; private final KafkaEmitterConfig config; private final Producer producer; @@ -61,6 +62,7 @@ private Producer getKafkaProducer(KafkaEmitterConfig config) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIALIZER); + props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); return new KafkaProducer<>(props); } @@ -88,7 +90,6 @@ public void emit(final Event event) { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null) { log.warn(exception, "Exception is occured! Retry."); - emit(event); } } }); From efc938d5f63638115baf6cea333cc04882ca8a62 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sat, 4 Feb 2017 11:13:35 +0900 Subject: [PATCH 07/28] Upgrade kafka-clients version to the most recent release --- extensions-contrib/kafka-emitter/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 94f679ff143c..74b6409fde32 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -38,7 +38,7 @@ org.apache.kafka kafka-clients - 0.10.1.0 + 0.10.1.1 io.druid From 86fcb78a54c10ce2bbd3418a582399618e394779 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sat, 4 Feb 2017 12:16:46 +0900 Subject: [PATCH 08/28] Make callback single object --- .../io/druid/emitter/kafka/KafkaEmitter.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 20dd9b84465f..461376b2bcb0 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -49,12 +49,21 @@ public class KafkaEmitter implements Emitter { private final KafkaEmitterConfig config; private final Producer producer; + private final Callback producerCallback; private final ObjectMapper jsonMapper; public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; this.producer = getKafkaProducer(config); + this.producerCallback = new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if(e != null) { + log.warn(e, "Exception is occured! Retry."); + } + } + }; } private Producer getKafkaProducer(KafkaEmitterConfig config) { @@ -85,14 +94,7 @@ public void emit(final Event event) { HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); result.put("clusterName", config.getClusterName()); producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result)), - new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if(exception != null) { - log.warn(exception, "Exception is occured! Retry."); - } - } - }); + producerCallback); } catch (Exception e) { log.warn(e, "Failed to generate json"); } From 887b29311d7bf823ff86256496f32fb481f695f3 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sat, 4 Feb 2017 12:53:20 +0900 Subject: [PATCH 09/28] Write documentation --- .../extensions-contrib/kafka-emitter.md | 24 +++++++++++++++++++ docs/content/development/extensions.md | 1 + 2 files changed, 25 insertions(+) create mode 100644 docs/content/development/extensions-contrib/kafka-emitter.md diff --git a/docs/content/development/extensions-contrib/kafka-emitter.md b/docs/content/development/extensions-contrib/kafka-emitter.md new file mode 100644 index 000000000000..300f1f60f2d9 --- /dev/null +++ b/docs/content/development/extensions-contrib/kafka-emitter.md @@ -0,0 +1,24 @@ +--- +layout: doc_page +--- + +# Kafka Emitter + +To use this extension, make sure to [include](../../operations/including-extensions.html) `kafka-emitter` extension. + +## Introduction + +This extension emits druid metrics to a Kafka(https://kafka.apache.org) directly with JSON format.
+Currently, Kafka has not only their nice ecosystem but also consumer API readily available. +So, If you currently use kafka, It's easy to integrate various tool or UI +to monitor the status of your druid cluster with this extension. + +## Configuration + +All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`. + +|property|description|required?|default| +|--------|-----------|---------|-------| +|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| +|`druid.emitter.kafka.topic`|Kafka topic name for emitter's emitting target.|yes|none| +|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 45c116d22b0d..779ea6b65d99 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| +|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| |scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)| From e6fa868bf50a6a8aa9250054891ccaec7b1f839e Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sat, 4 Feb 2017 14:26:47 +0900 Subject: [PATCH 10/28] Rewrite error message and emit logic --- .../io/druid/emitter/kafka/KafkaEmitter.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 461376b2bcb0..927deda67cef 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -19,8 +19,8 @@ package io.druid.emitter.kafka; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.Event; @@ -36,7 +36,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -60,7 +59,7 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null) { - log.warn(e, "Exception is occured! Retry."); + log.warn(e, "Exception occured!"); } } }; @@ -85,19 +84,17 @@ public void start() { @Override public void emit(final Event event) { if(event instanceof ServiceMetricEvent) { - if(event == null) { - return; + try { + Map result = ImmutableMap.builder() + .putAll(event.toMap()) + .put("clusterName", config.getClusterName()) + .build(); + producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result)), + producerCallback); + } catch (Exception e) { + log.warn(e, "Failed to generate json"); } } - try { - TypeReference> typeRef = new TypeReference>() {}; - HashMap result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef); - result.put("clusterName", config.getClusterName()); - producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result)), - producerCallback); - } catch (Exception e) { - log.warn(e, "Failed to generate json"); - } } @Override From 80ccfb8d547f220e6674220c0d078d707724931a Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 6 Feb 2017 10:14:24 +0900 Subject: [PATCH 11/28] Handling AlertEvent --- .../extensions-contrib/kafka-emitter.md | 3 ++- .../io/druid/emitter/kafka/KafkaEmitter.java | 22 ++++++++++++------- .../emitter/kafka/KafkaEmitterConfig.java | 21 +++++++++++++----- .../emitter/kafka/KafkaEmitterConfigTest.java | 3 ++- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/docs/content/development/extensions-contrib/kafka-emitter.md b/docs/content/development/extensions-contrib/kafka-emitter.md index 300f1f60f2d9..cc003e3aa611 100644 --- a/docs/content/development/extensions-contrib/kafka-emitter.md +++ b/docs/content/development/extensions-contrib/kafka-emitter.md @@ -20,5 +20,6 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. |property|description|required?|default| |--------|-----------|---------|-------| |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| -|`druid.emitter.kafka.topic`|Kafka topic name for emitter's emitting target.|yes|none| +|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| +|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| |`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 927deda67cef..91c0c9bf8bdb 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -19,11 +19,13 @@ package io.druid.emitter.kafka; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; @@ -83,15 +85,19 @@ public void start() { @Override public void emit(final Event event) { - if(event instanceof ServiceMetricEvent) { + if(event != null) { + Map result = ImmutableMap.builder() + .putAll(event.toMap()) + .put("clusterName", config.getClusterName()) + .build(); try { - Map result = ImmutableMap.builder() - .putAll(event.toMap()) - .put("clusterName", config.getClusterName()) - .build(); - producer.send(new ProducerRecord(config.getTopic(), jsonMapper.writeValueAsString(result)), - producerCallback); - } catch (Exception e) { + String resultJson = jsonMapper.writeValueAsString(result); + if(event instanceof ServiceMetricEvent) { + producer.send(new ProducerRecord(config.getMetricTopic(), resultJson), producerCallback); + } else if(event instanceof AlertEvent) { + producer.send(new ProducerRecord(config.getAlertTopic(), resultJson), producerCallback); + } + } catch (JsonProcessingException e) { log.warn(e, "Failed to generate json"); } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index d60bb8083281..cf53b78f96c4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -28,8 +28,10 @@ public class KafkaEmitterConfig { @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) final private String bootstrapServers; - @JsonProperty - final private String topic; + @JsonProperty("metric.topic") + final private String metricTopic; + @JsonProperty("alert.topic") + final private String alertTopic; @JsonProperty final private String clusterName; @@ -45,7 +47,8 @@ public boolean equals(Object o) { KafkaEmitterConfig that = (KafkaEmitterConfig) o; if(!getBootstrapServers().equals(that.getBootstrapServers()) - || !getTopic().equals(that.getTopic()) + || !getMetricTopic().equals(that.getMetricTopic()) + || !getAlertTopic().equals(that.getAlertTopic()) || !getClusterName().equals(that.getClusterName())) { return false; } else { @@ -54,9 +57,12 @@ public boolean equals(Object o) { } @JsonCreator public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("topic") String topic, @JsonProperty("clusterName") String clusterName) { + @JsonProperty("metric.topic") String metricTopic, + @JsonProperty("alert.topic") String alertTopic, + @JsonProperty("clusterName") String clusterName) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); - this.topic = Preconditions.checkNotNull(topic, "topic can not be null"); + this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); + this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); this.clusterName = (clusterName == null || clusterName.isEmpty()) ? "NONAME" : clusterName; } @@ -64,7 +70,10 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) public String getBootstrapServers() { return bootstrapServers; } @JsonProperty - public String getTopic() { return topic; } + public String getMetricTopic() { return metricTopic; } + + @JsonProperty + public String getAlertTopic() { return alertTopic; } @JsonProperty public String getClusterName() { return clusterName; } diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java index 3b36d1204db4..f16725d1ed01 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -38,7 +38,8 @@ public void setUp() { @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "test", "test"); + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", "clusterNameTest"); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); From 4bf0a69f1d0ed87fa7b84d60aef8e51dc128f473 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Tue, 7 Feb 2017 11:00:51 +0900 Subject: [PATCH 12/28] Override toString() --- .../io/druid/emitter/kafka/KafkaEmitterConfig.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index cf53b78f96c4..2e8566832985 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -77,4 +77,15 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) @JsonProperty public String getClusterName() { return clusterName; } + + @Override + public String toString() + { + return "KafkaEmitterConfig{" + + "bootstrap.servers='" + bootstrapServers + + ", metric.topic=" + metricTopic + + ", alert.topic=" + alertTopic + + ", clusterName=" + clusterName + + '}'; + } } From e635bbff54f617325cfcaa5527d2e368b4b87c80 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Tue, 7 Feb 2017 14:06:26 +0900 Subject: [PATCH 13/28] make clusterName more optional --- .../main/java/io/druid/emitter/kafka/KafkaEmitter.java | 10 ++++++---- .../io/druid/emitter/kafka/KafkaEmitterConfig.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 91c0c9bf8bdb..1df3a9f35d2f 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -86,10 +86,12 @@ public void start() { @Override public void emit(final Event event) { if(event != null) { - Map result = ImmutableMap.builder() - .putAll(event.toMap()) - .put("clusterName", config.getClusterName()) - .build(); + ImmutableMap.Builder resultBuilder = ImmutableMap.builder().putAll(event.toMap()); + if (config.getClusterName() != null) { + resultBuilder.put("clusterName", config.getClusterName()); + } + Map result = resultBuilder.build(); + try { String resultJson = jsonMapper.writeValueAsString(result); if(event instanceof ServiceMetricEvent) { diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index 2e8566832985..769722a98c7e 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -63,7 +63,7 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); - this.clusterName = (clusterName == null || clusterName.isEmpty()) ? "NONAME" : clusterName; + this.clusterName = (clusterName == null || clusterName.isEmpty()) ? null : clusterName; } @JsonProperty From 01800f27ed21d3c77d4b75ed0e84bf91cad78294 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Tue, 7 Feb 2017 14:15:03 +0900 Subject: [PATCH 14/28] bump up druid version --- extensions-contrib/kafka-emitter/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 74b6409fde32..5101f381718d 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -25,7 +25,7 @@ io.druid druid - 0.9.3-SNAPSHOT + 0.10.0-SNAPSHOT ../../pom.xml From ea12d9288cfb14d3a0c7bda024f8c2794497c82a Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Wed, 8 Feb 2017 13:02:59 +0900 Subject: [PATCH 15/28] add producer.config option which make user can apply another optional config value of kafka producer --- .../extensions-contrib/kafka-emitter.md | 12 +++++++++++- .../java/io/druid/emitter/kafka/KafkaEmitter.java | 1 + .../io/druid/emitter/kafka/KafkaEmitterConfig.java | 14 ++++++++++++-- .../emitter/kafka/KafkaEmitterConfigTest.java | 5 ++++- 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/docs/content/development/extensions-contrib/kafka-emitter.md b/docs/content/development/extensions-contrib/kafka-emitter.md index cc003e3aa611..487a3e77b4f2 100644 --- a/docs/content/development/extensions-contrib/kafka-emitter.md +++ b/docs/content/development/extensions-contrib/kafka-emitter.md @@ -22,4 +22,14 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| |`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| |`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| -|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| +|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| +|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| + +### Example + +``` +druid.emitter.kafka.bootstrap.servers=hostname1:9092,hotname2:9092 +druid.emitter.kafka.metric.topic=druid-metric +druid.emitter.kafka.alert.topic=druid-alert +druid.emitter.kafka.producer.config={"max.block.ms":10000} +``` diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 1df3a9f35d2f..11d74cff1b89 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -73,6 +73,7 @@ private Producer getKafkaProducer(KafkaEmitterConfig config) { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIALIZER); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); + props.putAll(config.getKafkaProducerConfig()); return new KafkaProducer<>(props); } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index 769722a98c7e..57eacabc5656 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import org.apache.kafka.clients.producer.ProducerConfig; +import java.util.Map; + public class KafkaEmitterConfig { @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) @@ -34,6 +36,8 @@ public class KafkaEmitterConfig { final private String alertTopic; @JsonProperty final private String clusterName; + @JsonProperty("producer.config") + private Map kafkaProducerConfig; @Override public boolean equals(Object o) { @@ -59,11 +63,13 @@ public boolean equals(Object o) { public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, @JsonProperty("metric.topic") String metricTopic, @JsonProperty("alert.topic") String alertTopic, - @JsonProperty("clusterName") String clusterName) { + @JsonProperty("clusterName") String clusterName, + @JsonProperty("producer.config") Map kafkaProducerConfig) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); - this.clusterName = (clusterName == null || clusterName.isEmpty()) ? null : clusterName; + this.clusterName = clusterName; + this.kafkaProducerConfig = kafkaProducerConfig; } @JsonProperty @@ -78,6 +84,9 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) @JsonProperty public String getClusterName() { return clusterName; } + @JsonProperty + public Map getKafkaProducerConfig() { return kafkaProducerConfig; } + @Override public String toString() { @@ -86,6 +95,7 @@ public String toString() ", metric.topic=" + metricTopic + ", alert.topic=" + alertTopic + ", clusterName=" + clusterName + + ", producer.config=" + kafkaProducerConfig.toString() + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java index f16725d1ed01..6212247b4cf8 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import io.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Before; @@ -39,7 +40,9 @@ public void setUp() { @Test public void testSerDeserKafkaEmitterConfig() throws IOException { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", "clusterNameTest"); + "alertTest", "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build()); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); From 8d1e80c0ddc73586754cfd0dcd9aa818380d5525 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sun, 12 Feb 2017 17:20:22 +0900 Subject: [PATCH 16/28] remove potential blocking in emit() --- .../io/druid/emitter/kafka/KafkaEmitter.java | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 11d74cff1b89..14be8a79a1c3 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -40,6 +40,11 @@ import java.io.IOException; import java.util.Map; import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class KafkaEmitter implements Emitter { private static Logger log = new Logger(KafkaEmitter.class); @@ -50,21 +55,16 @@ public class KafkaEmitter implements Emitter { private final KafkaEmitterConfig config; private final Producer producer; - private final Callback producerCallback; private final ObjectMapper jsonMapper; + private final Queue> metricQueue; + private final ScheduledExecutorService scheduler; public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; this.producer = getKafkaProducer(config); - this.producerCallback = new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if(e != null) { - log.warn(e, "Exception occured!"); - } - } - }; + this.metricQueue = new ConcurrentLinkedQueue<>(); + this.scheduler = Executors.newScheduledThreadPool(1); } private Producer getKafkaProducer(KafkaEmitterConfig config) { @@ -81,9 +81,30 @@ private Producer getKafkaProducer(KafkaEmitterConfig config) { @Override @LifecycleStart public void start() { + scheduler.scheduleWithFixedDelay(new Runnable() { + public void run() { + sendToKafka(); + } + }, 10, 10, TimeUnit.SECONDS); log.info("Starting Kafka Emitter."); } + private void sendToKafka() { + ProducerRecord recordToSend; + while((recordToSend = metricQueue.poll()) != null) { + final ProducerRecord finalRecordToSend = recordToSend; + producer.send(recordToSend, new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if(e != null) { + log.warn(e, "Exception occured!"); + metricQueue.add(finalRecordToSend); + } + } + }); + } + } + @Override public void emit(final Event event) { if(event != null) { @@ -96,9 +117,9 @@ public void emit(final Event event) { try { String resultJson = jsonMapper.writeValueAsString(result); if(event instanceof ServiceMetricEvent) { - producer.send(new ProducerRecord(config.getMetricTopic(), resultJson), producerCallback); + metricQueue.add(new ProducerRecord(config.getMetricTopic(), resultJson)); } else if(event instanceof AlertEvent) { - producer.send(new ProducerRecord(config.getAlertTopic(), resultJson), producerCallback); + metricQueue.add(new ProducerRecord(config.getAlertTopic(), resultJson)); } } catch (JsonProcessingException e) { log.warn(e, "Failed to generate json"); @@ -114,6 +135,7 @@ public void flush() throws IOException { @Override @LifecycleStop public void close() throws IOException { + scheduler.shutdownNow(); producer.close(); } } From 2d9178ce0ac90ed02ff0379cb535e4d57f0c0fce Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sun, 19 Feb 2017 22:24:18 +0900 Subject: [PATCH 17/28] using MemoryBoundLinkedBlockingQueue --- .../io/druid/emitter/kafka/KafkaEmitter.java | 88 ++++++++++++++----- .../emitter/kafka/KafkaEmitterConfig.java | 4 +- .../kafka/MemoryBoundLinkedBlockingQueue.java | 87 ++++++++++++++++++ 3 files changed, 155 insertions(+), 24 deletions(-) create mode 100644 extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 14be8a79a1c3..9c57127ff266 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.Event; - import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; @@ -40,11 +40,10 @@ import java.io.IOException; import java.util.Map; import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class KafkaEmitter implements Emitter { private static Logger log = new Logger(KafkaEmitter.class); @@ -52,19 +51,28 @@ public class KafkaEmitter implements Emitter { private final static String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private final static String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private final static int DEFAULT_RETRIES = 3; + private long queueMemoryBound = 33554432L; // same with default value of kafka producer's buffer.memory + private final AtomicLong metricLost; + private final AtomicLong alertLost; + private final AtomicLong invalidLost; private final KafkaEmitterConfig config; private final Producer producer; private final ObjectMapper jsonMapper; - private final Queue> metricQueue; + private final MemoryBoundLinkedBlockingQueue metricQueue; + private final MemoryBoundLinkedBlockingQueue alertQueue; private final ScheduledExecutorService scheduler; public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; this.producer = getKafkaProducer(config); - this.metricQueue = new ConcurrentLinkedQueue<>(); - this.scheduler = Executors.newScheduledThreadPool(1); + this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.scheduler = Executors.newScheduledThreadPool(2); + this.metricLost = new AtomicLong(0L); + this.alertLost = new AtomicLong(0L); + this.invalidLost = new AtomicLong(0L); } private Producer getKafkaProducer(KafkaEmitterConfig config) { @@ -74,6 +82,8 @@ private Producer getKafkaProducer(KafkaEmitterConfig config) { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIALIZER); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); + queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? + Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); return new KafkaProducer<>(props); } @@ -83,25 +93,52 @@ private Producer getKafkaProducer(KafkaEmitterConfig config) { public void start() { scheduler.scheduleWithFixedDelay(new Runnable() { public void run() { - sendToKafka(); + sendMetricToKafka(); + } + }, 10, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(new Runnable() { + public void run() { + sendAlertToKafka(); } }, 10, 10, TimeUnit.SECONDS); log.info("Starting Kafka Emitter."); } - private void sendToKafka() { - ProducerRecord recordToSend; - while((recordToSend = metricQueue.poll()) != null) { - final ProducerRecord finalRecordToSend = recordToSend; - producer.send(recordToSend, new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if(e != null) { - log.warn(e, "Exception occured!"); - metricQueue.add(finalRecordToSend); + private void sendMetricToKafka() { + sendToKafka(config.getMetricTopic(), metricQueue); + } + + private void sendAlertToKafka() { + sendToKafka(config.getAlertTopic(), alertQueue); + } + + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue) { + ObjectContainer objectToSend; + try { + while((objectToSend = recordQueue.take()) != null) { + final ObjectContainer finalObjectToSend = objectToSend; + producer.send(new ProducerRecord(topic, finalObjectToSend.getData()), new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if(e != null) { + log.warn(e, "Exception occured!"); + if(topic.equals(config.getMetricTopic())) { + if(!metricQueue.offer(finalObjectToSend)) { + log.warn("metricQueue is now memory bounded! metricLost: " + metricLost.incrementAndGet()); + } + } else if(topic.equals(config.getAlertTopic())) { + if(!alertQueue.offer(finalObjectToSend)) { + log.warn("alertQueue is now memory bounded! alertLost: " + alertLost.incrementAndGet()); + } + } else { + log.warn("Invalid topic name! invalidLost: " + invalidLost.incrementAndGet()); + } + } } - } - }); + }); + } + } catch (InterruptedException e) { + log.warn(e, "Failed to take record from queue!"); } } @@ -116,13 +153,20 @@ public void emit(final Event event) { try { String resultJson = jsonMapper.writeValueAsString(result); + ObjectContainer objectContainer = new ObjectContainer<>(resultJson, resultJson.getBytes().length); if(event instanceof ServiceMetricEvent) { - metricQueue.add(new ProducerRecord(config.getMetricTopic(), resultJson)); + if(!metricQueue.offer(objectContainer)) { + log.warn("metricQueue is now memory bounded! metricLost: " + metricLost.incrementAndGet()); + } } else if(event instanceof AlertEvent) { - metricQueue.add(new ProducerRecord(config.getAlertTopic(), resultJson)); + if(!alertQueue.offer(objectContainer)) { + log.warn("alertQueue is now memory bounded! alertLost: " + alertLost.incrementAndGet()); + } + } else { + log.warn("Unsupported event type! invalidLost: " + invalidLost.incrementAndGet()); } } catch (JsonProcessingException e) { - log.warn(e, "Failed to generate json"); + log.warn(e, "Failed to generate json! invalidLost: " + invalidLost.incrementAndGet()); } } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index 57eacabc5656..7e0fc3a417a8 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -95,7 +95,7 @@ public String toString() ", metric.topic=" + metricTopic + ", alert.topic=" + alertTopic + ", clusterName=" + clusterName + - ", producer.config=" + kafkaProducerConfig.toString() + - '}'; + ", producer.config={" + kafkaProducerConfig.toString() + + "}}"; } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java new file mode 100644 index 000000000000..c16cbb51c452 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java @@ -0,0 +1,87 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue + * rather than number of items. + */ +public class MemoryBoundLinkedBlockingQueue { + private final long memoryBound; + private AtomicLong currentMemory; + private LinkedBlockingQueue> queue; + + public MemoryBoundLinkedBlockingQueue(long memoryBound) { + this.memoryBound = memoryBound; + this.currentMemory = new AtomicLong(0L); + this.queue = new LinkedBlockingQueue<>(); + } + + // returns true/false depending on whether item was added or not + public boolean offer(ObjectContainer item) { + final long itemLength = item.getSize(); + + if (currentMemory.addAndGet(itemLength) <= memoryBound) { + if (queue.offer(item)) { + return true; + } + } + currentMemory.addAndGet(-itemLength); + return false; + } + + // blocks until at least one item is available to take + public ObjectContainer take() throws InterruptedException { + final ObjectContainer ret = queue.take(); + currentMemory.addAndGet(-ret.getSize()); + return ret; + } + + public long getAvailableBuffer() { + return memoryBound - currentMemory.get(); + } + + public int size() { + return queue.size(); + } + + public static class ObjectContainer { + private T data; + private long size; + + ObjectContainer(T data, long size) { + this.data = data; + this.size = size; + } + + public T getData() + { + return data; + } + + public long getSize() + { + return size; + } + } +} From e4fb6ebdfa97c87c2a58c4be10dd95f46f24907b Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Sun, 19 Feb 2017 22:32:55 +0900 Subject: [PATCH 18/28] Fixing coding convention --- .../emitter/kafka/KafkaEmitterConfig.java | 23 +++++++++++++------ .../kafka/MemoryBoundLinkedBlockingQueue.java | 6 ++--- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index 7e0fc3a417a8..32e3a01bcfe5 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -73,23 +73,32 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) } @JsonProperty - public String getBootstrapServers() { return bootstrapServers; } + public String getBootstrapServers() { + return bootstrapServers; + } @JsonProperty - public String getMetricTopic() { return metricTopic; } + public String getMetricTopic() { + return metricTopic; + } @JsonProperty - public String getAlertTopic() { return alertTopic; } + public String getAlertTopic() { + return alertTopic; + } @JsonProperty - public String getClusterName() { return clusterName; } + public String getClusterName() { + return clusterName; + } @JsonProperty - public Map getKafkaProducerConfig() { return kafkaProducerConfig; } + public Map getKafkaProducerConfig() { + return kafkaProducerConfig; + } @Override - public String toString() - { + public String toString() { return "KafkaEmitterConfig{" + "bootstrap.servers='" + bootstrapServers + ", metric.topic=" + metricTopic + diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java index c16cbb51c452..e4ed7717f111 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java @@ -74,13 +74,11 @@ public static class ObjectContainer { this.size = size; } - public T getData() - { + public T getData() { return data; } - public long getSize() - { + public long getSize() { return size; } } From 4d316ae483d3ddae728ed1a24fbf9d7d74d11875 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Feb 2017 13:46:33 +0900 Subject: [PATCH 19/28] Remove logging every exception and just increment counting --- .../io/druid/emitter/kafka/KafkaEmitter.java | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 9c57127ff266..059c09ad787f 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -58,18 +58,33 @@ public class KafkaEmitter implements Emitter { private final KafkaEmitterConfig config; private final Producer producer; + private final Callback producerCallback; private final ObjectMapper jsonMapper; private final MemoryBoundLinkedBlockingQueue metricQueue; private final MemoryBoundLinkedBlockingQueue alertQueue; private final ScheduledExecutorService scheduler; - public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { + public KafkaEmitter(final KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; this.producer = getKafkaProducer(config); + this.producerCallback = new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if(e != null) { + if(recordMetadata.topic().equals(config.getMetricTopic())) { + metricLost.incrementAndGet(); + } else if (recordMetadata.topic().equals(config.getAlertTopic())) { + alertLost.incrementAndGet(); + } else { + invalidLost.incrementAndGet(); + } + } + } + }; this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(2); + this.scheduler = Executors.newScheduledThreadPool(3); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); this.invalidLost = new AtomicLong(0L); @@ -101,6 +116,13 @@ public void run() { sendAlertToKafka(); } }, 10, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + log.info("Message lost counter: metricLost=[%d] / alertLost=[%d] / invalidLost=[%d]", + metricLost.get(), alertLost.get(), invalidLost.get()); + } + }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); } @@ -116,26 +138,7 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue objectToSend; try { while((objectToSend = recordQueue.take()) != null) { - final ObjectContainer finalObjectToSend = objectToSend; - producer.send(new ProducerRecord(topic, finalObjectToSend.getData()), new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if(e != null) { - log.warn(e, "Exception occured!"); - if(topic.equals(config.getMetricTopic())) { - if(!metricQueue.offer(finalObjectToSend)) { - log.warn("metricQueue is now memory bounded! metricLost: " + metricLost.incrementAndGet()); - } - } else if(topic.equals(config.getAlertTopic())) { - if(!alertQueue.offer(finalObjectToSend)) { - log.warn("alertQueue is now memory bounded! alertLost: " + alertLost.incrementAndGet()); - } - } else { - log.warn("Invalid topic name! invalidLost: " + invalidLost.incrementAndGet()); - } - } - } - }); + producer.send(new ProducerRecord(topic, objectToSend.getData()), producerCallback); } } catch (InterruptedException e) { log.warn(e, "Failed to take record from queue!"); @@ -156,17 +159,17 @@ public void emit(final Event event) { ObjectContainer objectContainer = new ObjectContainer<>(resultJson, resultJson.getBytes().length); if(event instanceof ServiceMetricEvent) { if(!metricQueue.offer(objectContainer)) { - log.warn("metricQueue is now memory bounded! metricLost: " + metricLost.incrementAndGet()); + metricLost.incrementAndGet(); } } else if(event instanceof AlertEvent) { if(!alertQueue.offer(objectContainer)) { - log.warn("alertQueue is now memory bounded! alertLost: " + alertLost.incrementAndGet()); + alertLost.incrementAndGet(); } } else { - log.warn("Unsupported event type! invalidLost: " + invalidLost.incrementAndGet()); + invalidLost.incrementAndGet(); } } catch (JsonProcessingException e) { - log.warn(e, "Failed to generate json! invalidLost: " + invalidLost.incrementAndGet()); + invalidLost.incrementAndGet(); } } } From c7e8fce45c47d239e288f0a9a9aaf1dbf92aa732 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Feb 2017 13:55:20 +0900 Subject: [PATCH 20/28] refactoring --- .../io/druid/emitter/kafka/KafkaEmitter.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 059c09ad787f..a27fadfa46cd 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -64,11 +64,21 @@ public class KafkaEmitter implements Emitter { private final MemoryBoundLinkedBlockingQueue alertQueue; private final ScheduledExecutorService scheduler; - public KafkaEmitter(final KafkaEmitterConfig config, ObjectMapper jsonMapper) { + public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; this.jsonMapper = jsonMapper; - this.producer = getKafkaProducer(config); - this.producerCallback = new Callback() { + this.producer = setKafkaProducer(); + this.producerCallback = setProducerCallback(); + this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.scheduler = Executors.newScheduledThreadPool(3); + this.metricLost = new AtomicLong(0L); + this.alertLost = new AtomicLong(0L); + this.invalidLost = new AtomicLong(0L); + } + + private Callback setProducerCallback() { + return new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null) { @@ -82,15 +92,9 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { } } }; - this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(3); - this.metricLost = new AtomicLong(0L); - this.alertLost = new AtomicLong(0L); - this.invalidLost = new AtomicLong(0L); } - private Producer getKafkaProducer(KafkaEmitterConfig config) { + private Producer setKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); From e1706f73dee9cf0b2e54956358390ad0c39b3e60 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Feb 2017 15:10:09 +0900 Subject: [PATCH 21/28] trivial modification --- .../src/main/java/io/druid/emitter/kafka/KafkaEmitter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index a27fadfa46cd..26416a88d7ba 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -123,7 +123,7 @@ public void run() { scheduler.scheduleWithFixedDelay(new Runnable() { @Override public void run() { - log.info("Message lost counter: metricLost=[%d] / alertLost=[%d] / invalidLost=[%d]", + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", metricLost.get(), alertLost.get(), invalidLost.get()); } }, 5, 5, TimeUnit.MINUTES); From 6b383ea5c5a8bb73badf7f7a5aa559cf7cffe074 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Wed, 1 Mar 2017 18:05:21 +0900 Subject: [PATCH 22/28] logging when callback has exception --- .../src/main/java/io/druid/emitter/kafka/KafkaEmitter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 26416a88d7ba..1255c27e6d92 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -82,6 +82,7 @@ private Callback setProducerCallback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null) { + log.debug("Event send failed [%s]", e.getMessage()); if(recordMetadata.topic().equals(config.getMetricTopic())) { metricLost.incrementAndGet(); } else if (recordMetadata.topic().equals(config.getAlertTopic())) { From 14081240c774a010da17d9854347db751eb593bf Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Wed, 1 Mar 2017 18:06:27 +0900 Subject: [PATCH 23/28] Replace kafka-clients 0.10.1.1 with 0.10.2.0 --- extensions-contrib/kafka-emitter/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 5101f381718d..ed74ca02f56a 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -38,7 +38,7 @@ org.apache.kafka kafka-clients - 0.10.1.1 + 0.10.2.0 io.druid From d2058d5980ce2c8f8f499b5fb7f321793400e48e Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Thu, 16 Mar 2017 17:35:54 +0900 Subject: [PATCH 24/28] Resolve the problem related of classloader --- .../main/java/io/druid/emitter/kafka/KafkaEmitter.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 1255c27e6d92..88a591f31147 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -36,6 +36,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; import java.io.IOException; import java.util.Map; @@ -48,8 +49,6 @@ public class KafkaEmitter implements Emitter { private static Logger log = new Logger(KafkaEmitter.class); - private final static String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private final static String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private final static int DEFAULT_RETRIES = 3; private long queueMemoryBound = 33554432L; // same with default value of kafka producer's buffer.memory private final AtomicLong metricLost; @@ -96,10 +95,12 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { } private Producer setKafkaProducer() { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIALIZER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? From ecb5c6906a36ce79fe40a66fb68b1be573cce532 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Fri, 17 Mar 2017 13:35:04 +0900 Subject: [PATCH 25/28] adopt try statement --- .../io/druid/emitter/kafka/KafkaEmitter.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 88a591f31147..f5e63a77d01a 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -95,18 +95,23 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { } private Producer setKafkaProducer() { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); - props.putAll(config.getKafkaProducerConfig()); - queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? - Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); - - return new KafkaProducer<>(props); + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); + props.putAll(config.getKafkaProducerConfig()); + queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? + Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); + + return new KafkaProducer<>(props); + } finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } } @Override From 3a4ca9fef20296d0e61c0917b4900fed5a430ce1 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Mar 2017 11:33:19 +0900 Subject: [PATCH 26/28] code reformatting --- .../extensions-contrib/kafka-emitter.md | 8 +- .../io/druid/emitter/kafka/KafkaEmitter.java | 94 ++++++++++------ .../emitter/kafka/KafkaEmitterConfig.java | 106 +++++++++++------- .../emitter/kafka/KafkaEmitterModule.java | 12 +- .../kafka/MemoryBoundLinkedBlockingQueue.java | 30 +++-- .../emitter/kafka/KafkaEmitterConfigTest.java | 14 ++- 6 files changed, 169 insertions(+), 95 deletions(-) diff --git a/docs/content/development/extensions-contrib/kafka-emitter.md b/docs/content/development/extensions-contrib/kafka-emitter.md index 487a3e77b4f2..fb1371db5c75 100644 --- a/docs/content/development/extensions-contrib/kafka-emitter.md +++ b/docs/content/development/extensions-contrib/kafka-emitter.md @@ -8,10 +8,10 @@ To use this extension, make sure to [include](../../operations/including-extensi ## Introduction -This extension emits druid metrics to a Kafka(https://kafka.apache.org) directly with JSON format.
+This extension emits Druid metrics to a [Kafka](https://kafka.apache.org) directly with JSON format.
Currently, Kafka has not only their nice ecosystem but also consumer API readily available. -So, If you currently use kafka, It's easy to integrate various tool or UI -to monitor the status of your druid cluster with this extension. +So, If you currently use Kafka, It's easy to integrate various tool or UI +to monitor the status of your Druid cluster with this extension. ## Configuration @@ -28,7 +28,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. ### Example ``` -druid.emitter.kafka.bootstrap.servers=hostname1:9092,hotname2:9092 +druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092 druid.emitter.kafka.metric.topic=druid-metric druid.emitter.kafka.alert.topic=druid-alert druid.emitter.kafka.producer.config={"max.block.ms":10000} diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index f5e63a77d01a..9d67dc9c78ce 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -46,11 +46,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -public class KafkaEmitter implements Emitter { +public class KafkaEmitter implements Emitter +{ private static Logger log = new Logger(KafkaEmitter.class); private final static int DEFAULT_RETRIES = 3; - private long queueMemoryBound = 33554432L; // same with default value of kafka producer's buffer.memory + private final static long DEFAULT_QUEUE_MEMORY_BOUND = 33554432L; // same with default value of kafka producer's buffer.memory + private final long queueMemoryBound; private final AtomicLong metricLost; private final AtomicLong alertLost; private final AtomicLong invalidLost; @@ -63,11 +65,14 @@ public class KafkaEmitter implements Emitter { private final MemoryBoundLinkedBlockingQueue alertQueue; private final ScheduledExecutorService scheduler; - public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { + public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) + { this.config = config; + this.config.getKafkaProducerConfig().containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? this.jsonMapper = jsonMapper; this.producer = setKafkaProducer(); this.producerCallback = setProducerCallback(); + this.queueMemoryBound = config.getKafkaProducerConfig().get this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.scheduler = Executors.newScheduledThreadPool(3); @@ -76,13 +81,16 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.invalidLost = new AtomicLong(0L); } - private Callback setProducerCallback() { - return new Callback() { + private Callback setProducerCallback() + { + return new Callback() + { @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if(e != null) { + public void onCompletion(RecordMetadata recordMetadata, Exception e) + { + if (e != null) { log.debug("Event send failed [%s]", e.getMessage()); - if(recordMetadata.topic().equals(config.getMetricTopic())) { + if (recordMetadata.topic().equals(config.getMetricTopic())) { metricLost.incrementAndGet(); } else if (recordMetadata.topic().equals(config.getAlertTopic())) { alertLost.incrementAndGet(); @@ -94,7 +102,8 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { }; } - private Producer setKafkaProducer() { + private Producer setKafkaProducer() + { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); @@ -105,60 +114,74 @@ private Producer setKafkaProducer() { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); - queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? - Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); +// queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? +// Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); return new KafkaProducer<>(props); - } finally { + } + finally { Thread.currentThread().setContextClassLoader(currCtxCl); } } @Override @LifecycleStart - public void start() { - scheduler.scheduleWithFixedDelay(new Runnable() { - public void run() { + public void start() + { + scheduler.scheduleWithFixedDelay(new Runnable() + { + public void run() + { sendMetricToKafka(); } }, 10, 10, TimeUnit.SECONDS); - scheduler.scheduleWithFixedDelay(new Runnable() { - public void run() { + scheduler.scheduleWithFixedDelay(new Runnable() + { + public void run() + { sendAlertToKafka(); } }, 10, 10, TimeUnit.SECONDS); - scheduler.scheduleWithFixedDelay(new Runnable() { + scheduler.scheduleWithFixedDelay(new Runnable() + { @Override - public void run() { + public void run() + { log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", - metricLost.get(), alertLost.get(), invalidLost.get()); + metricLost.get(), alertLost.get(), invalidLost.get() + ); } }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); } - private void sendMetricToKafka() { + private void sendMetricToKafka() + { sendToKafka(config.getMetricTopic(), metricQueue); } - private void sendAlertToKafka() { + private void sendAlertToKafka() + { sendToKafka(config.getAlertTopic(), alertQueue); } - private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue) { + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue) + { ObjectContainer objectToSend; try { - while((objectToSend = recordQueue.take()) != null) { + while ((objectToSend = recordQueue.take()) != null) { producer.send(new ProducerRecord(topic, objectToSend.getData()), producerCallback); } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { log.warn(e, "Failed to take record from queue!"); } } @Override - public void emit(final Event event) { - if(event != null) { + public void emit(final Event event) + { + if (event != null) { ImmutableMap.Builder resultBuilder = ImmutableMap.builder().putAll(event.toMap()); if (config.getClusterName() != null) { resultBuilder.put("clusterName", config.getClusterName()); @@ -168,31 +191,34 @@ public void emit(final Event event) { try { String resultJson = jsonMapper.writeValueAsString(result); ObjectContainer objectContainer = new ObjectContainer<>(resultJson, resultJson.getBytes().length); - if(event instanceof ServiceMetricEvent) { - if(!metricQueue.offer(objectContainer)) { + if (event instanceof ServiceMetricEvent) { + if (!metricQueue.offer(objectContainer)) { metricLost.incrementAndGet(); } - } else if(event instanceof AlertEvent) { - if(!alertQueue.offer(objectContainer)) { + } else if (event instanceof AlertEvent) { + if (!alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } } else { invalidLost.incrementAndGet(); } - } catch (JsonProcessingException e) { + } + catch (JsonProcessingException e) { invalidLost.incrementAndGet(); } } } @Override - public void flush() throws IOException { + public void flush() throws IOException + { producer.flush(); } @Override @LifecycleStop - public void close() throws IOException { + public void close() throws IOException + { scheduler.shutdownNow(); producer.close(); } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index 32e3a01bcfe5..a6c3882190f5 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -26,7 +26,8 @@ import java.util.Map; -public class KafkaEmitterConfig { +public class KafkaEmitterConfig +{ @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) final private String bootstrapServers; @@ -39,32 +40,15 @@ public class KafkaEmitterConfig { @JsonProperty("producer.config") private Map kafkaProducerConfig; - @Override - public boolean equals(Object o) { - if(this == o) { - return true; - } - if(!(o instanceof KafkaEmitterConfig)) { - return false; - } - - KafkaEmitterConfig that = (KafkaEmitterConfig) o; - - if(!getBootstrapServers().equals(that.getBootstrapServers()) - || !getMetricTopic().equals(that.getMetricTopic()) - || !getAlertTopic().equals(that.getAlertTopic()) - || !getClusterName().equals(that.getClusterName())) { - return false; - } else { - return true; - } - } @JsonCreator - public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("metric.topic") String metricTopic, - @JsonProperty("alert.topic") String alertTopic, - @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") Map kafkaProducerConfig) { + public KafkaEmitterConfig( + @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, + @JsonProperty("metric.topic") String metricTopic, + @JsonProperty("alert.topic") String alertTopic, + @JsonProperty("clusterName") String clusterName, + @JsonProperty("producer.config") Map kafkaProducerConfig + ) + { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); @@ -73,38 +57,84 @@ public KafkaEmitterConfig(@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) } @JsonProperty - public String getBootstrapServers() { + public String getBootstrapServers() + { return bootstrapServers; } @JsonProperty - public String getMetricTopic() { + public String getMetricTopic() + { return metricTopic; } @JsonProperty - public String getAlertTopic() { + public String getAlertTopic() + { return alertTopic; } @JsonProperty - public String getClusterName() { + public String getClusterName() + { return clusterName; } @JsonProperty - public Map getKafkaProducerConfig() { + public Map getKafkaProducerConfig() + { return kafkaProducerConfig; } @Override - public String toString() { + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + KafkaEmitterConfig that = (KafkaEmitterConfig) o; + + if (!getBootstrapServers().equals(that.getBootstrapServers())) { + return false; + } + if (!getMetricTopic().equals(that.getMetricTopic())) { + return false; + } + if (!getAlertTopic().equals(that.getAlertTopic())) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { + return false; + } + return getKafkaProducerConfig() != null + ? getKafkaProducerConfig().equals(that.getKafkaProducerConfig()) + : that.getKafkaProducerConfig() == null; + } + + @Override + public int hashCode() + { + int result = getBootstrapServers().hashCode(); + result = 31 * result + getMetricTopic().hashCode(); + result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); + result = 31 * result + (getKafkaProducerConfig() != null ? getKafkaProducerConfig().hashCode() : 0); + return result; + } + + @Override + public String toString() + { return "KafkaEmitterConfig{" + - "bootstrap.servers='" + bootstrapServers + - ", metric.topic=" + metricTopic + - ", alert.topic=" + alertTopic + - ", clusterName=" + clusterName + - ", producer.config={" + kafkaProducerConfig.toString() + - "}}"; + "bootstrapServers='" + bootstrapServers + '\'' + + ", metricTopic='" + metricTopic + '\'' + + ", alertTopic='" + alertTopic + '\'' + + ", clusterName='" + clusterName + '\'' + + ", kafkaProducerConfig=" + kafkaProducerConfig + + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java index 42508b01be8f..0608b1f0d898 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java @@ -32,23 +32,27 @@ import java.util.Collections; import java.util.List; -public class KafkaEmitterModule implements DruidModule { +public class KafkaEmitterModule implements DruidModule +{ private static final String EMITTER_TYPE = "kafka"; @Override - public List getJacksonModules() { + public List getJacksonModules() + { return Collections.EMPTY_LIST; } @Override - public void configure(Binder binder) { + public void configure(Binder binder) + { JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, KafkaEmitterConfig.class); } @Provides @ManageLifecycle @Named(EMITTER_TYPE) - public Emitter getEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper mapper) { + public Emitter getEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper mapper) + { return new KafkaEmitter(kafkaEmitterConfig, mapper); } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java index e4ed7717f111..d34469574dae 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java @@ -26,19 +26,22 @@ * Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue * rather than number of items. */ -public class MemoryBoundLinkedBlockingQueue { +public class MemoryBoundLinkedBlockingQueue +{ private final long memoryBound; private AtomicLong currentMemory; private LinkedBlockingQueue> queue; - public MemoryBoundLinkedBlockingQueue(long memoryBound) { + public MemoryBoundLinkedBlockingQueue(long memoryBound) + { this.memoryBound = memoryBound; this.currentMemory = new AtomicLong(0L); this.queue = new LinkedBlockingQueue<>(); } // returns true/false depending on whether item was added or not - public boolean offer(ObjectContainer item) { + public boolean offer(ObjectContainer item) + { final long itemLength = item.getSize(); if (currentMemory.addAndGet(itemLength) <= memoryBound) { @@ -51,34 +54,41 @@ public boolean offer(ObjectContainer item) { } // blocks until at least one item is available to take - public ObjectContainer take() throws InterruptedException { + public ObjectContainer take() throws InterruptedException + { final ObjectContainer ret = queue.take(); currentMemory.addAndGet(-ret.getSize()); return ret; } - public long getAvailableBuffer() { + public long getAvailableBuffer() + { return memoryBound - currentMemory.get(); } - public int size() { + public int size() + { return queue.size(); } - public static class ObjectContainer { + public static class ObjectContainer + { private T data; private long size; - ObjectContainer(T data, long size) { + ObjectContainer(T data, long size) + { this.data = data; this.size = size; } - public T getData() { + public T getData() + { return data; } - public long getSize() { + public long getSize() + { return size; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java index 6212247b4cf8..37a8c1133486 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -29,20 +29,24 @@ import java.io.IOException; -public class KafkaEmitterConfigTest { +public class KafkaEmitterConfigTest +{ private ObjectMapper mapper = new DefaultObjectMapper(); @Before - public void setUp() { + public void setUp() + { mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); } @Test - public void testSerDeserKafkaEmitterConfig() throws IOException { + public void testSerDeserKafkaEmitterConfig() throws IOException + { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "clusterNameTest", - ImmutableMap.builder() - .put("testKey", "testValue").build()); + ImmutableMap.builder() + .put("testKey", "testValue").build() + ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); From 8be4c4cd29d113f161ab40fdfe73aa79756fb977 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Mar 2017 12:59:04 +0900 Subject: [PATCH 27/28] make variables final --- .../io/druid/emitter/kafka/KafkaEmitter.java | 66 ++++++------------- .../kafka/MemoryBoundLinkedBlockingQueue.java | 4 +- 2 files changed, 22 insertions(+), 48 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java index 9d67dc9c78ce..74391bd1ba65 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -35,7 +35,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.io.IOException; @@ -51,8 +50,6 @@ public class KafkaEmitter implements Emitter private static Logger log = new Logger(KafkaEmitter.class); private final static int DEFAULT_RETRIES = 3; - private final static long DEFAULT_QUEUE_MEMORY_BOUND = 33554432L; // same with default value of kafka producer's buffer.memory - private final long queueMemoryBound; private final AtomicLong metricLost; private final AtomicLong alertLost; private final AtomicLong invalidLost; @@ -68,11 +65,12 @@ public class KafkaEmitter implements Emitter public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; - this.config.getKafkaProducerConfig().containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? this.jsonMapper = jsonMapper; this.producer = setKafkaProducer(); this.producerCallback = setProducerCallback(); - this.queueMemoryBound = config.getKafkaProducerConfig().get + // same with kafka producer's buffer.memory + long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() + .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.scheduler = Executors.newScheduledThreadPool(3); @@ -83,20 +81,15 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) private Callback setProducerCallback() { - return new Callback() - { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) - { - if (e != null) { - log.debug("Event send failed [%s]", e.getMessage()); - if (recordMetadata.topic().equals(config.getMetricTopic())) { - metricLost.incrementAndGet(); - } else if (recordMetadata.topic().equals(config.getAlertTopic())) { - alertLost.incrementAndGet(); - } else { - invalidLost.incrementAndGet(); - } + return (recordMetadata, e) -> { + if (e != null) { + log.debug("Event send failed [%s]", e.getMessage()); + if (recordMetadata.topic().equals(config.getMetricTopic())) { + metricLost.incrementAndGet(); + } else if (recordMetadata.topic().equals(config.getAlertTopic())) { + alertLost.incrementAndGet(); + } else { + invalidLost.incrementAndGet(); } } }; @@ -114,8 +107,6 @@ private Producer setKafkaProducer() props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); -// queueMemoryBound = (props.containsKey(ProducerConfig.BUFFER_MEMORY_CONFIG) ? -// Long.parseLong(props.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG)) : queueMemoryBound); return new KafkaProducer<>(props); } @@ -128,29 +119,11 @@ private Producer setKafkaProducer() @LifecycleStart public void start() { - scheduler.scheduleWithFixedDelay(new Runnable() - { - public void run() - { - sendMetricToKafka(); - } - }, 10, 10, TimeUnit.SECONDS); - scheduler.scheduleWithFixedDelay(new Runnable() - { - public void run() - { - sendAlertToKafka(); - } - }, 10, 10, TimeUnit.SECONDS); - scheduler.scheduleWithFixedDelay(new Runnable() - { - @Override - public void run() - { - log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", - metricLost.get(), alertLost.get(), invalidLost.get() - ); - } + scheduler.scheduleWithFixedDelay(this::sendMetricToKafka, 10, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::sendAlertToKafka, 10, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(() -> { + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", + metricLost.get(), alertLost.get(), invalidLost.get()); }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); } @@ -169,8 +142,9 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue objectToSend; try { - while ((objectToSend = recordQueue.take()) != null) { - producer.send(new ProducerRecord(topic, objectToSend.getData()), producerCallback); + while (true) { + objectToSend = recordQueue.take(); + producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback); } } catch (InterruptedException e) { diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java index d34469574dae..b12d2a75d16e 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java @@ -29,8 +29,8 @@ public class MemoryBoundLinkedBlockingQueue { private final long memoryBound; - private AtomicLong currentMemory; - private LinkedBlockingQueue> queue; + private final AtomicLong currentMemory; + private final LinkedBlockingQueue> queue; public MemoryBoundLinkedBlockingQueue(long memoryBound) { From f02218add8fd0b5151bc83fefd4dbf74d4ffc9a3 Mon Sep 17 00:00:00 2001 From: Dongkyu Hwangbo Date: Mon, 27 Mar 2017 15:01:35 +0900 Subject: [PATCH 28/28] rewrite toString --- .../java/io/druid/emitter/kafka/KafkaEmitterConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index a6c3882190f5..b5ca3ce096de 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -130,11 +130,11 @@ public int hashCode() public String toString() { return "KafkaEmitterConfig{" + - "bootstrapServers='" + bootstrapServers + '\'' + - ", metricTopic='" + metricTopic + '\'' + - ", alertTopic='" + alertTopic + '\'' + + "bootstrap.servers='" + bootstrapServers + '\'' + + ", metric.topic='" + metricTopic + '\'' + + ", alert.topic='" + alertTopic + '\'' + ", clusterName='" + clusterName + '\'' + - ", kafkaProducerConfig=" + kafkaProducerConfig + + ", Producer.config=" + kafkaProducerConfig + '}'; } }