Adding Kafka-emitter #3860
Conversation
This PR is under construction, but I want to get feedback from many people about the direction of this PR.
force-pushed from 602b9d8 to 03a7b90
Thanks @dkhwangbo. Will review soon
force-pushed from 1b1393a to c0c5459
@fjy Hi. Are there any updates?
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {};
HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
result.put("clusterName", config.getClusterName());
producer.send(new ProducerRecord<String, String>(config.getTopic(), jsonMapper.writeValueAsString(result)));
At least two major issues here:
- producer.send can be blocking, and that is not OK (the emit call should never block; see the sketch after this list)
- alerts are not handled
- doing a retry in case of exception is the norm
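For illustration, a minimal sketch of one way to bound that blocking, assuming a 0.9+ Kafka client (the broker address is a placeholder; `max.block.ms` caps how long `send()` can wait for metadata or buffer space):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker list
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// Cap how long send() may block waiting for metadata or buffer space,
// so a broker outage cannot stall the emit() caller indefinitely.
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```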
@b-slim Hi! Thanks for your review.
The new Kafka producer API (since 0.8.2) only works asynchronously, so, as I understand it, using another method or logic is unnecessary. If my understanding is wrong, please correct me.
force-pushed from e7e7b02 to 02e9ffb
@b-slim I just addressed your other comment. Please feel free to review and leave further comments.
public void onCompletion(RecordMetadata metadata, Exception exception) {
  if(exception != null) {
    log.warn(exception, "Exception is occured! Retry.");
    emit(event);
This can keep on retrying infinitely if, let's say, the Kafka brokers are down.
In my opinion, retries when emitting events are not necessary, but if you want to retry you can set the retries property on the KafkaProducer, like props.put("retries", <num retries>);, which would let the KafkaProducer retry by itself and throw an exception if the send fails even after the retries.
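For illustration, a minimal sketch of that configuration (these are standard Kafka producer properties; the values are placeholders, and `props` is assumed to be the same object used to build the producer):

```java
// Let the KafkaProducer handle transient send failures itself, instead of
// re-calling emit() from the completion callback.
props.put(ProducerConfig.RETRIES_CONFIG, "3");            // "retries": placeholder count
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); // "retry.backoff.ms": pause between retries
```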
@pjain1 Thanks for your review! That's a great point! I'll apply your suggestion.
Also, it would be great if you could write some documentation for this extension, for example along the lines of the documentation for the existing emitters.
HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
result.put("clusterName", config.getClusterName());
producer.send(new ProducerRecord<String, String>(config.getTopic(), jsonMapper.writeValueAsString(result)),
    new Callback() {
Since the callback function is now the same for all events, there is no need to create a new callback object for each and every event. You can create a single class-level callback object and use the same object for all events.
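For illustration, a minimal sketch of that refactoring (assuming the PR's existing `log`, `producer`, and `config` fields; `payload` stands in for the serialized event):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// One shared callback instance, reused for every send instead of
// allocating a new anonymous Callback per event.
private final Callback sendCallback = new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      log.warn(exception, "Failed to send event to Kafka.");
    }
  }
};

// Inside emit(): pass the shared instance to every send call.
producer.send(new ProducerRecord<String, String>(config.getTopic(), payload), sendCallback);
```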
@pjain1 Thanks for your kindness! I applied your comment.
@pjain1 Thanks for your review! I just addressed your kind comment and also wrote documentation. Please review it.
@Override
public void emit(final Event event) {
  if(event instanceof ServiceMetricEvent) {
    if(event == null) {
This condition does not make sense, because if event is null then the check at line 87, event instanceof ServiceMetricEvent, will always be false. So you can put a simple check if(event == null) { return; } and remove the instanceof check, unless you want to do different things with different types of events (ServiceMetricEvent and AlertEvent).
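For illustration, a minimal sketch of the suggested restructuring (the method body is elided; this shows the reviewer's suggestion, not the PR's final code):

```java
@Override
public void emit(final Event event) {
  // Bail out early on null; the instanceof check would silently skip nulls anyway.
  if (event == null) {
    return;
  }
  if (event instanceof ServiceMetricEvent) {
    // ... serialize the metric event and send it to Kafka ...
  }
}
```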
Oh! Thanks for correcting my mistake. I'll fix it.
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  if(e != null) {
    log.warn(e, "Exception is occured! Retry.");
"Exception is occured! Retry." -> "Exception occurred!"
Thanks! I'll rewrite this message.
@dkhwangbo Also please file the CLA if you have not done so already - http://druid.io/community/cla.html
try {
  TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {};
  HashMap<String, Object> result = jsonMapper.readValue(jsonMapper.writeValueAsString(event), typeRef);
  result.put("clusterName", config.getClusterName());
Since you just want to put the "clusterName" property in the event map, lines 93-95 can be replaced with:
Map<String, Object> result = event.toMap();
result.put("clusterName", config.getClusterName());
This will prevent the costly serialization and deserialization at line 94.
Got it. I'll change this line.
Hi. I tried your line, but an UnsupportedOperationException from Guava occurred, so I tried another way, similar to your comment.
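For illustration, a minimal sketch of one plausible workaround (a guess at the "another way" mentioned above, assuming `event.toMap()` returns an immutable map):

```java
// event.toMap() can return an immutable map (e.g. Guava's ImmutableMap),
// which throws UnsupportedOperationException on put(). Copy it into a
// mutable HashMap before adding the extra field.
Map<String, Object> result = new HashMap<>(event.toMap());
result.put("clusterName", config.getClusterName());
```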
@dkhwangbo have you tested this emitter on your druid cluster?
Closing/Reopening PR for travis
Changes look good to me, so 👍 .. however I have one last question - why are you not handling AlertEvent?
@pjain1 To make this PR, I referred to ... What's the purpose of ...
I am not sure why. I am OK with the PR as it is; if someone wants to have alerts handled, then it can be done in a follow-up PR. If you want, you can write the code for handling alerts in this PR as well, or in a follow-up PR, or you can choose not to do it.
@pjain1 Oh, I got it. Thanks for your nice explanation. I'll add the code to handle AlertEvent in another Kafka topic. I'll ping you when I'm ready for review.
Hi Dongkyu,
I'm interested in your implementation, as I've tricked the current http metric emitter.
What I've currently done is an http server in Go that receives the http emitter's messages.
It parses the messages and puts the events into a Kafka topic.
The main issue is that some of the events have multiple informational fields, like the JVM or system ones, so I've set generic fields for the Druid datasource, as user1, user2 and so on.
A suggestion is to have a configuration file which sets which events can go into Kafka and which cannot. This is what I've set up in my http server, avoiding unwanted metrics.
About alerts, what I've done is generate an email and a Slack notification with the error details.
Mainly I use the standard event metrics to monitor Druid through Grafana charts, and alerts as immediate notifications (this is why I didn't put them into Kafka).
Hope it helps in your implementation.
Thanks
Maurizio
@tutunci Hi Maurizio. Thanks for your interesting suggestion. I understand your point. In fact, I use this extension with my own Spark Streaming job, which consumes the Kafka topic, does metric-value-based anomaly detection, and sends the metrics to my time-series DB. I think your idea is also reasonable, so I'll add something like whitelist-based logic.
Hi Dongkyu,
It's absolutely OK, of course.
Just to give you some additional details, here is my configuration file:
enabled=sys/disk/write/count||all
enabled=jvm/mem/max|memKind|all
enabled=jvm/mem/committed|memKind|all
enabled=jvm/mem/used|memKind|all
enabled=jvm/mem/init|memKind|all
enabled=jvm/pool/max|poolKind,poolName|all
enabled=jvm/pool/committed|poolKind,poolName|all
enabled=jvm/pool/used|poolKind,poolName|all
enabled=jvm/pool/init|poolKind,poolName|all
enabled=segment/loadQueue/size|server|druid:aws:coordinator
enabled=segment/loadQueue/failed|server|druid:aws:coordinator
enabled=segment/loadQueue/count|server|druid:aws:coordinator
enabled=query/cache/total/hitRate||druid:aws:broker,druid:aws:historical
enabled=query/cache/total/averageBytes||druid:aws:broker,druid:aws:historical
enabled=query/cache/total/timeouts||druid:aws:broker,druid:aws:historical
enabled=query/cache/total/errors||druid:aws:broker,druid:aws:historical
Here, the format is:
metric | additional params to set as user1,user2... | services involved
So for example, when I receive these metrics:
{"feed":"metrics","host":"10.80.4.154:8083","metric":"jvm/pool/init","poolKind":"heap","poolName":"PS Survivor Space","service":"druid:aws:realtime","timestamp":"2017-02-05T23:59:54.891-05:00","value":268435456}
{"feed":"metrics","host":"10.80.4.154:8083","metric":"jvm/pool/max","poolKind":"heap","poolName":"PS Old Gen","service":"druid:aws:realtime","timestamp":"2017-02-05T23:59:54.891-05:00","value":10737418240}
The first matches the condition enabled=jvm/pool/init|poolKind,poolName|all, because the metric matches, the services are all, and the additional user-defined fields are poolKind=>user1 and poolName=>user2 (it's positional, based on the insert order).
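For illustration, a minimal sketch (in Java, though Maurizio's server is written in Go) of how one such line could be parsed under the positional mapping he describes; all names here are hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class WhitelistLineParser {
  public static void main(String[] args) {
    // Hypothetical parser for one "enabled=metric|params|services" line.
    String line = "enabled=jvm/pool/init|poolKind,poolName|all";
    String[] parts = line.substring("enabled=".length()).split("\\|", -1);
    String metric = parts[0];                  // "jvm/pool/init"
    String[] userFields = parts[1].isEmpty()
        ? new String[0]
        : parts[1].split(",");                 // poolKind -> user1, poolName -> user2
    Set<String> services = new HashSet<>(Arrays.asList(parts[2].split(",")));
    boolean allServices = services.contains("all"); // "all" matches every service
    System.out.println(metric + " " + Arrays.toString(userFields)
        + " allServices=" + allServices);
  }
}
```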
Hope it can give you some kind of help.
Let me know if you create another PR, so I can follow it.
Thanks
Maurizio
2017-02-06 3:01 GMT+01:00 Dongkyu Hwangbo <[email protected]>:
… @tutunci Hi Maurizio. Your suggestion is under construction by me, but I think the original purpose of this PR is already done. So I think it is better to finish this PR here and handle your suggestion in another PR. Is that okay?
@pjain1 It's ready for your review.
import com.google.common.base.Preconditions;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaEmitterConfig {
nit: can you implement the toString() method? It is useful because when the actual KafkaEmitterConfig object is created from the runtime properties, the toString() method will be used to print the config in the logs.
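For illustration, a minimal sketch of such a toString() (the field names are guesses; topic and clusterName appear elsewhere in this thread, while bootstrapServers is an assumption, not the actual class definition):

```java
@Override
public String toString() {
  // Print the resolved config so it shows up readably in the startup logs.
  return "KafkaEmitterConfig{" +
      "bootstrap.servers='" + bootstrapServers + '\'' +
      ", topic='" + topic + '\'' +
      ", clusterName='" + clusterName + '\'' +
      '}';
}
```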
@pjain1 Hi! Thanks for your suggestion. I applied your comment. Thanks for your support.
… config value of kafka producer
force-pushed from c113f10 to 8be4c4c
@gianm Hi! Thanks for your reply. I addressed your comment, and I also applied the code formatting rules. Furthermore, I used lambdas in a few appropriate places.
@dkhwangbo thanks for the contribution! This looks good to me! Good job!
Can we maybe get some example JSON in the documentation so people know what to expect? Also maybe an example Kafka consumer task that consumes this JSON back into Druid?
@erikdubbelboer Thanks for your suggestion! I'll revise the documentation soon to include your suggestion.
@pjain1 Can you describe what the 0.10.1 release notes should call out regarding this patch?
hi dkhwangbo:
@ambition119
@dkhwangbo: the extensions/kafka-emitter directories are not the same. kafka-emitter (0.8.2.1) info: kafka-emitter (0.10.2.0) info: thanks!
hi @dkhwangbo Do you still plan to work on a whitelisting system? thanks
This PR is about adding a new extensions-contrib module.
Currently, Druid has various emitter modules and also has HttpPostEmitter for general purposes. But many users currently run the Apache Kafka platform, and I assume that if Druid had another extension emitting its metrics to Kafka directly, many users would find it comfortable and convenient to use with various monitoring dashboard UIs or other ecosystem tools.
Please feel free to comment on this PR.