Emit messages with headers #77
Comments
Sure. I'd be happy to review your PR. Also, if you need help finding your way around the code, let me know. |
@asdine Same here, I'll be happy to give you a hand. |
Sure ! I'll work on that this weekend 👍 |
We are starting to use goka and this would be a nice addition that would provide a way to trace. Any word on this? If you guys have an idea on how this should be implemented I could take a stab at it as well. |
@annymsMthd sorry for the delayed response. PRs are very welcome. Since we don't use message headers, perhaps you could quickly explain to me how you'd like to have them available in your processor callbacks, and then I can point you to the places where you'd need to add code. The context of a processor callback looks like this:
type Context interface {
// Topic returns the topic of input message.
Topic() Stream
// Key returns the key of the input message.
Key() string
// Partition returns the partition of the input message.
Partition() int32
// Offset returns the offset of the input message.
Offset() int64
// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time
// Emit asynchronously writes a message into a topic.
Emit(topic Stream, key string, value interface{})
...
}
So, how would you like to use the headers given the existing interface? |
Hello @db7, good to hear from you on this subject. What I have in mind is a new map for headers in the context, with Emit modified to add those headers if present, plus a second Emit function that accepts a fourth parameter. Having access to these headers in Context would also be helpful. I don't know whether headers could be encoded using a codec, as is the case for the value. What do you think? |
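The "fourth parameter" idea could also be expressed without a second Emit function, for example with a variadic parameter so that existing three-argument calls keep compiling. The following is only a sketch under assumptions: `emitter`, `record`, and the exported `Header` fields are hypothetical names, the topic is a plain string rather than goka's `Stream`, and header values are raw bytes.

```go
package main

import "fmt"

// Header is a hypothetical key/value pair attached to an emitted message.
type Header struct {
	Key   string
	Value []byte
}

// record is a hypothetical in-memory stand-in for a produced Kafka message.
type record struct {
	Topic, Key string
	Value      interface{}
	Headers    map[string][]byte
}

// emitter collects emitted records instead of writing to Kafka.
type emitter struct {
	sent []record
}

// Emit stores a record. Headers are optional: callers that pass only
// topic, key and value get a record with a nil header map.
func (e *emitter) Emit(topic, key string, value interface{}, headers ...Header) {
	var hs map[string][]byte
	if len(headers) > 0 {
		hs = make(map[string][]byte, len(headers))
		for _, h := range headers {
			hs[h.Key] = h.Value
		}
	}
	e.sent = append(e.sent, record{Topic: topic, Key: key, Value: value, Headers: hs})
}

func main() {
	e := &emitter{}
	// Existing call sites stay unchanged.
	e.Emit("out", "user-1", "hello")
	// New call sites can attach headers.
	e.Emit("out", "user-1", "hello", Header{Key: "trace-id", Value: []byte("abc")})
	fmt.Println(len(e.sent), len(e.sent[0].Headers), len(e.sent[1].Headers))
}
```

A variadic parameter keeps source compatibility for callers, but it still changes the method signature, so any type that implements the interface would need updating.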
Ok, thanks for the suggestions. I think it would make sense to add the decoding of the header value in the codec. I could imagine using the headers as follows:
// Header is one entry in the Kafka record headers.
type Header struct {
	key   string
	value interface{}
}
type Context interface {
// Header returns the value for a header key if available, otherwise nil.
Header(key string) interface{}
// Emit asynchronously writes a message into a topic.
Emit(topic Stream, key string, value interface{}, header ...Header)
...
}
The codec would be extended to look like this:
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
EncodeHeader(value interface{}) (data []byte, err error)
DecodeHeader(data []byte) (value interface{}, err error)
}
To avoid having to implement EncodeHeader and DecodeHeader in every codec, a default implementation could be embedded:
type MyCodec struct {
codec.HeaderBytes
}
// only implement Encode and Decode
func (c *MyCodec) Encode(value interface{}) (data []byte, err error) { ... }
func (c *MyCodec) Decode(data []byte) (value interface{}, err error) { ... }
That would require adding the above to all codecs in existing code. |
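The embedding idea above can be sketched in a self-contained form. This is an illustration only: `HeaderBytes` here is a local, hypothetical stand-in for the proposed `codec.HeaderBytes` (no such type is confirmed to exist in goka), and `StringCodec` is an invented example codec.

```go
package main

import "fmt"

// Codec mirrors the extended interface proposed in this thread.
type Codec interface {
	Encode(value interface{}) ([]byte, error)
	Decode(data []byte) (interface{}, error)
	EncodeHeader(value interface{}) ([]byte, error)
	DecodeHeader(data []byte) (interface{}, error)
}

// HeaderBytes is a hypothetical pass-through header codec: header values
// are expected to be []byte and are neither transformed nor copied.
type HeaderBytes struct{}

// EncodeHeader accepts only []byte and returns it unchanged.
func (HeaderBytes) EncodeHeader(value interface{}) ([]byte, error) {
	b, ok := value.([]byte)
	if !ok {
		return nil, fmt.Errorf("header value must be []byte, got %T", value)
	}
	return b, nil
}

// DecodeHeader returns the raw bytes unchanged.
func (HeaderBytes) DecodeHeader(data []byte) (interface{}, error) {
	return data, nil
}

// StringCodec implements only Encode and Decode; the header methods are
// promoted from the embedded HeaderBytes.
type StringCodec struct {
	HeaderBytes
}

// Encode converts a string value to bytes.
func (c *StringCodec) Encode(value interface{}) ([]byte, error) {
	s, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("value must be string, got %T", value)
	}
	return []byte(s), nil
}

// Decode converts bytes back to a string value.
func (c *StringCodec) Decode(data []byte) (interface{}, error) {
	return string(data), nil
}

// Compile-time check that *StringCodec satisfies the extended interface.
var _ Codec = (*StringCodec)(nil)

func main() {
	var c Codec = &StringCodec{}
	raw, err := c.EncodeHeader([]byte("trace-123"))
	if err != nil {
		panic(err)
	}
	v, _ := c.DecodeHeader(raw)
	fmt.Printf("%s\n", v)
}
```

With this shape, an existing codec would only need the one-line embedding to satisfy the wider interface, which is the migration cost mentioned above.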
Also a way to get all the headers (raw) could be useful, e.g.
type Context interface {
// Header returns the value for a header key if available, otherwise nil.
Header(key string) interface{}
// Headers returns the raw headers.
Headers() map[string][]byte
...
} |
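A rough sketch of how these two accessors could sit side by side, assuming the context keeps the raw header bytes and decodes on demand. The names `msgContext`, `HeaderCodec`, and `stringHeaders` are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// HeaderCodec is a hypothetical, reduced view of the codec: only the
// header-decoding half is needed here.
type HeaderCodec interface {
	DecodeHeader(data []byte) (interface{}, error)
}

// stringHeaders decodes header values as strings (illustration only).
type stringHeaders struct{}

func (stringHeaders) DecodeHeader(data []byte) (interface{}, error) {
	return string(data), nil
}

// msgContext is a hypothetical context holding the raw headers of the
// input message together with the codec used to decode them.
type msgContext struct {
	raw   map[string][]byte
	codec HeaderCodec
}

// Header returns the decoded value for key, or nil if the key is absent
// or the value cannot be decoded.
func (c *msgContext) Header(key string) interface{} {
	data, ok := c.raw[key]
	if !ok {
		return nil
	}
	v, err := c.codec.DecodeHeader(data)
	if err != nil {
		return nil
	}
	return v
}

// Headers returns a copy of the raw headers so that callers cannot
// mutate the context's internal state.
func (c *msgContext) Headers() map[string][]byte {
	out := make(map[string][]byte, len(c.raw))
	for k, v := range c.raw {
		out[k] = append([]byte(nil), v...)
	}
	return out
}

func main() {
	ctx := &msgContext{
		raw:   map[string][]byte{"trace-id": []byte("abc")},
		codec: stringHeaders{},
	}
	fmt.Println(ctx.Header("trace-id"))
	fmt.Println(ctx.Header("missing"))
	fmt.Println(len(ctx.Headers()))
}
```

One caveat with a map: Kafka record headers are an ordered list in which the same key may appear more than once, so a map-based accessor would have to pick one value per key.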
@j0hnsmith A complete map is a good idea, but do you really want to have that always as raw bytes? Wouldn't it be sufficient to use a header bytes codec (which does not do anything)? |
Ahh, yeah, missed that bit. 👍 |
Hi guys. |
Hi @jbpin, In general, we are maintaining Goka primarily by fixing bugs or improving smaller features. Nevertheless, we are interested in pushing the project and its features forward. I'm happy to Having support by active users of Goka would be helpful and very much appreciated. For this specific ticket: I think the suggestions made so far are a good starting point. |
Since this issue didn't have any activity for over a year, I'll close it as it doesn't seem to be too important. Feel free to create a new issue with concrete requirements or ideas. |
Since Kafka v0.11, messages can contain headers. It would be nice to be able to specify them when emitting messages and receive them in our processors.
I'm interested in submitting a PR if that's something you'd like to consider adding to goka.