Async/Batched Producer #18

Closed
eapache opened this issue Aug 21, 2013 · 4 comments

Comments

@eapache
Contributor

eapache commented Aug 21, 2013

Add some config options (similar to the Java/Scala producer) for collecting and batching messages rather than sending each produced message immediately. Batching will cut per-message request overhead, and will make compression much more effective as well.

@cloudaice

I think we need it!!!

@eapache
Contributor Author

eapache commented Sep 11, 2013

Brain dump for reference, more thought is needed:

Thought about this a bit, and it doesn't seem as simple as I'd hoped (depending on the tradeoffs you want to make). The biggest decision is how to handle errors. The simplest way is an async errors channel the user can select on, but then the user can't really act on the errors they see: there's no easy way for them to associate an error with a particular message (or set of messages, since this is batching), and even if there were, they may have submitted more messages in the meantime, so resubmitting the failed ones would lead to (potentially severe) out-of-order scenarios. A call-me-on-error callback is more complicated but perhaps more powerful and flexible, especially combined with a set of possible commands that the callback could return (retry/requeue/discard/abort are possibilities that come to mind).
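
To make the callback idea concrete, here is a rough Go sketch. Everything in it is hypothetical and not part of any existing API: the `Message`, `ErrorAction` and `ErrorHandler` types, the `flush` helper, and in particular the exact meaning given to each command (retry = resend now, requeue = hand the batch back to the caller, discard = drop, abort = stop with the error) are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical key/value pair queued for production.
type Message struct{ Key, Value []byte }

// ErrorAction is a hypothetical command an error callback can return.
type ErrorAction int

const (
	Retry   ErrorAction = iota // resend the same batch right away
	Requeue                    // hand the batch back to be queued again
	Discard                    // drop the batch and carry on
	Abort                      // give up and surface the error
)

// ErrorHandler receives the failed batch together with the error,
// so the user can tell which messages were affected.
type ErrorHandler func(batch []Message, err error) ErrorAction

// errBrokerDown is a stand-in error used by the example below.
var errBrokerDown = errors.New("broker unavailable")

// flush sends one batch and asks the handler what to do on failure.
// It returns the messages to requeue (if any) and a terminal error (if any).
func flush(batch []Message, send func([]Message) error, onError ErrorHandler, maxRetries int) ([]Message, error) {
	for attempt := 0; ; attempt++ {
		err := send(batch)
		if err == nil {
			return nil, nil
		}
		switch onError(batch, err) {
		case Retry:
			if attempt < maxRetries {
				continue
			}
			return nil, err // retry budget exhausted
		case Requeue:
			return batch, nil
		case Discard:
			return nil, nil
		default: // Abort
			return nil, err
		}
	}
}

func main() {
	calls := 0
	send := func([]Message) error { // fails twice, then succeeds
		calls++
		if calls < 3 {
			return errBrokerDown
		}
		return nil
	}
	handler := func(batch []Message, err error) ErrorAction {
		fmt.Printf("batch of %d failed: %v\n", len(batch), err)
		return Retry
	}
	_, err := flush([]Message{{Value: []byte("hello")}}, send, handler, 5)
	fmt.Println("send attempts:", calls, "final error:", err)
}
```

Because the handler is invoked synchronously before any later batch goes out, a retry here does not reorder messages, which is the main thing the plain errors channel can't offer.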

Barring that complication (however we solve it), I think the basic architecture can be:

  • one 'manager' goroutine
  • n 'sending' goroutines, where n is the number of distinct brokers leading partitions for the given topic (so 8 partitions spread across 3 brokers would need only 3 of these goroutines, not 8).

The SendMessage function simply sends the key/value pair to the manager via a channel. The manager loops, choosing a partition for each message, determining the broker leading that partition, then sending that message to the appropriate 'sending' goroutine by broker. The sending goroutine waits for either a number of messages or a timeout (or other triggers, perhaps a number of bytes of message) and then sends everything it's accumulated so far.
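
A minimal sketch of that manager/sender layout, under stated assumptions: all names (`Message`, `Batch`, `sender`, `manager`, `produce`, `defaultWait`) are made up for illustration, the partition is chosen with an FNV hash of the key as an arbitrary stand-in for a real partitioner, the leader lookup is a toy function, and the "timeout" is a periodic ticker rather than a timer started on the first buffered message. Flushing on a byte count is left out.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

// Message and Batch are hypothetical types for this sketch.
type Message struct{ Key, Value []byte }

type Batch struct {
	Broker   int
	Messages []Message
}

const defaultWait = 50 * time.Millisecond

// sender buffers messages for one broker and flushes when the buffer holds
// maxMessages or when the ticker fires, whichever comes first.
func sender(broker int, in <-chan Message, out chan<- Batch, maxMessages int, maxWait time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()
	var buf []Message
	flush := func() {
		if len(buf) > 0 {
			out <- Batch{Broker: broker, Messages: buf}
			buf = nil
		}
	}
	for {
		select {
		case m, ok := <-in:
			if !ok { // manager closed the channel: flush the remainder and exit
				flush()
				return
			}
			buf = append(buf, m)
			if len(buf) >= maxMessages {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// manager picks a partition for each message, looks up the leading broker,
// and forwards the message to that broker's sender, starting senders lazily.
func manager(in <-chan Message, out chan<- Batch, partitions int, leader func(int) int, maxMessages int, maxWait time.Duration) {
	senders := make(map[int]chan Message)
	var wg sync.WaitGroup
	for m := range in {
		h := fnv.New32a()
		h.Write(m.Key)
		broker := leader(int(h.Sum32() % uint32(partitions)))
		ch, ok := senders[broker]
		if !ok {
			ch = make(chan Message)
			senders[broker] = ch
			wg.Add(1)
			go sender(broker, ch, out, maxMessages, maxWait, &wg)
		}
		ch <- m
	}
	for _, ch := range senders {
		close(ch)
	}
	wg.Wait()
	close(out)
}

// produce runs the pipeline over msgs and collects every batch that comes out.
func produce(msgs []Message, partitions, brokers, maxMessages int, maxWait time.Duration) []Batch {
	in, out := make(chan Message), make(chan Batch)
	leader := func(partition int) int { return partition % brokers } // toy metadata
	go manager(in, out, partitions, leader, maxMessages, maxWait)
	go func() {
		for _, m := range msgs {
			in <- m
		}
		close(in)
	}()
	var batches []Batch
	for b := range out {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	var msgs []Message
	for i := 0; i < 20; i++ {
		msgs = append(msgs, Message{Key: []byte{byte(i)}, Value: []byte("payload")})
	}
	for _, b := range produce(msgs, 8, 3, 4, defaultWait) {
		fmt.Printf("broker %d: batch of %d\n", b.Broker, len(b.Messages))
	}
}
```

With 8 partitions and 3 brokers, at most 3 sender goroutines are ever started, matching the count described above.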

The manager goroutine, in addition to dispatching messages, also has to manage (create/delete) the sending goroutines as brokers go up and down, leadership elections happen, etc. If a batch of messages is sent to a broker which is no longer the leader for their partition, that sending goroutine has to pass those messages back to the manager to be redispatched to whichever broker now leads that partition.
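
The redispatch step can be modelled without any goroutines, which makes the logic easier to see. The sketch below is a single-threaded toy, not the real design: `Cluster`, `Manager`, `Send` and `Dispatch` are hypothetical names, the "cluster" is just a map of partition to leader, and a rejected message stands in for a not-leader error from a broker.

```go
package main

import "fmt"

// Message carries the partition it was already assigned to.
type Message struct {
	Partition int
	Value     string
}

// Cluster is a toy stand-in for the real brokers.
type Cluster struct {
	Leaders map[int]int // partition -> broker that currently leads it
}

// Send simulates a produce request: it returns the messages the broker
// rejects because it no longer leads their partition.
func (c *Cluster) Send(broker int, batch []Message) (rejected []Message) {
	for _, m := range batch {
		if c.Leaders[m.Partition] != broker {
			rejected = append(rejected, m)
		}
	}
	return rejected
}

// Manager holds a possibly stale copy of the leadership metadata.
type Manager struct {
	cached  map[int]int
	cluster *Cluster
}

// refresh copies the current leaders into the manager's cache.
func (m *Manager) refresh() {
	for p, b := range m.cluster.Leaders {
		m.cached[p] = b
	}
}

// Dispatch groups pending messages by cached leader, sends each group, and
// redispatches anything rejected after refreshing the metadata. It returns
// the number of rounds used and whatever is still undelivered.
func (m *Manager) Dispatch(pending []Message, maxRounds int) (rounds int, undelivered []Message) {
	for rounds = 0; len(pending) > 0 && rounds < maxRounds; rounds++ {
		byBroker := make(map[int][]Message)
		for _, msg := range pending {
			b := m.cached[msg.Partition]
			byBroker[b] = append(byBroker[b], msg)
		}
		pending = nil
		for broker, batch := range byBroker {
			pending = append(pending, m.cluster.Send(broker, batch)...)
		}
		if len(pending) > 0 {
			m.refresh()
		}
	}
	return rounds, pending
}

func main() {
	cluster := &Cluster{Leaders: map[int]int{0: 1, 1: 2}}
	// The manager still believes broker 1 leads partition 1.
	mgr := &Manager{cached: map[int]int{0: 1, 1: 1}, cluster: cluster}
	rounds, left := mgr.Dispatch([]Message{{0, "a"}, {1, "b"}}, 3)
	fmt.Println("rounds:", rounds, "undelivered:", len(left))
}
```

Within a partition the relative order of rejected messages is preserved, since a partition maps to exactly one broker per round; messages submitted between rounds are not modelled here, so the out-of-order concern raised earlier still applies to a real implementation.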

@eapache
Contributor Author

eapache commented Nov 10, 2013

This is being worked on in both #41 and #48

@eapache
Contributor Author

eapache commented Nov 26, 2013

#41 was merged, effectively fixing this. There is still much cleanup to do, but this can be closed.

@eapache eapache closed this as completed Nov 26, 2013
2 participants