Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lockless multiproducer #48

Closed
wants to merge 33 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e1a8f56
done dispatcher, still wip
eapache Nov 8, 2013
d7f58f3
done batcher, still wip
eapache Nov 8, 2013
748dc78
build produce requests, still wip
eapache Nov 8, 2013
0bf43e4
It builds
eapache Nov 8, 2013
ac46323
Don't spam empty produceRequests when timer is 0
eapache Nov 8, 2013
a5d73c2
logging and bugfixes
eapache Nov 8, 2013
7d26b36
Properly clean up closed batchers
eapache Nov 9, 2013
807a01b
Simplify states slightly.
eapache Nov 9, 2013
d64ce0d
Rename forceFlush to orderMarker
eapache Nov 9, 2013
293d012
doc
eapache Nov 9, 2013
0d73f5c
Implement AckSuccesses and SimpleProducer
eapache Nov 9, 2013
bb2a30f
Respect AckSuccesses even if RequiredAcks is 0
eapache Nov 9, 2013
cf9fc19
minor refactor
eapache Nov 9, 2013
484227e
switch back to a flat buffer in the batcher
eapache Nov 9, 2013
218040d
safer dispatcher cleanup
eapache Nov 9, 2013
e5ffd26
typo, simplification
eapache Nov 9, 2013
c559e4c
Expose a channel for sending instead of a method
eapache Nov 9, 2013
2154305
guarantee correct cleanup order
eapache Nov 9, 2013
dbda53d
Select is your friend.
eapache Nov 9, 2013
0ff0c46
Don't leak when a broker no longer leads anything
eapache Nov 9, 2013
4c0441b
Add missing state transition
eapache Nov 9, 2013
a5e2603
Change AckSuccesses to not produce nils
eapache Nov 9, 2013
847eb9d
Allow specifying diff partitioners for diff topics
eapache Nov 9, 2013
858d3d0
Distinguish between new and bounced messages
eapache Nov 10, 2013
904bc6f
Force metadata refresh before resetting partition
eapache Nov 10, 2013
93233f9
try to clarify configuration parameters
eapache Nov 12, 2013
b0c2c8b
Allow SimpleProducer to have a custom partitioner to make it actually…
eapache Nov 12, 2013
b3d95be
comments
eapache Nov 12, 2013
8c7797d
more comments
eapache Nov 12, 2013
28e9ac0
Merge branch 'master' into lockless_multiproducer
eapache Nov 12, 2013
6e83859
address misc little issues
eapache Nov 14, 2013
2cca3f1
Change flush conditions so setting any one of the three config values…
eapache Nov 14, 2013
5524e61
give useful error strings
eapache Nov 14, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement AckSuccesses and SimpleProducer
AckSuccesses causes a nil error for every successful message, making it possible
to be fake synchrony by listening once after every send (assuming no buffering).
Wrap this logic in a SimpleProducer, and migrate the old Producer test to be
a SimpleProducer test, since the API is closer.
eapache committed Nov 9, 2013

Partially verified

This commit is signed with the committer’s verified signature.
spydon’s contribution has been verified via GPG key.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
commit 0d73f5cd31d459136caa5875431838789544d223
68 changes: 36 additions & 32 deletions producer.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,8 @@ type ProducerConfig struct {
Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
MaxBufferedMessages uint // The maximum number of messages permitted to buffer before flushing.
MaxBufferedBytes uint // The maximum number of message bytes permitted to buffer before flushing.
MaxBufferTime time.Duration // The maximum amount of time permitted to buffer before flushing.
MaxBufferTime time.Duration // The maximum amount of time permitted to buffer before flushing (or zero for no timer).
AckSuccesses bool // When true, every successful delivery causes a nil to be sent on the Errors channel.
}

// Producer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
@@ -197,9 +198,14 @@ func (d *dispatcher) getQueue(msg *pendingMessage) *msgQueue {
return d.queues[msg.topic][msg.partition]
}

func (d *dispatcher) cleanup() {
for _, batcher := range d.batchers {
close(batcher.msgs)
}
}

func (d *dispatcher) dispatch() {
for msg := range d.msgs {
Logger.Println("dispatcher", msg)

queue := d.getQueue(msg)

@@ -220,7 +226,6 @@ func (d *dispatcher) dispatch() {
if d.batchers[queue.broker] == nil {
d.createBatcher(queue.broker)
}
Logger.Println("dispatching")
d.batchers[queue.broker].msgs <- msg
} else {
queue.backlog = append(queue.backlog, msg)
@@ -267,6 +272,7 @@ func (d *dispatcher) dispatch() {
}
}
}
d.cleanup()
}

func (b *batcher) buildRequest() *ProduceRequest {
@@ -341,13 +347,37 @@ func (b *batcher) flush() {
return
}

Logger.Println("flushing")

response, err := b.broker.Produce(b.prod.client.id, request)

switch err {
case nil:
break
if response != nil {
for topic, tmp := range b.buffers {
for part, buffer := range tmp {
block := response.GetBlock(topic, part)

if block == nil {
b.handleError(buffer, IncompleteResponse)
} else {
switch block.Err {
case NoError:
if b.prod.config.AckSuccesses {
for _ = range buffer {
b.prod.errors <- nil
}
}
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
b.handleError(buffer, err)
default:
for _, msg := range buffer {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
}
}

}
}
}
}
case EncodingError:
for _, tmp := range b.buffers {
for _, buffer := range tmp {
@@ -364,30 +394,6 @@ func (b *batcher) flush() {
}
}

if response != nil {
for topic, tmp := range b.buffers {
for part, buffer := range tmp {
block := response.GetBlock(topic, part)

if block == nil {
b.handleError(buffer, IncompleteResponse)
} else {
switch block.Err {
case NoError:
break
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
b.handleError(buffer, err)
default:
for _, msg := range buffer {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
}
}

}
}
}
}

b.buffers = make(map[string]map[int32][]*pendingMessage)
b.timer.Reset()
}
@@ -396,11 +402,9 @@ func (b *batcher) processMessages() {
for {
select {
case msg := <-b.msgs:
Logger.Println("batcher", msg)

if msg == nil {
b.flush()
Logger.Println("batcher exiting")
return
}

49 changes: 49 additions & 0 deletions simple_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package sarama

// SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type SimpleProducer struct {
producer *Producer
topic string
}

// NewSimpleProducer creates a new SimpleProducer using the given client and topic.
func NewSimpleProducer(client *Client, topic string) (*SimpleProducer, error) {
if topic == "" {
return nil, ConfigurationError("Empty topic")
}

config := new(ProducerConfig)
config.RequiredAcks = 1
config.AckSuccesses = true

prod, err := NewProducer(client, config)

if err != nil {
return nil, err
}

return &SimpleProducer{prod, topic}, nil
}

// SendMessage produces a message with the given key and value. The partition to send to is selected
// at randome. To send strings as either key or value, see the StringEncoder type.
func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
sp.producer.SendMessage(sp.topic, key, value)

err := <-sp.producer.Errors() // we always get something because AckSuccesses is true

if err != nil {
return err.Err
}

return nil
}

// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
// on the underlying client.
func (sp *SimpleProducer) Close() error {
return sp.producer.Close()
}
6 changes: 3 additions & 3 deletions producer_test.go → simple_producer_test.go
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ func TestSimpleProducer(t *testing.T) {
}
defer client.Close()

producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
producer, err := NewSimpleProducer(client, "my_topic")
if err != nil {
t.Fatal(err)
}
@@ -65,7 +65,7 @@ func TestSimpleProducer(t *testing.T) {
}
}

func ExampleProducer() {
func ExampleSimpleProducer() {
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
panic(err)
@@ -74,7 +74,7 @@ func ExampleProducer() {
}
defer client.Close()

producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
producer, err := NewSimpleProducer(client, "my_topic")
if err != nil {
panic(err)
}