Lockless multiproducer #48

Closed
wants to merge 33 commits into from

Conversation

@eapache eapache commented Nov 8, 2013

CC @sirupsen @wvanbergen @burke @fw42 @graemej

Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.

Misc other bug-fixes and refactors
And respect forceFlush messages in the batcher
It doesn't actually force a flush, so the old name was confusing.
AckSuccesses causes a nil error for every successful message, making it possible
to fake synchrony by listening once after every send (assuming no buffering).
Wrap this logic in a SimpleProducer, and migrate the old Producer test to be
a SimpleProducer test, since the API is closer.
In this case there is no response, so we must treat everything as a success.
The loop nesting needed to iterate the map was getting hard to manage.
Fixes a hypothetical deadlock when the user's stack was inside the send function
at the exact moment the dispatcher blocked trying to put something on the error
channel.
Fix hypothetical deadlock when two high-volume partitions exchange leaders
simultaneously, by using select the way it was meant to be used.
Instead, produce ProduceErrors with the Err field nil. This lets the user more
easily determine which messages succeeded and simplifies the SimpleProducer.
With a fallback DefaultPartitioner that defaults to Random, so the base
behaviour doesn't change.
Messages that were bounced by the batcher without being sent were coming back
with a nil error, which made them indistinguishable from *new* messages, so they
were ending up in the backlog instead of the requeue. Fix that by adding a
transient "bounced" error type.

Also rename the other errors to reset and shutdown. Naming is hard.
@eapache eapache mentioned this pull request Nov 10, 2013
Otherwise the call to Leader will likely just use the stale cache.
MaxBufferedMessages uint // The maximum number of messages permitted to buffer before flushing.
MaxBufferedBytes uint // The maximum number of message bytes permitted to buffer before flushing.
MaxBufferTime time.Duration // The maximum amount of time permitted to buffer before flushing (or zero for no timer).
AckSuccesses bool // When true, every message sent results in a ProduceError, even if Err is nil
Contributor

I don't get that comment. Can you explain?

Contributor

He means that every message enqueued will result in one and exactly one error returned, even if the error is nil. This way you can read one result off of the channel for each message you enqueue, to be sure that no messages are still in transit before shutting down, etc.

The inverse is presumably that successes won't be ack'd, and you won't have a 1:1 correspondence between messages enqueued and error returns.

Contributor Author

Ya, I need to figure out a better way of saying that. Normally, messages that aren't sent because of an error cause a ProduceError on the channel accessible by Errors(). A ProduceError consists of just the message and an error.

If AckSuccesses is set to true, then even successfully delivered messages will cause a ProduceError to show up on the channel, but successes will have the error field of the ProduceError set to nil.
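
Roughly the shape being described (a sketch; only the Err field name is fixed by this thread, the message field name is illustrative):

// ProduceError pairs a message with the outcome of trying to send it.
// Err is nil for successfully delivered messages when AckSuccesses is true.
type ProduceError struct {
    Msg *MessageToSend // illustrative name for the original message
    Err error
}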

Contributor Author

It's used in the SimpleProducer. In that code, there is no batching and AckSuccesses is true, so every message put on the Send channel is guaranteed to immediately produce a response on the Errors channel, which allows it to fake the non-concurrent function behaviour.
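
As a sketch of that fake-synchronous pattern (channel and type names are illustrative, not necessarily the exact API):

// With no batching and AckSuccesses enabled, each message put on the send
// channel yields exactly one result on the errors channel, so blocking on
// that result fakes a synchronous call.
func sendSync(send chan<- *MessageToSend, errs <-chan *ProduceError, msg *MessageToSend) error {
    send <- msg       // enqueue exactly one message
    result := <-errs  // block until its (single) result arrives
    return result.Err // nil on success, since AckSuccesses is set
}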

Contributor Author

Burke's explanation is far better :)

The design should guarantee that:

  • if AckSuccesses == false then any message not successfully flushed to the broker shows up on the Errors channel (even on shutdown)
  • if AckSuccesses == true then every message shows up on the Errors channel (even on shutdown)


burke commented Nov 12, 2013

FWIW

mb1 := mockbroker.New(t, 1)
mb2 := mockbroker.New(t, 2)
defer mb1.Close()
defer mb2.Close()

mb1.ExpectMetadataRequest().
        AddBroker(mb2).
        AddTopicPartition("my_topic", 0, 2)

for i := 0; i < 10; i++ {
        mb2.ExpectProduceRequest().
                AddTopicPartition("my_topic", 0, 1, nil)
}

client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
// ...

I'm going through and converting most of the tests to this implementation now.

Contributor

Any reason for the 32? Is that just arbitrary?

Contributor Author

Ya. None of the channels need buffering, but I expect it's somewhat easier on the scheduler if it isn't forced to context-switch all the time.

Contributor

Nitpick: Maybe make this a constant (and use the same constant for all the 32s in here)
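
i.e. something like (sketch; the constant name is made up):

// One shared constant instead of scattered magic 32s.
const channelBufferSize = 32

var errorsChan = make(chan *ProduceError, channelBufferSize)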


burke commented Nov 15, 2013

Benchmark results


eapache commented Nov 15, 2013

> Where applicable, tunable concurrency parameters were left to their default values.

Does this mean MaxBufferedMessages, MaxBufferedBytes and MaxBufferTime were all zero? If so then this branch still defaults to synchronous. You have to set at least one of them to enable asynchronous.

Originally you had to set all of them to enable asynchronous. All that 2cca3f1 does is require you to set one of them, but if you don't set any then it's still synchronous.
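
Concretely (field names as in this branch; the values are just for illustration):

// All three buffering fields left at zero: still synchronous.
syncConfig := &sarama.ProducerConfig{}

// Any one of them set: asynchronous batching is enabled.
asyncConfig := &sarama.ProducerConfig{MaxBufferedMessages: 1000}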

If this is still the point of confusion, then I am just bad with words, and apologize for making you rerun the benchmarks :(


burke commented Nov 15, 2013

The behaviour we want is this:

  • First message is delivered immediately.
  • Messages are buffered until the previous request completes, after which all buffered messages are immediately flushed in a new request.

Is there a setting for those three values that will approximate that? If not, what settings would you recommend I benchmark with?


burke commented Nov 15, 2013

I tried setting MaxBufferedMessages=MaxBufferedBytes=1, and got pretty similar results.


eapache commented Nov 15, 2013

My behaviour is described best in #48 (comment)

This design does not provide a way to approximate the desired behaviour. Setting MaxBufferedMessages=MaxBufferedBytes=1 will force a flush as soon as the request being built contains more than one message or more than one byte, so it is effectively synchronous.

Setting MaxBufferedMessages to one million will cause this design to send a single request containing all messages, while setting it to 500k will cause it to send two requests, etc.

I would suggest benchmarking with a MaxBufferedMessages of 25k (40 TCP roundtrips to send 1 million messages) and a MaxBufferTime of 1 second.


burke commented Nov 15, 2013

producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{MaxBufferTime: 1000, MaxBufferedBytes: 128 * 1024 * 1024, MaxBufferedMessages: 25000})
% go build ;time ./vegeta -cpus=8 attack -protocol=kafka -targets=kafka.txt -duration=1s -output=/dev/null -rate 1000000
...
       65.99 real        28.92 user        48.28 sys

3.2GB RAM, 65% CPU


eapache commented Nov 15, 2013

MaxBufferTime is a time.Duration which is in nanoseconds - you're forcing a flush every microsecond. Use 1 * time.Second.


burke commented Nov 15, 2013

Ah, haha. I assumed it would be in milliseconds to be consistent with the consumer timeout.

producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{MaxBufferTime: 1 * time.Second, MaxBufferedBytes: 128 * 1024 * 1024, MaxBufferedMessages: 25000})
66.39 real        28.54 user        49.02 sys

65% - 180% CPU, 3.2GB RAM


eapache commented Nov 15, 2013

Welp, I must have done something wrong. If you set MaxBufferedMessages to 1 million and leave the other two unset, what is the result?


burke commented Nov 15, 2013

8.98s, 250% CPU, 1.4GB RAM


burke commented Nov 15, 2013

For an apples-to-apples comparison: multiproducer with equivalent settings (MaxBufferBytes=99999999999, MaxBufferTime=999999999)

9.21s, 180% CPU, 700MB RAM


eapache commented Nov 15, 2013

There we go. So the problem with this benchmark is that there's no limit on the message generation speed. With an infinite (or effectively infinite) buffer this means the client work gets done near-instantaneously. The time (~9s) is just the limitation of the network bandwidth to send that much data.

Granted, yours seems to use less CPU/memory.


burke commented Nov 15, 2013

It seems to me as though kafka itself is the bottleneck here. I see 108% CPU usage for kafka in most benchmarks, and I'm running all processes on the same box. This kind of makes sense -- I'm writing to one partition, which would presumably be a single core at 100%, and 8% for some manager thread seems reasonable.

Both implementations can, assuming a sufficiently large buffer, write at 100,000 messages/s or 3.8MB/s, which is the maximum a single topic-partition seems to want to accept on my machine.

The resource usage difference is noticeable, but largely negligible in the scheme of things -- at tens of thousands of messages per second, requiring a couple of free gigabytes and cores isn't a big deal.

What I do like better about my implementation is the buffering semantics, i.e. delivering messages immediately when possible, and flushing immediately after the last request has completed successfully.


eapache commented Nov 15, 2013

Frankly I don't like either of the buffering semantics we ended up implementing. Agreed that mine is, in hindsight, pretty awful, but it seems bizarre to send a singleton message if there's more coming down the pipe (and that is easy to tell). I also dislike the lack of a buffer cap. As we have shown here, a high-volume source would OOM even a hefty server if the TCP connection decided to time out.

What about the following pseudo-code semantics? The flushing goroutine is a function like:

func () {
  for bufferToFlush := range doFlush {
    send message...
    flushDone <- true
  }
}

And the main receiving goroutine is something like:

func flush() {
  doFlush <- buffer
  buffer = nil
  flushInProgress = true
}

for {
  select {
    case msg := <- userInput:
      buffer = append(buffer, msg)
      if len(buffer) >= maxBufferSize && flushInProgress {
        <-flushDone // block
        flush()
      } else if len(buffer) > flushTriggerPoint && !flushInProgress {
        flush()
      }
    case <-flushDone:
      flushInProgress = false
      timer.reset()
      if len(buffer) > flushTriggerPoint {
        flush()
      }
    case <- timer:
      if !flushInProgress {
        flush()
      }
  }
}

I believe this provides a maximum buffer size (after which point the user blocks), and it doesn't send immediately if it has more messages coming that it can batch efficiently. It does continue to accept messages (until the max buffer size is hit) even during a flush...

Thoughts?


eapache commented Nov 15, 2013

We won't be going with these semantics in any case, so closing this PR. Not deleting the branch in case we decide to salvage some of the code for attempt number 3.

@eapache eapache closed this Nov 15, 2013

tobi commented Nov 15, 2013

Just to add some color: the "flush all queued messages starting after getting the first one" semantic is the one employed by 0MQ and nanomsg. It's a tradeoff for sure but it approximates ideal behavior for low latency and bulk messages both.

  • tobi
    CEO Shopify



burke commented Nov 15, 2013

This can be really trivially tacked onto multiproducer by acquiring from a counting semaphore on Producer.addMessage, and releasing either before submitting the request or after receiving the response from the server.

I'm kind of disappointed go doesn't have counting semaphores that don't require O(n) bytes, though.
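
Roughly the shape I mean (sketch; names made up):

// Counting semaphore built from a buffered channel: acquire when addMessage
// accepts a message, release once the broker has responded (or just before
// submitting the request, per the above).
type inflightLimiter struct {
    slots chan struct{}
}

func newInflightLimiter(max int) *inflightLimiter {
    return &inflightLimiter{slots: make(chan struct{}, max)}
}

func (l *inflightLimiter) acquire() { l.slots <- struct{}{} } // blocks once all slots are taken
func (l *inflightLimiter) release() { <-l.slots }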


burke commented Nov 15, 2013

Oh wow, make(chan struct{}, n) only consumes O(1) bytes, apparently.


eapache commented Nov 15, 2013

> This can be really trivially tacked

Which "this"?

> Oh wow, make(chan struct{}, n) only consumes O(1) bytes, apparently.

Buffer is lazy, so it will take O(n) once you fill it up with structs. You could do an O(1) version with an int and a mutex.
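
Something like this (sketch; it needs a sync.Cond on top of the mutex so that acquirers can block without spinning):

import "sync"

// countingSemaphore: O(1) memory regardless of capacity.
type countingSemaphore struct {
    mu   sync.Mutex
    cond *sync.Cond
    free int // remaining slots
}

func newCountingSemaphore(n int) *countingSemaphore {
    s := &countingSemaphore{free: n}
    s.cond = sync.NewCond(&s.mu)
    return s
}

func (s *countingSemaphore) Acquire() {
    s.mu.Lock()
    for s.free == 0 {
        s.cond.Wait() // releases mu while waiting, reacquires before returning
    }
    s.free--
    s.mu.Unlock()
}

func (s *countingSemaphore) Release() {
    s.mu.Lock()
    s.free++
    s.cond.Signal()
    s.mu.Unlock()
}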


burke commented Nov 15, 2013

package main

import "time"

type nothing struct{}

func main() {
    size := 512 * 1024 * 1024 // 512 MB
    ch := make(chan nothing, 2000000000000)
    for i := 0; i < size; i++ {
        ch <- nothing{}
    }
    time.Sleep(30 * time.Second)
}

Uses 412KB of RAM.

EDIT: By "this" I just mean some sort of block-on-too-many-messages-enqueued concept.


eapache commented Nov 15, 2013

Huh, go figure. Guess they're smart enough to optimize that case.


burke commented Nov 15, 2013

I just discussed this with Tobi and Willem, and we think we'd rather have unbounded memory consumption than block: 99% of use cases for kafka are going to be instrumentation on processes that can't be slowed down; thus, buffering as many messages as possible in hopes of the system recovering and allowing us to flush is preferable to blocking or rejecting them.

Applying back-pressure doesn't make sense in this context, I think, so blocking is out; it's just a question of whether we would rather risk going OOM, or define an upper memory bound above which errors will be returned rather than enqueueing the messages.

Thoughts? /cc also @wvanbergen @tobi


eapache commented Nov 15, 2013

OK, I can see that logic. I think I would prefer not OOMing if we don't have to (by checking http://golang.org/pkg/runtime/#ReadMemStats maybe?). I suspect that if we don't have memory to queue a message, we should increment a counter of dropped messages and then, when we have space again, generate a "dropped n messages" message.

The other question is if it's worth making configurable. Are there potential use-cases where it would be preferable to have a fixed, non-memory-based queue maximum? Are there cases where it would be preferable to apply backpressure instead of erroring?
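
Rough sketch of what I mean (the type, the limit, and the notice format are made up for illustration):

import (
    "fmt"
    "runtime"
)

// Before queueing, check heap usage; above a soft limit, count drops instead of
// buffering, and once we're back under the limit emit a "dropped n messages"
// notice. In practice you'd sample ReadMemStats periodically rather than on
// every message, since it isn't cheap.
type droppingBuffer struct {
    softLimit uint64 // heap bytes above which we stop queueing
    buffer    []string
    dropped   int
}

func (b *droppingBuffer) add(msg string) {
    var stats runtime.MemStats
    runtime.ReadMemStats(&stats)

    if stats.Alloc > b.softLimit {
        b.dropped++ // no room: count it instead of queueing
        return
    }
    if b.dropped > 0 {
        b.buffer = append(b.buffer, fmt.Sprintf("dropped %d messages", b.dropped))
        b.dropped = 0
    }
    b.buffer = append(b.buffer, msg)
}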


eapache commented Nov 15, 2013

See https://godoc.org/github.com/eapache/channels#BatchingChannel for a lockless implementation of my current understanding of the desired buffering semantics (if one constructs it with channels.Infinity as the size parameter). The goroutine responsible for flushing messages can then just do for buffer := range batchChannel {send buffer}.
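
Sketched usage, based on the linked godoc (treat the details as approximate; sendBatch is a stand-in for the real flush):

// Producers push individual messages into the channel; each receive on Out()
// then yields everything that accumulated since the last flush, as one batch.
batch := channels.NewBatchingChannel(channels.Infinity)

go func() {
    for buffer := range batch.Out() {
        sendBatch(buffer.([]interface{})) // one produce request per accumulated batch
    }
}()

// elsewhere: batch.In() <- msg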


eapache commented Nov 19, 2013

See also https://godoc.org/github.com/eapache/channels#OverflowingChannel for a lockless way to discard messages without blocking when the buffer fills up (if we decide to do that to avoid OOM). It could be combined with the batching channel using https://godoc.org/github.com/eapache/channels#Pipe.
