-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aggregate offsets and commit before poll #862
Conversation
val replyTo = sender() | ||
commit(offsets, replyTo ! _) | ||
commitAggregate ++= offsets | ||
commitSenders = commitSenders :+ sender() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A thing to consider would be to request an extra poll by setting up a "delayed poll in flight" as the RequestMessages
do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
Pushed an update so that the order in which the offsets are received doesn't matter anymore. With this we can increase the parallelism in the Committer and switch to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Liking the simplicity of this.
Failure was #832 |
* Commit refreshing: fix bug introduced #862 * Run RetentionPeriodSpec against Kafka 2.0.0
Purpose
Reduce calls to
consumer.commitAsync
by aggregating commits in the actor and sending them just before polling.References
Idea explained in #849
Implementation partly stolen from #851
Changes
The almost unchanged tests indicate that this doesn't change the behaviour a lot.
This change may make a commit wait until the next poll (default 50 ms). To fully make use of this, the committer defaults should be changed.