Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lockless multiproducer #48

Closed
wants to merge 33 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e1a8f56
done dispatcher, still wip
eapache Nov 8, 2013
d7f58f3
done batcher, still wip
eapache Nov 8, 2013
748dc78
build produce requests, still wip
eapache Nov 8, 2013
0bf43e4
It builds
eapache Nov 8, 2013
ac46323
Don't spam empty produceRequests when timer is 0
eapache Nov 8, 2013
a5d73c2
logging and bugfixes
eapache Nov 8, 2013
7d26b36
Properly clean up closed batchers
eapache Nov 9, 2013
807a01b
Simplify states slightly.
eapache Nov 9, 2013
d64ce0d
Rename forceFlush to orderMarker
eapache Nov 9, 2013
293d012
doc
eapache Nov 9, 2013
0d73f5c
Implement AckSuccesses and SimpleProducer
eapache Nov 9, 2013
bb2a30f
Respect AckSuccesses even if RequiredAcks is 0
eapache Nov 9, 2013
cf9fc19
minor refactor
eapache Nov 9, 2013
484227e
switch back to a flat buffer in the batcher
eapache Nov 9, 2013
218040d
safer dispatcher cleanup
eapache Nov 9, 2013
e5ffd26
typo, simplification
eapache Nov 9, 2013
c559e4c
Expose a channel for sending instead of a method
eapache Nov 9, 2013
2154305
guarantee correct cleanup order
eapache Nov 9, 2013
dbda53d
Select is your friend.
eapache Nov 9, 2013
0ff0c46
Don't leak when a broker no longer leads anything
eapache Nov 9, 2013
4c0441b
Add missing state transition
eapache Nov 9, 2013
a5e2603
Change AckSuccesses to not produce nils
eapache Nov 9, 2013
847eb9d
Allow specifying diff partitioners for diff topics
eapache Nov 9, 2013
858d3d0
Distinguish between new and bounced messages
eapache Nov 10, 2013
904bc6f
Force metadata refresh before resetting partition
eapache Nov 10, 2013
93233f9
try to clarify configuration parameters
eapache Nov 12, 2013
b0c2c8b
Allow SimpleProducer to have a custom partitioner to make it actually…
eapache Nov 12, 2013
b3d95be
comments
eapache Nov 12, 2013
8c7797d
more comments
eapache Nov 12, 2013
28e9ac0
Merge branch 'master' into lockless_multiproducer
eapache Nov 12, 2013
6e83859
address misc little issues
eapache Nov 14, 2013
2cca3f1
Change flush conditions so setting any one of the three config values…
eapache Nov 14, 2013
5524e61
give useful error strings
eapache Nov 14, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Properly clean up closed batchers
Misc other bug-fixes and refactors
eapache committed Nov 9, 2013
commit 7d26b361258b74e66cc68c3f16007c80df783c90
73 changes: 34 additions & 39 deletions producer.go
Original file line number Diff line number Diff line change
@@ -322,6 +322,17 @@ func (b *batcher) buildRequest() *ProduceRequest {
}
}

func (b *batcher) handleError(buf []*pendingMessage, err error) {
for _, msg := range buf {
if msg.err != nil {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
} else {
msg.err = err
b.prod.dispatcher.msgs <- msg
}
}
}

func (b *batcher) flush() {
request := b.buildRequest()

@@ -347,54 +358,31 @@ func (b *batcher) flush() {
default:
for _, tmp := range b.buffers {
for _, buffer := range tmp {
for _, msg := range buffer {
if msg.err != nil {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
} else {
msg.err = err
b.prod.dispatcher.msgs <- msg
}
}
b.handleError(buffer, err)
}
}
}

if response == nil {
return
}

for topic, tmp := range b.buffers {
for part, buffer := range tmp {
block := response.GetBlock(topic, part)
if response != nil {
for topic, tmp := range b.buffers {
for part, buffer := range tmp {
block := response.GetBlock(topic, part)

if block == nil {
for _, msg := range buffer {
if msg.err != nil {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, IncompleteResponse}
} else {
msg.err = IncompleteResponse
b.prod.dispatcher.msgs <- msg
}
}
} else {
switch block.Err {
case NoError:
break
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
for _, msg := range buffer {
if msg.err != nil {
if block == nil {
b.handleError(buffer, IncompleteResponse)
} else {
switch block.Err {
case NoError:
break
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
b.handleError(buffer, err)
default:
for _, msg := range buffer {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
} else {
msg.err = err
b.prod.dispatcher.msgs <- msg
}
}
default:
for _, msg := range buffer {
b.prod.errors <- &ProduceError{msg.topic, msg.origKey, msg.origValue, err}
}
}

}
}
}
}
@@ -408,6 +396,13 @@ func (b *batcher) processMessages() {
select {
case msg := <-b.msgs:
Logger.Println("batcher", msg)

if msg == nil {
b.flush()
Logger.Println("batcher exiting")
return
}

if b.buffers[msg.topic] == nil {
b.buffers[msg.topic] = make(map[int32][]*pendingMessage)
}