-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Producer timestamp #41
Comments
Why don't you add the timestamp in message header ? you can add there whatever metadata you want |
Well the timestamp is a specific piece of data (from what I understand) that is used to order messages accros partitions. It is not a custom metadata that would fit on some header but an attribute of the Kafka event, just like the key, the value, the headers or the offset. |
Hello, I see what do you mean. For sure is not used to order the messages because for this you have the offset. I see there is a property form broker version v0.10 or newer where each message has a timestamp when was received by the broker. Not sure on what they are using it internally. I can expose this info even if I'm a bit reticent because will decrease a bit the performances for those that are not using this property and honestly I don't see why you cannot set yourself a timestamp when the message was actually produced which is a bit more correct than using this. To give you an example: you are producing message X at T1 and at that moment let's suppose the kafka broker is down (or connection between your client and the broker). erlkaf will queue (by default) the message until when your connection is up again and send the message at that moment. The gap between T1 and the timestamp when broker got the message can be very big.. You know better on what you are using this info but personally I won't implement my logic on it. |
From what I have been told by a Java team this timestamp is used to order messages properly using Kafka Streams (the Java library) when data is consolidated from multiple partitions (so the offset is not enough).
Yes but I think that is the point. The consumers of the message will see the timestamp that you have set and use it as the "official" timestamp for the validity of the message data. So if in the meantime the broker receives newer messages, or receives multiples messages representing the same data (after connection recovery), that timestamp allows the consumer to select the last one. That is the role of the offset but here we are talking about multi-partitions KGlobalTables or Streams (honestly I only have a basic understanding of Java's Kafka Streams).
You mean that event if there was a |
Hello, to be clear: we don't support kafka streams. librdkafka is not planning to support this as well. For the timestamp: I will add it when I will have some spare time. It's not a big deal. This is not part of the producer API and won't impact the produce in any way. Basically when you consume a message you will be able to retrieve the timestamp property as was populated by the broker (if any - for broker older than 0.10 is not available) so basically in the |
I think that librdkafka does support that feature : confluentinc/librdkafka#1016 (comment) . But it's fine if you don't want to in your own library,
My personal need is to be able to set the timestamp as the producer level, which is supported by Kafka, instead of letting the broker set its own timestamp. So if you're doing that just for me maybe wait but otherwise I guess it can be useful for anyone :) Thanks! |
I think you misunderstand this or maybe I'm doing it. The only api in librdkafka is to get the timestamp that's assigned by the broker. There is no api to set yourself the timestamp on your own. |
Maybe, I don't know C nor Kafka very well, but the issue comment I linked above mentioned that the function Is that so ? Or is that something else ? |
Hello @lud , I had some time yesterday - today to look into this and librdkafka source code and yes it can be done you are right. Seems if not specified the value of timestamp is internally 0 which translates to current time. I'll allocate some time and implement this feature soon. |
Hello, Thanks for the update, it is good to hear! Thanks 👍 |
The feature is available in last master |
Thank you very much :) |
Hi,
I am asked to add a timestamp to messages when producing a message. I am currently using brod but would like to migrate to erlkaf eventually.
I would like to know if it is supported by erlkaf or if support is planned.
Thank you!
The text was updated successfully, but these errors were encountered: