-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka 0.9+ consumer support #2487
Add Kafka 0.9+ consumer support #2487
Conversation
Thanks for submitting this @seuf. First thing is that you'll need to fix the tests that rely on spotify/docker, which IIRC is a kafka 0.8 cluster. We'll probably need to use https://github.com/wurstmeister/kafka-docker instead. Secondly, instead of completely removing Kafka 0.8 support, we should instead change the current Does this need to get ported to the kafka output plugin as well? did anything change for kafka producers in 0.9+? Since this is a breaking change we're going to have to be a bit more careful about rolling it out. Probably what we want to do is submit a PR for telegraf 1.3 that will warn |
It looks like the code changes are minimal, what are the reasons we can't support both 0.8 and 0.9 in this plugin? |
We probably could but two issues are that "zookeeper peers" don't apply to Kafka 0.9+, and there isn't a dynamic way of telling which version we are interacting with. So it would require having a separate broker list, where if the user specified "zookeeper peers" we would use the old consumer library, and if they specified "brokers" then we use the new consumer library. AFAICT the Kafka project is trying to heavily push users to use 0.9+ clusters. This migration may take some time but IMO it's better if telegraf attempts to be on the front of this rather than maintaining messy cross-compatibility. This would mirror how Apache is maintaining the official scala kafka consumer, which defaults to only work with "new" clusters, and a CLI flag has to be used to enable "old" clusters: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L268 |
Hi. |
For the last test run it looks like it failed because you need to run go fmt. We have this change marked for the 1.4 milestone, so we don't need to rename the current plugin for the 1.3 release, but we do need to update the 1.3 release notes to warn about the breaking change. |
9ed44a8
to
7f0946f
Compare
7f0946f
to
d928b8c
Compare
d928b8c
to
5cf8f1a
Compare
Hi, I've rebased the PR to add the commit 0193cbe |
@seuf is there a way to test your new version of kafka inputs plugin ? nightly build ? |
@JulienChampseix I've made a pre release on my github : https://github.com/seuf/telegraf/releases/tag/v1.3.0 |
@seuf I'm able to connect kafka client with kafak broker using SSL, but I‘m failed to communicate telegraf using this kafka-consumer-plugin with the broker.
but I see in this pull request, I need to specify a different config, are the 3 entries(ssl_ca,ssl_cert,ssl_key) all required? and is it possible for me to change to .jks cert instead in this ssl-kafka-consumer-plugin? Could u pls give me an example to test this plugin?
|
Hi @TinaRen The ssl_cert and ssl_key are only required if you want a bi-directional ssl connection. Othewise, just give the For example :
|
Hi @seuf, thanks for your reply. |
Hey, @seuf, i'm currently testing this MR, and i am running into some error message with the following config:
And i am getting the following error:
However, /opt/kafka/kafka.{key,cer}.pem both look correct, and are unencrypted. What i tried:
I am currently in the dark place where you start to wonder if you get the right error message, or if there is a bug in a library. If you see something obvious, could you point it to me? Thanks for your work. |
Hi, @paulollivier I couldn't use this plugin to connet kafka, too. But my error is different, it always report "kafka ran out of brokers", strangely, the output plugin using the similar library have the same error, too, I even tried directly coding with the Cluster library, seems I couldn't use it and don't know how to use it because my key file has password. So I re-write this plugin with another library confluent-kafka-go, now it works fine for me, it's in my repositoriy, you could have a try if you also need the SSL connection as kafka consumer. BTW, I'm using kafka 0.10 |
5cf8f1a
to
35d1513
Compare
Hi @paulollivier Are you sure your kafka is listening on the 9092 port for SSL ? 9092 is the default port for no ssl / no sasl. |
I've tried this with the default brew install of kafka (
Works great! |
@paulollivier The |
@seuf Now that 1.3 is out we can go forward with renaming the old version, include a note in the new README pointing 0.8 users to it. We should also add the new dependency to the list of dependency licenses. |
@danielnelson: triple-checked, my keys are in x509 pem format :/ |
35d1513
to
b6f6d1f
Compare
@danielnelson OK, I have :
Don't know why, but the circle tests are failling when kafka_consumer_legacy produce a message to kafka. |
06e63e6
to
ded031c
Compare
Can we clarify the name of this bug to mention the new kafka consumer format? (To me at least, that's the more important change.) (Also, thanks @seuf for your hard work :) ) |
@seuf Thanks for doing this, code looks awesome. I'll try to figure out the unit tests tomorrow, can you come up with a better name for the issue and to use in the changelog though? It's okay if we list the item twice if needed. |
ded031c
to
7a6606b
Compare
Didn't have enough time to test but maybe we need to add back the |
Use wurstmeister/kafka docker images for input kafka_consumer tests
7a6606b
to
1e7666a
Compare
I tried to run both containers but for some reason I can't get it to work on circleci by just changing the ports, despite having no problem on my laptop. I'm going to add an unconditional skip to the legacy test after merging as I'm not sure how to get them both working at this time. Perhaps when we move to circleci 2.0 it will work again. |
I have Kerberos kafka cluster, would like use telegraf/kafka_consumer input plug in for consuming messages to influx db. I could not be able to find right config for this, there are lot of questions on this none of them are solving my problem. |
Plugin Input kafka_consumer
This PR allow the kafka_consumer input plugin to connect directly to kafka brokers using the new kafka consumer offset management.
Here is an example of configuraiton
Note : the old Zookeeper Connection type is no more supported.
Related Issue : #1312