produce_message.go (forked from IBM/sarama)
package sarama

import (
	"log"
	"time"
)
// A produceMessage is a single key/value pair buffered for delivery to a
// particular topic-partition. sync marks messages that must be flushed and
// acknowledged immediately rather than batched.
type produceMessage struct {
	tp         topicPartition
	key, value []byte
	retried    bool
	sync       bool
}

// A produceRequestBuilder accumulates buffered messages so they can be turned
// into a single ProduceRequest.
type produceRequestBuilder []*produceMessage
// If the message is synchronous, we manually send it and wait for a return.
// Otherwise, we just hand it back to the producer to enqueue using the normal
// method.
func (msg *produceMessage) enqueue(p *Producer) error {
	if !msg.sync {
		return p.addMessage(msg)
	}

	var prb produceRequestBuilder = []*produceMessage{msg}
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}

	errs := make(chan error, 1)
	bp.flushRequest(p, prb, func(err error) {
		errs <- err
		close(errs)
	})
	return <-errs
}
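// Usage sketch (not part of the original file): how a caller might enqueue an
// asynchronous versus a synchronous message. The Producer value p, the topic
// name, and the topicPartition literal are assumptions for illustration; the
// accessors above only imply that topicPartition has topic and partition fields.
//
//	msg := &produceMessage{tp: topicPartition{topic: "events", partition: 0}, value: []byte("payload")}
//	_ = msg.enqueue(p) // async: handed back to p.addMessage for normal batching
//
//	syncMsg := &produceMessage{tp: msg.tp, value: []byte("payload"), sync: true}
//	if err := syncMsg.enqueue(p); err != nil {
//		// sync: flushed immediately; we block here until the broker responds
//	}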
// reenqueue puts a message back on the producer's queue after a failure, but
// only once: a message that has already been retried is reported as dropped.
func (msg *produceMessage) reenqueue(p *Producer) error {
	if msg.retried {
		return DroppedMessagesError{}
	}
	msg.retried = true
	return msg.enqueue(p)
}

// hasTopicPartition reports whether the message is destined for the given
// topic-partition.
func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
	return msg.tp.partition == partition && msg.tp.topic == topic
}
// toRequest converts the buffered messages into a single ProduceRequest.
func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: int32(config.Timeout / time.Millisecond)}

	// If compression is enabled, we need to group messages by topic-partition and
	// wrap them in MessageSets. We already discarded that grouping, so we
	// inefficiently re-sort them here. This could be optimized (i.e. pass a map
	// around rather than an array), but it's not clear what the best way is.
	if config.Compression != CompressionNone {
		msgSets := make(map[topicPartition]*MessageSet)
		for _, pmsg := range b {
			msgSet, ok := msgSets[pmsg.tp]
			if !ok {
				msgSet = new(MessageSet)
				msgSets[pmsg.tp] = msgSet
			}
			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
		}

		for tp, msgSet := range msgSets {
			valBytes, err := encode(msgSet)
			if err != nil {
				log.Fatal(err) // if this happens, it's basically our fault.
			}
			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
			req.AddMessage(tp.topic, tp.partition, &msg)
		}
		return req
	}

	// Compression is not enabled, so simply append each message directly to the
	// request with no MessageSet wrapper.
	for _, pmsg := range b {
		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
	}
	return req
}
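// Sketch of how a buffered batch becomes a wire request (illustrative only; the
// msgA/msgB/msgC values are hypothetical, and the config literal assumes this
// package's usual WaitForLocal and CompressionGZIP constants plus a
// time.Duration Timeout field, as suggested by the division by time.Millisecond
// above):
//
//	prb := produceRequestBuilder{msgA, msgB, msgC}
//	req := prb.toRequest(&ProducerConfig{
//		RequiredAcks: WaitForLocal,
//		Timeout:      500 * time.Millisecond,
//		Compression:  CompressionGZIP,
//	})
//	// With compression enabled, req holds one compressed MessageSet per
//	// topic-partition; with CompressionNone each message is added individually.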
// byteSize returns the combined size in bytes of the message's key and value.
func (msg *produceMessage) byteSize() uint32 {
	return uint32(len(msg.key) + len(msg.value))
}

// byteSize returns the combined key and value size of all buffered messages.
func (b produceRequestBuilder) byteSize() uint32 {
	var size uint32
	for _, m := range b {
		size += m.byteSize()
	}
	return size
}

// reverseEach calls fn on every buffered message, in reverse order.
func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
	for i := len(b) - 1; i >= 0; i-- {
		fn(b[i])
	}
}
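// Sketch (an assumption, not from the original file): reverseEach is presumably
// useful when a flush fails and the batch must be pushed back to the front of a
// queue; visiting the messages in reverse keeps their original order once each
// one is re-prepended. The Producer value p is hypothetical.
//
//	prb.reverseEach(func(m *produceMessage) {
//		_ = m.reenqueue(p) // retry each message of the failed batch at most once
//	})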