Producer timestamp #41

Closed
lud opened this issue Dec 21, 2021 · 13 comments

@lud

lud commented Dec 21, 2021

Hi,

I am asked to add a timestamp to messages when producing a message. I am currently using brod but would like to migrate to erlkaf eventually.

I would like to know if it is supported by erlkaf or if support is planned.

Thank you!

@silviucpp
Owner

Why don't you add the timestamp in a message header? You can put whatever metadata you want there.

@lud
Author

lud commented Dec 21, 2021

Well, the timestamp is a specific piece of data (from what I understand) that is used to order messages across partitions. It is not custom metadata that would fit in a header but an attribute of the Kafka event, just like the key, the value, the headers or the offset.

@silviucpp
Owner

Hello,

I see what you mean. It is certainly not used to order the messages, because you have the offset for that. I see there is a property, from broker version v0.10 onward, where each message has a timestamp of when it was received by the broker. I'm not sure what they use it for internally.

I can expose this info, although I'm a bit reluctant because it will slightly decrease performance for those who are not using this property. Honestly, I don't see why you cannot set a timestamp yourself when the message is actually produced, which is a bit more correct than using this.

To give you an example: you produce message X at T1, and at that moment let's suppose the Kafka broker is down (or the connection between your client and the broker is). erlkaf will queue the message (by default) until the connection is up again and send it at that moment. The gap between T1 and the timestamp at which the broker got the message can be very big.

You know better what you are using this info for, but personally I wouldn't build my logic on it.
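
To make the gap described above concrete, here is a minimal Python sketch that models a client queueing messages while the broker is unreachable. It is only an illustration: the `Record` type, the `deliver` helper and the millisecond values are hypothetical and are not part of erlkaf or librdkafka. The field names borrow Kafka's CreateTime / LogAppendTime terminology for producer-side and broker-side timestamps.

```python
from dataclasses import dataclass


@dataclass
class Record:
    value: str
    create_time_ms: int      # when the application produced the message (T1)
    log_append_time_ms: int  # when the broker actually received it


def deliver(queued, broker_up_at_ms):
    """Deliver queued (value, produced_at_ms) pairs once the broker is reachable.

    Hypothetical helper: it only models the delay, not a real client queue.
    """
    records = []
    for value, produced_at_ms in queued:
        # A message cannot reach the broker before the connection is back.
        arrival_ms = max(produced_at_ms, broker_up_at_ms)
        records.append(Record(value, produced_at_ms, arrival_ms))
    return records


# Two messages produced while the broker is down; it comes back at t = 60 s.
queued = [("X", 1_000), ("Y", 4_000)]
records = deliver(queued, broker_up_at_ms=60_000)
gaps = [r.log_append_time_ms - r.create_time_ms for r in records]
print(gaps)
```

In this sketch the broker-side timestamp trails the produce time by almost a minute, which is the reason a producer-set timestamp and a broker-set timestamp can disagree.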

@lud
Author

lud commented Dec 22, 2021

From what I have been told by a Java team this timestamp is used to order messages properly using Kafka Streams (the Java library) when data is consolidated from multiple partitions (so the offset is not enough).

> To give you an example: you produce message X at T1, and at that moment let's suppose the Kafka broker is down (or the connection between your client and the broker is). erlkaf will queue the message (by default) until the connection is up again and send it at that moment. The gap between T1 and the timestamp at which the broker got the message can be very big.

Yes, but I think that is the point. The consumers of the message will see the timestamp that you have set and use it as the "official" timestamp for the validity of the message data. So if in the meantime the broker receives newer messages, or receives multiple messages representing the same data (after connection recovery), that timestamp allows the consumer to select the latest one. That is normally the role of the offset, but here we are talking about multi-partition KGlobalTables or Streams (honestly I only have a basic understanding of Java's Kafka Streams).
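
As a rough illustration of the selection described above, the following Python sketch picks the most recent value per key across several partitions by comparing producer-set timestamps. This is an assumption-laden toy: `latest_per_key` and the sample data are hypothetical, and it is not how Kafka Streams is actually implemented.

```python
def latest_per_key(partitions):
    """Return, for each key, the value carrying the highest timestamp.

    Each partition is a list of (key, timestamp_ms, value) tuples in offset
    order. Hypothetical helper for illustration only.
    """
    latest = {}
    for partition in partitions:
        for key, timestamp_ms, value in partition:
            current = latest.get(key)
            if current is None or timestamp_ms > current[0]:
                latest[key] = (timestamp_ms, value)
    return {key: value for key, (_, value) in latest.items()}


partition_0 = [("user-1", 1_000, "v1"), ("user-2", 1_500, "a1")]
# "v2" has a later offset than "v3" but an earlier timestamp, as could happen
# when an older message is delivered late after a connection recovery.
partition_1 = [("user-1", 3_000, "v3"), ("user-1", 2_000, "v2")]

result = latest_per_key([partition_0, partition_1])
print(result)
```

Offsets are only comparable within a single partition, so the sketch relies on the timestamp alone to decide which value wins.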

> will slightly decrease performance for those who are not using this property

You mean that even if there was a produce/6 function, you would need to support that new value all along the data path towards librdkafka, even from produce/5?

@silviucpp
Owner

Hello, to be clear: we don't support Kafka Streams, and librdkafka is not planning to support it either.

For the timestamp: I will add it when I have some spare time. It's not a big deal. This is not part of the producer API and won't impact producing in any way.

Basically, when you consume a message you will be able to retrieve the timestamp property as populated by the broker (if any; it is not available for brokers older than 0.10), so the erlkaf_msg record will have a new property called timestamp.

@lud
Author

lud commented Dec 22, 2021

> Hello, to be clear: we don't support Kafka Streams, and librdkafka is not planning to support it either.

I think that librdkafka does support that feature: confluentinc/librdkafka#1016 (comment). But it's fine if you don't want to in your own library.

> For the timestamp: I will add it when I have some spare time. It's not a big deal. This is not part of the producer API and won't impact producing in any way.
>
> Basically, when you consume a message you will be able to retrieve the timestamp property as populated by the broker (if any; it is not available for brokers older than 0.10), so the erlkaf_msg record will have a new property called timestamp.

My personal need is to be able to set the timestamp at the producer level, which is supported by Kafka, instead of letting the broker set its own timestamp. So if you're doing that just for me, maybe wait, but otherwise I guess it can be useful for anyone :)

Thanks!

@silviucpp
Owner

I think you misunderstand this, or maybe I do. The only API in librdkafka is for getting the timestamp assigned by the broker. There is no API to set the timestamp yourself.

@lud
Author

lud commented Dec 22, 2021

Maybe. I don't know C or Kafka very well, but the issue comment I linked above mentions that the function rd_kafka_producev supports it, and looking at the code it seems possible to set a timestamp: https://github.com/edenhill/librdkafka/blob/15d3e7ea8589fcf0a81f5631e9d74f88630fa7bf/tests/0052-msg_timestamps.c#L79

Is that so? Or is that something else?

@silviucpp
Owner

Hello @lud,

I had some time yesterday and today to look into this and the librdkafka source code, and yes, it can be done; you are right. It seems that if the timestamp is not specified, its value is internally 0, which translates to the current time.

I'll allocate some time and implement this feature soon.
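
The default described above (an unspecified timestamp is stored as 0 and treated as "now") can be sketched in a few lines of Python. This only mirrors the behaviour as reported in this comment; `resolve_timestamp_ms` and its `now_ms` parameter are hypothetical names, not librdkafka or erlkaf functions.

```python
import time


def resolve_timestamp_ms(timestamp_ms=0, now_ms=None):
    """Return the timestamp to attach to a produced message.

    A value of 0 means "not specified" and falls back to the current time,
    as the comment above describes. `now_ms` is injectable so the behaviour
    can be checked deterministically.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return timestamp_ms if timestamp_ms != 0 else now_ms


# Caller supplies its own event time: it is passed through unchanged.
explicit = resolve_timestamp_ms(1_640_000_000_000, now_ms=5)
# Caller omits the timestamp: the current time is used instead.
defaulted = resolve_timestamp_ms(now_ms=5)
print(explicit, defaulted)
```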

@lud
Author

lud commented Jan 13, 2022

Hello, Thanks for the update, it is good to hear!

Thanks 👍

silviucpp added a commit that referenced this issue Oct 10, 2022
@silviucpp
Owner

The feature is available in the latest master.

@lud
Author

lud commented Oct 12, 2022

Thank you very much :)
