Emit messages with headers #77

Closed
asdine opened this issue Jan 10, 2018 · 13 comments
Comments

@asdine

asdine commented Jan 10, 2018

Since Kafka v0.11, messages can contain headers. It would be nice to be able to specify them when emitting messages and to receive them in our processors.
I'm interested in submitting a PR if that's something you'd like to consider adding to goka.

@db7
Collaborator

db7 commented Jan 15, 2018

Sure. I'd be happy to review your PR. Also, if you need help finding your way through the code, let me know.

@jbpin
Contributor

jbpin commented Feb 2, 2018

@asdine Same here, I'll be happy to give you a hand.

@asdine
Author

asdine commented Feb 2, 2018

Sure! I'll work on that this weekend 👍

@db7 db7 added the enhancement label Aug 5, 2018
@annymsMthd

We are starting to use goka, and this would be a nice addition that would provide a way to trace. Any word on this? If you have an idea of how this should be implemented, I could take a stab at it as well.

@db7
Collaborator

db7 commented Oct 22, 2018

@annymsMthd sorry for the delayed response. PRs are very welcome.

Since we don't use message headers, perhaps you could briefly explain how you'd like to have them available in your processor callbacks, and then I can point you to the places where you'd need to add code.

A processor callback looks like this: func(ctx goka.Context, m interface{}). The message payload is m. Other metadata is accessible via ctx:

type Context interface {
    // Topic returns the topic of input message.
    Topic() Stream

    // Key returns the key of the input message.
    Key() string

    // Partition returns the partition of the input message.
    Partition() int32

    // Offset returns the offset of the input message.
    Offset() int64

    // Timestamp returns the timestamp of the input message. If the timestamp is
    // invalid, a zero time will be returned.
    Timestamp() time.Time

    // Emit asynchronously writes a message into a topic.
    Emit(topic Stream, key string, value interface{})

    ...
}

ctx.Emit(topic, key, value) is used to emit a message into a topic.

So, how would you like to use the headers given the existing interface?
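
For readers who have not written a goka processor, the callback shape described above can be sketched without the library. This is only an illustration: Stream and Context are reduced local copies of the types quoted in this comment, and fakeContext and process are hypothetical names invented for the example, not part of goka.

```go
package main

import (
	"fmt"
	"time"
)

// Stream stands in for goka.Stream (a topic name) in this sketch.
type Stream string

// Context is a reduced, local copy of the interface quoted above.
type Context interface {
	Topic() Stream
	Key() string
	Partition() int32
	Offset() int64
	Timestamp() time.Time
	Emit(topic Stream, key string, value interface{})
}

// fakeContext is a hypothetical in-memory implementation used only here.
type fakeContext struct {
	topic     Stream
	key       string
	partition int32
	offset    int64
	ts        time.Time
	emitted   []string
}

func (c *fakeContext) Topic() Stream        { return c.topic }
func (c *fakeContext) Key() string          { return c.key }
func (c *fakeContext) Partition() int32     { return c.partition }
func (c *fakeContext) Offset() int64        { return c.offset }
func (c *fakeContext) Timestamp() time.Time { return c.ts }

// Emit records the message instead of writing it to Kafka.
func (c *fakeContext) Emit(topic Stream, key string, value interface{}) {
	c.emitted = append(c.emitted, fmt.Sprintf("%s/%s=%v", topic, key, value))
}

// process has the callback shape func(ctx Context, m interface{}):
// the payload arrives as m, everything else is read from ctx.
func process(ctx Context, m interface{}) {
	info := fmt.Sprintf("%v@%s[%d:%d]", m, ctx.Topic(), ctx.Partition(), ctx.Offset())
	ctx.Emit("audit", ctx.Key(), info)
}

func main() {
	ctx := &fakeContext{topic: "clicks", key: "user-1", partition: 2, offset: 40, ts: time.Now()}
	process(ctx, "click")
	fmt.Println(ctx.emitted[0])
}
```

Headers are the one piece of record metadata this interface does not expose, which is what the question above is about.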

@jbpin
Contributor

jbpin commented Oct 23, 2018

Hello @db7, good to hear from you on this subject.
As mentioned in the Kafka documentation, a record can have headers. Headers are key-value pairs (string, []byte).

What I have in mind is a new map for headers in the context, with Emit modified to add those headers to the outgoing message if present, plus a second Emit function that accepts headers as a fourth parameter: Emit(topic, key, value, headers).

Having access to these headers in Context would also be helpful. I don't know whether headers could be encoded using a codec, as is the case for the value.
Headers could be map[string][]byte or map[string]interface{}, depending on that.

What do you think?
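
One possible reading of this proposal, sketched without goka: a context that exposes the input headers, forwards them on a plain Emit, and offers a second emit function taking extra headers. Everything here (Headers, record, toyContext, EmitWithHeaders, and the merge rule where extra headers win) is an assumption made for illustration and not an existing goka API.

```go
package main

import "fmt"

// Headers is a hypothetical type for record headers (string key, raw bytes value).
type Headers map[string][]byte

// record is what the toy context "sends"; a real client would hand it to a producer.
type record struct {
	Topic, Key string
	Value      []byte
	Headers    Headers
}

// toyContext is an in-memory stand-in for a processor context, not goka's type.
type toyContext struct {
	in      Headers  // headers of the input message
	emitted []record // messages emitted so far
}

// Headers returns the headers of the input message.
func (c *toyContext) Headers() Headers { return c.in }

// Emit forwards the input headers, if any, unchanged.
func (c *toyContext) Emit(topic, key string, value []byte) {
	c.EmitWithHeaders(topic, key, value, nil)
}

// EmitWithHeaders merges extra on top of the input headers; extra wins on conflicts.
func (c *toyContext) EmitWithHeaders(topic, key string, value []byte, extra Headers) {
	merged := make(Headers, len(c.in)+len(extra))
	for k, v := range c.in {
		merged[k] = v
	}
	for k, v := range extra {
		merged[k] = v
	}
	c.emitted = append(c.emitted, record{Topic: topic, Key: key, Value: value, Headers: merged})
}

func main() {
	ctx := &toyContext{in: Headers{"trace-id": []byte("abc")}}
	ctx.Emit("out", "k1", []byte("v1"))
	ctx.EmitWithHeaders("out", "k2", []byte("v2"), Headers{"source": []byte("demo")})
	for _, r := range ctx.emitted {
		fmt.Printf("%s %s headers=%d\n", r.Topic, r.Key, len(r.Headers))
	}
}
```

Forwarding input headers by default suits tracing use cases, but it is a design choice: a processor that must not leak upstream headers would need a way to opt out.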

@db7
Collaborator

db7 commented Oct 23, 2018

OK, thanks for the suggestions. I think it would make sense to add the decoding of the header value to the codec. I could imagine using the headers as follows:

// Header is one entry in the Kafka record headers.
type Header struct {
  key   string
  value interface{}
}

type Context interface {
  // Header returns the value for a header key if available, otherwise nil.
  Header(key string) interface{}

  // Emit asynchronously writes a message into a topic.
  Emit(topic Stream, key string, value interface{}, header ...Header)

  ...
}

The codec would be extended to look like this:

type Codec interface {
  Encode(value interface{}) (data []byte, err error)
  Decode(data []byte) (value interface{}, err error)

  EncodeHeader(value interface{}) (data []byte, err error)
  DecodeHeader(data []byte) (value interface{}, err error)
}

To avoid having to implement EncodeHeader and DecodeHeader every time, we can provide some header codecs to be added to any codec:

type MyCodec struct {
  codec.HeaderBytes
}

// only implement Encode and Decode
func (c *MyCodec) Encode(value interface{}) (data []byte, err error) { ... }
func (c *MyCodec) Decode(data []byte) (value interface{}, err error) { ... }

That would require adding the above codec.HeaderBytes to all codecs in existing code, even those that do not use headers. Does that sound OK?
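
The embedding idea above can be tried out in isolation. The following is a minimal sketch, assuming the extended Codec interface exactly as proposed in this comment; HeaderBytes here is a local, hypothetical stand-in for the suggested codec.HeaderBytes (which did not exist in goka at the time of this discussion), and StringCodec is an example codec invented for the demo.

```go
package main

import "fmt"

// Codec mirrors the extended interface proposed in this thread.
type Codec interface {
	Encode(value interface{}) ([]byte, error)
	Decode(data []byte) (interface{}, error)
	EncodeHeader(value interface{}) ([]byte, error)
	DecodeHeader(data []byte) (interface{}, error)
}

// HeaderBytes is a hypothetical stand-in for the proposed codec.HeaderBytes:
// it passes header values through as raw bytes.
type HeaderBytes struct{}

func (HeaderBytes) EncodeHeader(value interface{}) ([]byte, error) {
	b, ok := value.([]byte)
	if !ok {
		return nil, fmt.Errorf("header codec: expected []byte, got %T", value)
	}
	return b, nil
}

func (HeaderBytes) DecodeHeader(data []byte) (interface{}, error) {
	return data, nil
}

// StringCodec implements only Encode and Decode; the header methods
// are promoted from the embedded HeaderBytes.
type StringCodec struct {
	HeaderBytes
}

func (c *StringCodec) Encode(value interface{}) ([]byte, error) {
	s, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("string codec: expected string, got %T", value)
	}
	return []byte(s), nil
}

func (c *StringCodec) Decode(data []byte) (interface{}, error) {
	return string(data), nil
}

// Compile-time check that embedding is enough to satisfy Codec.
var _ Codec = (*StringCodec)(nil)

func main() {
	var c Codec = &StringCodec{}
	payload, _ := c.Encode("hello")
	header, _ := c.EncodeHeader([]byte("trace-123"))
	fmt.Printf("%s %s\n", payload, header)
}
```

The cost mentioned above is visible here: any existing codec type without the embedded field stops satisfying Codec, so widening the interface is a breaking change for all current users.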

@j0hnsmith
Contributor

j0hnsmith commented Oct 23, 2018

A way to get all the headers (raw) could also be useful, e.g.:

type Context interface {
  // Header returns the value for a header key if available, otherwise nil.
  Header(key string) interface{}

  // Headers returns the raw headers.
  Headers() map[string][]byte
  ...
}

@db7
Collaborator

db7 commented Oct 23, 2018

@j0hnsmith A complete map is a good idea, but do you really want to have that always as raw bytes? Wouldn't it be sufficient to use a header bytes codec (which does not do anything)?

@j0hnsmith
Contributor

Ahh, yeah, missed that bit. 👍

@jbpin
Contributor

jbpin commented Jan 8, 2019

Hi guys.
Happy new year. Can we take the time to finish this feature? I don't think it should take long to implement. What's Lovoo's position on this project? Are you maintaining it? Do you have a roadmap?

@frairon
Contributor

frairon commented Jan 9, 2019

Hi @jbpin,
very sorry again for the long delay.

In general, we maintain Goka primarily by fixing bugs or improving smaller features we use in our services. That's why there is no official roadmap. Unfortunately, we don't have many resources to proactively work on feature requests for which we don't have real use cases, even if those features (like emitting headers in a message) definitely make sense.

Nevertheless, we are interested in pushing the project and its features forward. I'm happy to support work on specific problems, review pull requests, and so on.

Support from active users of Goka would be helpful and very much appreciated. Maybe we can create a roadmap in the wiki to start building a community. What do you guys think?

For this specific ticket: I think the suggestions made so far are a good starting point. And again, I'm happy to support anyone who wants to give it a try, because it may take a while until I find the time to start.

@frairon
Contributor

frairon commented Mar 23, 2020

Since this issue hasn't had any activity for over a year, I'll close it, as it doesn't seem to be too important. Feel free to create a new issue with concrete requirements or ideas.


6 participants