Lockless multiproducer #48
Conversation
Misc other bug-fixes and refactors
And respect forceFlush messages in the batcher
it doesn't actually force a flush, so the old name was confusing
AckSuccesses causes a nil error for every successful message, making it possible to fake synchrony by listening once after every send (assuming no buffering). Wrap this logic in a SimpleProducer, and migrate the old Producer test to be a SimpleProducer test, since the API is closer.
In this case there is no response, so we must treat everything as a success.
The nested loops iterating the map were getting hard to manage.
Fixes a hypothetical deadlock when the user's stack was inside the send function at the exact moment the dispatcher blocked trying to put something on the error channel.
Fix hypothetical deadlock when two high-volume partitions exchange leaders simultaneously, by using select the way it was meant to be used.
Instead, produce ProduceErrors with the Err field nil. This lets the user more easily determine which messages succeeded and simplifies the SimpleProducer.
With a fallback DefaultPartitioner that defaults to Random, so the base behaviour doesn't change.
Messages that were bounced by the batcher without being sent were coming back with a nil error, which made them indistinguishable from *new* messages, so they were ending up in the backlog instead of the requeue. Fix that by adding a transient "bounced" error type. Also rename the other errors to reset and shutdown. Naming is hard.
Otherwise the call to Leader will likely just use the stale cache.
MaxBufferedMessages uint          // The maximum number of messages permitted to buffer before flushing.
MaxBufferedBytes    uint          // The maximum number of message bytes permitted to buffer before flushing.
MaxBufferTime       time.Duration // The maximum amount of time permitted to buffer before flushing (or zero for no timer).
AckSuccesses        bool          // When true, every message sent results in a ProduceError, even if Err is nil
I don't get that comment. Can you explain?
He means that every message enqueued will result in exactly one error being returned, even if the error is nil. This way you can read one result off the channel for each message you enqueue, to be sure that no messages are still in transit before shutting down, etc.
The inverse is presumably that successes won't be ack'd, and you won't have a 1:1 correspondence between messages enqueued and error returns.
Ya, I need to figure out a better way of saying that. Normally, messages that aren't sent because of an error cause a ProduceError on the channel accessible by Errors(). A ProduceError consists of just the message and an error. If AckSuccesses is set to true, then even successfully delivered messages will cause a ProduceError to show up on the channel, but successes will have the error field of the ProduceError set to nil.
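To make the nil-Err convention concrete, here is a minimal, self-contained sketch; the ProduceError shape is taken from the description above, and the string Msg is just a stand-in for the real message type:

```go
package main

import (
	"errors"
	"fmt"
)

// ProduceError as described above: just the message and an error.
// The string Msg is a stand-in for the real message type.
type ProduceError struct {
	Msg string
	Err error
}

func main() {
	results := make(chan *ProduceError, 2)
	results <- &ProduceError{Msg: "a"} // delivered: Err stays nil (AckSuccesses)
	results <- &ProduceError{Msg: "b", Err: errors.New("broker unreachable")} // failed
	close(results)

	for r := range results {
		if r.Err == nil {
			fmt.Println("delivered:", r.Msg)
		} else {
			fmt.Println("failed:", r.Msg, "-", r.Err)
		}
	}
}
```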
It's used in the SimpleProducer. In that code, there is no batching and AckSuccesses is true, so every message put on the Send channel is guaranteed to immediately produce a response on the Errors channel, which allows it to fake the non-concurrent function behaviour.
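A rough sketch of that fake-synchrony pattern, with stand-in channel and struct types (the real SimpleProducer wraps the producer's own channels, so all names here are illustrative):

```go
package main

import "fmt"

// Stand-in for the ProduceError described earlier in the thread.
type ProduceError struct {
	Msg string
	Err error
}

// sendSync fakes a synchronous send: enqueue exactly one message, then
// block until its single acknowledgement arrives (nil Err on success).
// This only works because nothing is batched and AckSuccesses is on.
func sendSync(send chan<- string, errs <-chan *ProduceError, msg string) error {
	send <- msg
	return (<-errs).Err
}

func main() {
	send := make(chan string)
	errs := make(chan *ProduceError)

	// Pretend producer loop: acknowledge every message as a success.
	go func() {
		for msg := range send {
			errs <- &ProduceError{Msg: msg, Err: nil}
		}
	}()

	fmt.Println("err:", sendSync(send, errs, "hello")) // err: <nil>
}
```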
Burke's explanation is far better :)
The design should guarantee that:
- if AckSuccesses == false, then any message not successfully flushed to the broker shows up on the Errors channel (even on shutdown)
- if AckSuccesses == true, then every message shows up on the Errors channel (even on shutdown)
mb1 := mockbroker.New(t, 1)
mb2 := mockbroker.New(t, 2)
defer mb1.Close()
defer mb2.Close()

mb1.ExpectMetadataRequest().
	AddBroker(mb2).
	AddTopicPartition("my_topic", 0, 2)

for i := 0; i < 10; i++ {
	mb2.ExpectProduceRequest().
		AddTopicPartition("my_topic", 0, 1, nil)
}

client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
// ...

I'm going through and converting most of the tests to this implementation now.
if topic == "" {
Any reason for the 32? Is that just arbitrary?
Ya. None of the channels need buffering, but I expect it's somewhat easier on the scheduler if it isn't forced to context-switch all the time.
Nitpick: Maybe make this a constant (and use the same constant for all the 32s in here)
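For what it's worth, the suggested constant might look like this (the name is made up here, not necessarily what the code settled on):

```go
package main

// channelBufferSize names the otherwise-arbitrary 32. None of the
// internal channels need buffering for correctness; the buffer just
// spares the scheduler some context switches.
const channelBufferSize = 32

func main() {
	messages := make(chan string, channelBufferSize)
	errs := make(chan error, channelBufferSize)
	_, _ = messages, errs // silence "declared and not used"
}
```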
Does this mean […]?

Originally you had to set all of them to enable asynchronous mode. All that 2cca3f1 does is require you to set one of them, but if you don't set any then it's still synchronous. If this is still the point of confusion, then I am just bad with words, and apologize for forcing you to rerun the benchmarks again :(
The behaviour we want is this: […]

Is there a setting for those three values that will approximate that? If not, what settings would you recommend I benchmark with?
I tried setting […]
My behaviour is described best in #48 (comment). This design does not provide a way to approximate the desired behaviour. Setting […]. Setting […]. I would suggest benchmarking with a […].
3.2GB RAM, 65% CPU
Ah, haha. I assumed it would be in milliseconds to be consistent with the consumer timeout.

producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{
	MaxBufferTime:       1 * time.Second,
	MaxBufferedBytes:    128 * 1024 * 1024,
	MaxBufferedMessages: 25000,
})
65%-180% CPU, 3.2GB RAM
Welp, I must have done something wrong. If you set […]
8.98s, 250% CPU, 1.4GB RAM
For an apples-to-apples comparison: 9.21s, 180% CPU, 700MB RAM
There we go. So the problem with this benchmark is that there's no limit on the message generation speed. With an infinite (or effectively infinite) buffer this means the client work gets done near-instantaneously. The time (~9s) is just the limitation of the network bandwidth to send that much data. Granted, yours seems to use less CPU/memory.
It seems to me as though Kafka itself is the bottleneck here. I see 108% CPU usage for Kafka in most benchmarks, and I'm running all processes on the same box. This kind of makes sense -- I'm writing to one partition, which would presumably be a single core at 100%, and 8% for some manager thread seems reasonable. Both implementations can, assuming a sufficiently large buffer, write at 100,000 messages/s or 3.8MB/s, which is the maximum a single topic-partition seems to want to accept on my machine.

The resource usage difference is significant, but largely negligible in the scheme of things -- at tens of thousands of messages per second, requiring a couple of free gigabytes and cores isn't a big deal. What I do like better about my implementation is the buffering semantics, i.e. delivering messages immediately when possible, and flushing immediately after the last request has completed successfully.
Frankly, I don't like either of the buffering semantics we ended up implementing. Agree that mine is, in hindsight, pretty awful, but […] It seems bizarre to send a singleton message if there's more coming down the pipe (and that is easy to tell). I also dislike the lack of a buffer cap; as we have shown here, a high-volume source would OOM even a hefty server if the TCP connection decided to time out.

What about the following pseudo-code semantics? The flushing goroutine is a function like: […]
And the main receiving goroutine is something like: […]
I believe this provides a maximum buffer size (after which point the user blocks), and it doesn't send immediately if it has more messages coming that it can batch efficiently. It does continue to accept messages (until the max buffer size is hit) even during a flush... Thoughts?
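The pseudo-code itself is not preserved above, but a sketch of the semantics as described (a bounded buffer that blocks the user when full, greedy batching so singletons aren't flushed while more input is pending, and input still accepted during a flush) might look like the following. The tiny maxBuffer and all names are illustrative:

```go
package main

import "fmt"

const maxBuffer = 4 // tiny for the demo; a real cap would be much larger

// flush stands in for the blocking network round-trip to the broker.
func flush(batch []string) {
	fmt.Println("flushing", len(batch), "messages")
}

func run(input <-chan string, quit <-chan struct{}) {
	var buffer []string
	pending := make(chan []string) // unbuffered handoff to the flusher
	flusherDone := make(chan struct{})

	// The flushing goroutine: take a batch, flush it, repeat.
	go func() {
		for batch := range pending {
			flush(batch)
		}
		close(flusherDone)
	}()

	for {
		// Greedily drain whatever is already waiting, so we hand the
		// flusher a batch, never a singleton with more input pending.
	drain:
		for len(buffer) < maxBuffer {
			select {
			case msg := <-input:
				buffer = append(buffer, msg)
			default:
				break drain
			}
		}

		out := pending
		if len(buffer) == 0 {
			out = nil // nothing to offer the flusher yet
		}
		in := input
		if len(buffer) >= maxBuffer {
			in = nil // buffer full: stop reading, which blocks the user
		}

		select {
		case msg := <-in:
			buffer = append(buffer, msg) // still accepting during a flush
		case out <- buffer: // flusher is idle: hand off the whole buffer
			buffer = nil
		case <-quit:
			if len(buffer) > 0 {
				pending <- buffer // flush the remainder on shutdown
			}
			close(pending)
			<-flusherDone
			return
		}
	}
}

func main() {
	input := make(chan string)
	quit := make(chan struct{})
	go func() {
		for i := 0; i < 10; i++ {
			input <- fmt.Sprintf("msg-%d", i)
		}
		close(quit)
	}()
	run(input, quit)
}
```

The nil-channel trick in the select is what lets a single loop both block the user at the cap and keep accepting input mid-flush: a send or receive on a nil channel is never selected.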
We won't be going with these semantics in any case, so closing this PR. Not deleting the branch in case we decide to salvage some of the code for attempt number 3.
Just to add some color: the "flush all queued messages starting after […]"
This can be really trivially tacked onto […]. I'm kind of disappointed Go doesn't have counting semaphores that don't require O(n) bytes, though.
Oh wow, […]
Which "this"?
Buffer is lazy, so it will take O(n) once you fill it up with structs. You could do an O(1) version with an int and a mutex.
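For illustration, an O(1)-space counting semaphore along those lines: an int guarded by a mutex, plus a condition variable so Acquire can block.

```go
package main

import (
	"fmt"
	"sync"
)

// semaphore is a counting semaphore in O(1) space: just two ints,
// a mutex, and a cond var, regardless of the limit.
type semaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	count int
	limit int
}

func newSemaphore(limit int) *semaphore {
	s := &semaphore{limit: limit}
	s.cond = sync.NewCond(&s.mu)
	return s
}

func (s *semaphore) Acquire() {
	s.mu.Lock()
	for s.count == s.limit {
		s.cond.Wait() // block until Release makes room
	}
	s.count++
	s.mu.Unlock()
}

func (s *semaphore) Release() {
	s.mu.Lock()
	s.count--
	s.cond.Signal()
	s.mu.Unlock()
}

func main() {
	s := newSemaphore(2)
	s.Acquire()
	s.Acquire()
	s.Release()
	s.Acquire() // would block without the Release above
	fmt.Println("ok")
}
```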
package main

import "time"

type nothing struct{}

func main() {
	size := 512 * 1024 * 1024 // 512 MB
	ch := make(chan nothing, 2000000000000)
	for i := 0; i < size; i++ {
		ch <- nothing{}
	}
	time.Sleep(30 * time.Second)
}

Uses 412KB of RAM.

EDIT: By "this" I just mean some sort of block-on-too-many-messages-enqueued concept.
Huh, go figure. Guess they're smart enough to optimize that case. |
I just discussed this with Tobi and Willem, and we think we'd rather have unbounded memory consumption than block: 99% of use cases for Kafka are going to be instrumentation on processes that can't be slowed down; thus, buffering as many messages as possible in hopes of the system recovering and allowing us to flush is preferable to blocking or rejecting them.

Applying back-pressure doesn't make sense in this context, I think, so blocking is out; it's just a question of whether we would rather risk going OOM, or define an upper memory bound above which errors will be returned rather than enqueueing the messages. Thoughts?

/cc also @wvanbergen @tobi
OK, I can see that logic. I think I would prefer not OOMing if we don't have to (by checking http://golang.org/pkg/runtime/#ReadMemStats maybe?). I suspect that if we don't have memory to queue a message, we should increment a counter of dropped messages and then, when we have space again, generate a "dropped n messages" message.

The other question is whether it's worth making this configurable. Are there potential use-cases where it would be preferable to have a fixed, non-memory-based queue maximum? Are there cases where it would be preferable to apply backpressure instead of erroring?
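A sketch of that counter-plus-memory-check idea, assuming one were to gate on runtime.ReadMemStats. The 1 GiB ceiling and all names are invented for illustration, and ReadMemStats is too expensive to call per message in practice, so a real version would sample it periodically:

```go
package main

import (
	"fmt"
	"runtime"
)

const memoryCeiling = 1 << 30 // illustrative 1 GiB soft limit

var dropped uint64 // messages rejected while over the ceiling

// enqueue drops (and counts) the message instead of buffering it when
// the process is using too much memory; otherwise it queues as normal.
func enqueue(queue chan<- string, msg string) bool {
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	if stats.Alloc > memoryCeiling {
		dropped++ // later: emit a single "dropped n messages" marker
		return false
	}
	queue <- msg
	return true
}

func main() {
	queue := make(chan string, 16)
	enqueue(queue, "hello")
	fmt.Println(len(queue), "queued,", dropped, "dropped")
}
```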
See https://godoc.org/github.com/eapache/channels#BatchingChannel for a lockless implementation of my current understanding of the desired buffering semantics (if one constructs it with […]).
See also https://godoc.org/github.com/eapache/channels#OverflowingChannel for a lockless way to discard messages without blocking when the buffer fills up (if we decide to do that to avoid OOM). It could be combined with the batching channel using https://godoc.org/github.com/eapache/channels#Pipe.
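Pieced together, and hedging that this is just one reading of that package's godoc at the time (the constructors, BufferCap, and Pipe may have changed since), the combination might look like:

```go
package main

import (
	"fmt"

	"github.com/eapache/channels"
)

func main() {
	// Drop writes instead of blocking (or OOMing) once 1000 items queue up.
	overflow := channels.NewOverflowingChannel(channels.BufferCap(1000))
	// Accumulate items and emit them to the reader as a single batch.
	batcher := channels.NewBatchingChannel(channels.BufferCap(100))

	// Pipe drains one channel into the other in a background goroutine.
	channels.Pipe(overflow, batcher)

	go func() {
		for i := 0; i < 5; i++ {
			overflow.In() <- i
		}
		overflow.Close()
	}()

	for batch := range batcher.Out() {
		fmt.Println("batch:", batch.([]interface{}))
	}
}
```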
CC @sirupsen @wvanbergen @burke @fw42 @graemej
Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.