
Adding Kafka-emitter #3860

Merged
merged 28 commits into from
Apr 4, 2017

Conversation

dkhwangbo
Contributor

This PR adds a new extensions-contrib module.

Currently, Druid has various emitter modules, including HttpPostEmitter for general-purpose use. But many users run Apache Kafka, and I assume that if Druid had an extension emitting its metrics directly to Kafka, many users would find it convenient for feeding various monitoring dashboard UIs and other ecosystem tools.

Please feel free to comment on this PR.

@dkhwangbo
Contributor Author

This PR is still under construction, but I'd like to get people's feedback on its direction.

@fjy
Contributor

fjy commented Jan 20, 2017

Thanks @dkhwangbo. Will review soon

@dkhwangbo
Contributor Author

@fjy Hi. Any updates?

TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {};
HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
result.put("clusterName", config.getClusterName());
producer.send(new ProducerRecord<String, String>(config.getTopic(), jsonMapper.writeValueAsString(result)));
Contributor

At least two major issues here:

  • producer.send can block, and that's not OK (an emit call should never block)

  • alerts are not handled

  • retrying in case of exception is the norm

Contributor Author

@b-slim Hi! Thanks for your review.
Kafka's new producer API (since 0.8.2) only works asynchronously, so, as I understand it, no additional method or logic is needed. If my understanding is wrong, please correct me.
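For context, the new producer's send() does complete asynchronously via a Future, but it can still block briefly while fetching cluster metadata or when the record buffer is full, which is likely what the reviewer meant. A hedged sketch of bounding that blocking time (max.block.ms is a standard producer setting, also visible in the runtime config later in this thread; the class name and broker address are illustrative):

```java
import java.util.Properties;

public class ProducerProps {
    // Sketch: bound how long KafkaProducer.send() may block.
    // send() queues records asynchronously, but it can still block while
    // waiting for cluster metadata or for space in the record buffer.
    public static Properties boundedBlocking() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("max.block.ms", "10"); // fail fast instead of stalling emit()
        return props;
    }

    public static void main(String[] args) {
        System.out.println(boundedBlocking());
    }
}
```

With a bound like this, a slow or unreachable broker makes send() throw quickly rather than stall the emitting thread.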

@dkhwangbo
Contributor Author

@b-slim I've just addressed your other comment. Please feel free to review and leave further comments.

public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
log.warn(exception, "Exception is occured! Retry.");
emit(event);
Member

@pjain1 pjain1 Feb 3, 2017

This can keep on retrying infinitely if, let's say, the Kafka brokers are down.
In my opinion, retries when emitting events are not necessary, but if you want to retry you can set the retries property on KafkaProducer, like props.put("retries", <num retries>);, which lets KafkaProducer retry by itself and throw an exception if the send still fails after the retries.
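The suggested configuration can be sketched as follows (a hypothetical helper; retries and retry.backoff.ms are standard Kafka producer settings, the values here are illustrative):

```java
import java.util.Properties;

public class RetryConfig {
    // Sketch: delegate retries to KafkaProducer instead of re-calling emit()
    // from the send callback, which could loop forever if brokers are down.
    public static Properties withRetries(int numRetries) {
        Properties props = new Properties();
        props.put("retries", Integer.toString(numRetries));
        props.put("retry.backoff.ms", "1000"); // wait between attempts (illustrative)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withRetries(3));
    }
}
```

After the configured retries are exhausted, the producer surfaces the failure to the send callback exactly once, so the emitter never loops on its own.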

Contributor Author

@pjain1 Thanks for your review! Great point; I'll apply it.

@pjain1
Member

pjain1 commented Feb 3, 2017

Also, it would be great if you could write some documentation for this extension. For example, here's the documentation for the StatsD emitter - https://github.com/druid-io/druid/blob/master/docs/content/development/extensions-contrib/statsd.md
You can include the extension in the list of Community Extensions here - https://github.com/druid-io/druid/blob/master/docs/content/development/extensions.md

HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
result.put("clusterName", config.getClusterName());
producer.send(new ProducerRecord<String, String>(config.getTopic(), jsonMapper.writeValueAsString(result)),
new Callback() {
Member

Since the callback function is now the same for all events, there is no need to create a new callback object for each and every event. You can create a single class-level callback object and reuse it for all events.
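The reuse idea can be sketched without the Kafka classes; here java.util.function.BiConsumer stands in for Kafka's Callback interface (all names in this sketch are illustrative), and the point is that one instance is allocated once and shared across every send():

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class SharedCallbackSketch {
    public static final AtomicInteger failures = new AtomicInteger();

    // One class-level callback object, reused for every event, instead of
    // allocating a new anonymous Callback for each send() call.
    public static final BiConsumer<Object, Exception> PRODUCER_CALLBACK = (metadata, exception) -> {
        if (exception != null) {
            failures.incrementAndGet(); // stand-in for log.warn(exception, ...)
        }
    };

    public static void main(String[] args) {
        PRODUCER_CALLBACK.accept(null, null);                      // success path: nothing logged
        PRODUCER_CALLBACK.accept(null, new RuntimeException("x")); // failure path: counted
        System.out.println(failures.get());
    }
}
```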

Contributor Author

@pjain1 Thanks for your kindness! I've applied your comment.

@dkhwangbo
Contributor Author

@pjain1 Thanks for your review! I've addressed your comments and also written documentation. Please review it.

@Override
public void emit(final Event event) {
if(event instanceof ServiceMetricEvent) {
if(event == null) {
Member

@pjain1 pjain1 Feb 4, 2017

This condition does not make sense, because if event is null then the check at line 87, event instanceof ServiceMetricEvent, will always be false. So you can put a simple check if (event == null) { return; } and remove the instanceof check, unless you want to do different things with different types of events (ServiceMetricEvent and AlertEvent).
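The simplified guard can be sketched with placeholder event classes (these stand in for Druid's Event and ServiceMetricEvent; they are not the real types):

```java
public class EmitGuardSketch {
    // Stand-ins for Druid's event hierarchy
    public static class Event {}
    public static class ServiceMetricEvent extends Event {}

    public static int emitted = 0;

    // Null check first; "null instanceof X" is always false, so the
    // original inner null test could never fire.
    public static void emit(Event event) {
        if (event == null) {
            return;
        }
        if (event instanceof ServiceMetricEvent) {
            emitted++; // stand-in for queueing the record for Kafka
        }
    }

    public static void main(String[] args) {
        emit(null);
        emit(new ServiceMetricEvent());
        System.out.println(emitted);
    }
}
```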

Contributor Author

Oh! Thanks for correcting my mistake. I'll fix it.

@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
log.warn(e, "Exception is occured! Retry.");
Member

`Exception is occured! Retry.` -> `Exception occurred!`

Contributor Author

Thanks! I'll rewrite this message.

@pjain1
Copy link
Member

pjain1 commented Feb 4, 2017

@dkhwangbo Also please file the CLA if you have not done so far - http://druid.io/community/cla.html

try {
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {};
HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
result.put("clusterName", config.getClusterName());
Member

Since you just want to put the "clusterName" property in the event map, lines 93-95 can be replaced with:

Map<String, Object> result = event.toMap();
result.put("clusterName", config.getClusterName());

This avoids the costly serialization and deserialization at line 94.

Contributor Author

Got it. I'll change this line.

Contributor Author

Hi. I tried your suggestion, but Guava's UnsupportedOperationException occurred, so I used another approach similar to your comment.
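For context, UnsupportedOperationException is what an immutable map (such as Guava's ImmutableMap, which event.toMap() apparently returns here) throws on put(). A minimal sketch of the workaround, copying into a mutable map before adding the cluster name (Collections.unmodifiableMap stands in for the Guava type; the class and method names are illustrative):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MutableCopySketch {
    // event.toMap() may hand back an immutable map; calling put() on it
    // throws UnsupportedOperationException. Copying first makes the put safe.
    public static Map<String, Object> withClusterName(Map<String, Object> eventMap, String clusterName) {
        Map<String, Object> result = new HashMap<>(eventMap); // defensive mutable copy
        result.put("clusterName", clusterName);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> immutable = Collections.unmodifiableMap(new HashMap<>());
        System.out.println(withClusterName(immutable, "test-cluster"));
    }
}
```

The copy is cheap compared with the round-trip through the JSON mapper that the original code used, so the reviewer's performance point still holds.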

@pjain1
Member

pjain1 commented Feb 4, 2017

@dkhwangbo have you tested this emitter on your Druid cluster?

@dkhwangbo
Contributor Author

@pjain1 I've addressed your comment! This extension has been running in my Druid cluster for three weeks, and all functionality is working well. Also, I already signed the CLA while writing this PR.

@pjain1 pjain1 closed this Feb 4, 2017
@pjain1 pjain1 reopened this Feb 4, 2017
@pjain1
Member

pjain1 commented Feb 4, 2017

Closing/Reopening PR for travis

@pjain1
Member

pjain1 commented Feb 4, 2017

Changes look good to me, so 👍. However, I have one last question: why are you not handling AlertEvent?

@dkhwangbo
Contributor Author

@pjain1 To make this PR, I referred to graphite-emitter and StatsD-emitter. Neither handles AlertEvent (graphite-emitter just writes an error log when an AlertEvent is emitted). I dug around a little, but Druid has no documentation about AlertEvent; furthermore, the AlertEvent class is only used in ambari-metrics-emitter and graphite-emitter in the whole Druid project. So I can't get a sense of how to handle AlertEvent appropriately.

What is the purpose of AlertEvent? When is it emitted? From where is the AlertEvent class called? If you can answer these questions, I'd really appreciate your help and support.

@pjain1
Member

pjain1 commented Feb 5, 2017

This is the AlertEvent class. It is outside the Druid repo. It is used whenever an alert is made via log.makeAlert(). This is for flagging critical problem situations, like a segment that could not be loaded or a task that failed to run.

I am not sure why graphite-emitter is not handling alerts, but for the Kafka emitter, ideally alerts should go to a different topic than the metrics (ServiceMetricEvent), so that users immediately know something is wrong in the cluster and can take a look.

I am OK with the PR as it is; if someone wants alerts handled, it can be done in a follow-up PR. If you want, you can write the code for handling alerts in this PR, or in a follow-up PR, or choose not to do it.
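The routing described above can be sketched with placeholder event classes (the topic names match the metric.topic/alert.topic settings that appear later in this thread; everything else in this sketch is illustrative):

```java
public class TopicRouterSketch {
    // Stand-ins for Druid's event types
    public static class Event {}
    public static class ServiceMetricEvent extends Event {}
    public static class AlertEvent extends Event {}

    private final String metricTopic;
    private final String alertTopic;

    public TopicRouterSketch(String metricTopic, String alertTopic) {
        this.metricTopic = metricTopic;
        this.alertTopic = alertTopic;
    }

    // Alerts go to their own topic so operators can watch it separately
    // from the high-volume metrics stream.
    public String topicFor(Event event) {
        return (event instanceof AlertEvent) ? alertTopic : metricTopic;
    }

    public static void main(String[] args) {
        TopicRouterSketch router = new TopicRouterSketch("druid-metric", "druid-alert");
        System.out.println(router.topicFor(new AlertEvent()));
        System.out.println(router.topicFor(new ServiceMetricEvent()));
    }
}
```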

@dkhwangbo
Contributor Author

@pjain1 Oh, I got it. Thanks for the nice explanation. I'll add code to handle AlertEvent in a separate Kafka topic. I'll ping you when I'm ready for review.

@tutunci

tutunci commented Feb 5, 2017 via email

@dkhwangbo
Contributor Author

@tutunci Hi Maurizio. Thanks for your interesting suggestion; I understand your point. In fact, I use this extension with my own Spark Streaming job, which consumes the Kafka topic, does metric-value-based anomaly detection, and sends the metrics to my time-series DB. I think your idea is also reasonable, so I'll add something like whitelist-based logic.

@dkhwangbo
Contributor Author

@tutunci Hi Maurizio. I'm still working on your suggestion, but I think the original purpose of this PR is already achieved. So I think it's better to finish this PR here and handle your suggestion in another PR. Is that okay?
@pjain1 It's ready for your review.

@tutunci

tutunci commented Feb 6, 2017 via email

import com.google.common.base.Preconditions;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaEmitterConfig {
Member

@pjain1 pjain1 Feb 6, 2017

nit: can you implement the toString() method? It is useful because when the actual KafkaEmitterConfig object is created from the runtime properties, toString() is used to print the config in the logs.
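A minimal sketch of the requested toString(), with illustrative field names modeled on the config keys discussed in this thread (the real KafkaEmitterConfig may differ):

```java
public class KafkaEmitterConfigSketch {
    private final String bootstrapServers;
    private final String metricTopic;
    private final String alertTopic;

    public KafkaEmitterConfigSketch(String bootstrapServers, String metricTopic, String alertTopic) {
        this.bootstrapServers = bootstrapServers;
        this.metricTopic = metricTopic;
        this.alertTopic = alertTopic;
    }

    // Printed when the config is materialized from runtime properties,
    // so operators can verify in the logs what the emitter actually loaded.
    @Override
    public String toString() {
        return "KafkaEmitterConfig{bootstrap.servers=" + bootstrapServers
                + ", metric.topic=" + metricTopic
                + ", alert.topic=" + alertTopic + '}';
    }

    public static void main(String[] args) {
        System.out.println(new KafkaEmitterConfigSketch("localhost:9092", "druid-metric", "druid-alert"));
    }
}
```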

Contributor Author

@pjain1 Hi! Thanks for your suggestion. I've applied your comment. Thanks for your support.

@dkhwangbo
Contributor Author

@gianm Hi! Thanks for your reply. I've addressed your comments and applied the code-formatting rules. I also used lambdas in a few appropriate places.

@dkhwangbo
Contributor Author

dkhwangbo commented Mar 30, 2017

@fjy @b-slim @gianm Anything else?

@b-slim
Contributor

b-slim commented Mar 30, 2017

@dkhwangbo thanks for the contribution! This looks good to me. Good job!

@fjy fjy merged commit 0d2e91e into apache:master Apr 4, 2017
@erikdubbelboer
Contributor

Can we maybe get some example JSON in the documentation so people know what to expect? Also, maybe an example Kafka consumer task that consumes this JSON back into Druid?

@dkhwangbo
Contributor Author

@erikdubbelboer Thanks for your suggestion! I'll revise the documentation to include it soon.

@jon-wei
Contributor

jon-wei commented Jun 8, 2017

@pjain1 Can you describe what the 0.10.1 release notes should call out regarding this patch?

@ambition119

ambition119 commented Jul 21, 2017

hi dkhwangbo:
In the kafka-emitter code the Kafka version is 0.10.2.0. Using kafka-0.8.2.1, the kafka-emitter works OK, but using kafka-0.10.2.0 it fails with an exception:
io.druid.emitter.kafka.KafkaEmitter - Failed to take record from queue!
java.lang.InterruptedException

@dkhwangbo
Contributor Author

@ambition119
Can I get your config files for both 0.8.2.1 and 0.10.2.0?

@ambition119

ambition119 commented Jul 21, 2017

@dkhwangbo:
The kafka-emitter config in common.runtime.properties is the same for both kafka-0.8.2.1 and kafka-0.10.2.0:
druid.extensions.loadList=["druid-kafka-eight","druid-hdfs-storage", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "mysql-metadata-storage", "kafka-emitter"]
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.emitter=composing
druid.emitter.composing.emitters=["logging","kafka"]
druid.emitter.logging.logLevel=info
#druid.emitter=kafka
druid.emitter.kafka.bootstrap.servers=hdp:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.producer.config={"key.serializer":"org.apache.kafka.common.serialization.StringSerializer","value.serializer":"org.apache.kafka.common.serialization.StringSerializer","max.block.ms":10}

The extensions/kafka-emitter directories are not the same. kafka-emitter (0.8.2.1) info:
kafka-emitter/
kafka-emitter-0.11.0-SNAPSHOT.jar
kafka_2.10-0.8.2.1.jar
kafka-clients-0.8.2.1.jar
metrics-core-2.2.0.jar
scala-library-2.10.4.jar
snappy-java-1.1.1.6.jar
zkclient-0.3.jar
jopt-simple-3.2.jar

kafka-emitter (0.10.2.0) info:
kafka-emitter/
jopt-simple-5.0.3.jar
kafka_2.10-0.10.2.0.jar
kafka-clients-0.10.2.0.jar
kafka-emitter-0.11.0-SNAPSHOT.jar
metrics-core-2.2.0.jar
scala-library-2.10.6.jar
snappy-java-1.1.2.6.jar
zkclient-0.10.jar

thanks!

@Dadard

Dadard commented Sep 28, 2017

hi @dkhwangbo

Do you still plan to work on a whitelisting system?

thanks
