Merge pull request #214 from toninho09/add-header-on-message
Adding header on message
frairon authored Jan 6, 2020
2 parents 2554930 + 6c253cf commit 1cf2c41
Showing 9 changed files with 50 additions and 4 deletions.
7 changes: 7 additions & 0 deletions context.go
@@ -33,6 +33,9 @@ type Context interface {
// Value returns the value of the key in the group table.
Value() interface{}

// Headers returns the headers of the input message
Headers() map[string][]byte

// SetValue updates the value of the key in the group table.
SetValue(value interface{})

@@ -196,6 +199,10 @@ func (ctx *cbContext) Partition() int32 {
return ctx.msg.Partition
}

func (ctx *cbContext) Headers() map[string][]byte {
return ctx.msg.Header
}

func (ctx *cbContext) Join(topic Table) interface{} {
if ctx.pviews == nil {
ctx.Fail(fmt.Errorf("table %s not subscribed", topic))
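
For orientation (not part of this diff): a minimal sketch of how a processor callback could read the new headers via ctx.Headers(). It assumes the lovoo/goka import path; the group name, topic, broker address, and the "trace-id" header key are made up for the example.

package main

import (
	"fmt"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// The callback inspects the headers of the current input message.
	cb := func(ctx goka.Context, msg interface{}) {
		if trace, ok := ctx.Headers()["trace-id"]; ok {
			fmt.Printf("key=%s trace-id=%s\n", ctx.Key(), string(trace))
		}
	}

	g := goka.DefineGroup("example-group",
		goka.Input("example-stream", new(codec.String), cb),
	)

	p, err := goka.NewProcessor([]string{"localhost:9092"}, g)
	if err != nil {
		panic(err)
	}
	_ = p // p.Run(context.Background()) would start consuming
}
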
6 changes: 6 additions & 0 deletions kafka/confluent/confluent.go
@@ -161,13 +161,19 @@ func (c *confluent) run() {
partition = e.TopicPartition.Partition
)

headers := make(map[string][]byte)
for _, header := range e.Headers {
headers[header.Key] = header.Value
}

c.events <- &kafka.Message{
Topic: topic,
Partition: partition,
Offset: int64(e.TopicPartition.Offset),
Key: string(e.Key),
Value: e.Value,
Timestamp: e.Timestamp,
Header: headers,
}

case rdkafka.PartitionEOF:
1 change: 1 addition & 0 deletions kafka/event.go
@@ -48,6 +48,7 @@ type Message struct {
Partition int32
Offset int64
Timestamp time.Time
Header map[string][]byte

Key string
Value []byte
7 changes: 7 additions & 0 deletions kafka/group_consumer.go
@@ -211,6 +211,12 @@ func (c *groupConsumer) waitForMessages() bool {
return false
}
case msg := <-c.consumer.Messages():

headers := make(map[string][]byte)
for _, header := range msg.Headers {
headers[string(header.Key)] = header.Value
}

select {
case c.events <- &Message{
Topic: msg.Topic,
@@ -219,6 +225,7 @@
Timestamp: msg.Timestamp,
Key: string(msg.Key),
Value: msg.Value,
Header: headers,
}:
case <-c.stop:
return false
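
A note on the conversion above (the same pattern appears in confluent.go and simple_consumer.go): record headers arrive as a list of key/value pairs but Message.Header is a map, so a repeated key keeps only its last value. A standalone sketch of that behaviour, using sarama types:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// toHeaderMap mirrors the consumers' conversion: the header slice is
// flattened into a map, so duplicate keys collapse to the last value.
func toHeaderMap(hs []*sarama.RecordHeader) map[string][]byte {
	headers := make(map[string][]byte)
	for _, h := range hs {
		headers[string(h.Key)] = h.Value
	}
	return headers
}

func main() {
	hs := []*sarama.RecordHeader{
		{Key: []byte("trace-id"), Value: []byte("first")},
		{Key: []byte("trace-id"), Value: []byte("second")},
	}
	fmt.Println(string(toHeaderMap(hs)["trace-id"])) // prints "second"
}
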
3 changes: 3 additions & 0 deletions kafka/group_consumer_test.go
@@ -255,6 +255,7 @@ func TestGroupConsumer_GroupConsumeMessages(t *testing.T) {
Offset: 0,
Key: string(key),
Value: value,
Header: make(map[string][]byte),
})

ensure.DeepEqual(t, c.partitionMap, map[int32]bool{
@@ -458,6 +459,7 @@ func TestGroupConsumer_GroupNotificationsAfterMessages(t *testing.T) {
Offset: 0,
Key: string(key),
Value: value,
Header: make(map[string][]byte),
})

ensure.DeepEqual(t, c.partitionMap, map[int32]bool{
@@ -564,6 +566,7 @@ func TestGroupConsumer_GroupEmptyNotifications(t *testing.T) {
Offset: 0,
Key: string(key),
Value: value,
Header: make(map[string][]byte),
})

ensure.DeepEqual(t, c.partitionMap, map[int32]bool{
7 changes: 7 additions & 0 deletions kafka/simple_consumer.go
@@ -139,6 +139,12 @@ func (c *simpleConsumer) run(pc sarama.PartitionConsumer, topic string, partitio
// drained.
continue
}

headers := make(map[string][]byte)
for _, header := range m.Headers {
headers[string(header.Key)] = header.Value
}

select {
case c.events <- &Message{
Topic: m.Topic,
@@ -147,6 +153,7 @@ func (c *simpleConsumer) run(pc sarama.PartitionConsumer, topic string, partitio
Key: string(m.Key),
Value: m.Value,
Timestamp: m.Timestamp,
Header: headers,
}:
case <-c.dying:
return
18 changes: 17 additions & 1 deletion kafka/simple_consumer_test.go
@@ -276,7 +276,23 @@ func TestSimpleConsumer_ErrorBlocked(t *testing.T) {
pc.EXPECT().HighWaterMarkOffset().Return(int64(0))
mo, ok := (<-ch).(*Message)
ensure.True(t, ok)
ensure.DeepEqual(t, mo, &Message{Topic: "sometopic", Partition: 123, Key: "somekey", Value: []byte("somevalue")})
ensure.DeepEqual(t, mo, &Message{Topic: "sometopic", Partition: 123, Key: "somekey", Value: []byte("somevalue"), Header: map[string][]byte{}})

messages <- &sarama.ConsumerMessage{
Key: []byte("somekey"),
Value: []byte("somevalue"),
Topic: "sometopic",
Partition: 123,
Headers: []*sarama.RecordHeader{{
Key: []byte("someHeader"),
Value: []byte("someHeaderValue"),
}},
}

pc.EXPECT().HighWaterMarkOffset().Return(int64(0))
mo1, ok := (<-ch).(*Message)
ensure.True(t, ok)
ensure.DeepEqual(t, mo1, &Message{Topic: "sometopic", Partition: 123, Key: "somekey", Value: []byte("somevalue"), Header: map[string][]byte{"someHeader": []byte("someHeaderValue")}})

// we now write, but don't read events
messages <- &sarama.ConsumerMessage{
1 change: 1 addition & 0 deletions partition.go
@@ -134,6 +134,7 @@ func newMessage(ev *kafka.Message) *message {
Timestamp: ev.Timestamp,
Data: ev.Value,
Key: ev.Key,
Header: ev.Header,
}
}

4 changes: 1 addition & 3 deletions processor.go
@@ -47,6 +47,7 @@ type message struct {
Partition int32
Offset int64
Timestamp time.Time
Header map[string][]byte
}

// ProcessCallback function is called for every message received by the
@@ -655,15 +656,12 @@ func (g *Processor) rebalance(errg *multierr.ErrGroup, ctx context.Context, part
errs := new(multierr.Errors)
g.opts.log.Printf("Processor: rebalancing: %+v", partitions)


// callback the new partition assignment
g.opts.rebalanceCallback(partitions)


g.m.Lock()
defer g.m.Unlock()


for id := range partitions {
// create partition views
if err := g.createPartitionViews(errg, ctx, id); err != nil {
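
The simple_consumer test earlier in this diff feeds sarama consumer messages with headers directly. To exercise the new accessor end-to-end, headers have to be set on the producing side, which none of the files in this diff touches; a plain sarama producer can do it. A sketch (broker address, topic, and header values are hypothetical):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the sync producer
	cfg.Version = sarama.V0_11_0_0       // record headers need Kafka >= 0.11

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "example-stream",
		Key:   sarama.StringEncoder("somekey"),
		Value: sarama.StringEncoder("somevalue"),
		Headers: []sarama.RecordHeader{
			{Key: []byte("someHeader"), Value: []byte("someHeaderValue")},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}
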
