Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate offsets and commit before poll #862

Merged
merged 4 commits into from
Aug 16, 2019

Conversation

ennru
Copy link
Member

@ennru ennru commented Aug 14, 2019

Purpose

Reduce calls to consumer.commitAsync by aggregating commits in the actor and sending them just before polling.

References

Idea explained in #849
Implementation partly stolen from #851

Changes

The almost unchanged tests indicate that this doesn't change the behaviour a lot.

This change may make a commit wait until the next poll (default 50 ms). To fully make use of this, the committer defaults should be changed.

val replyTo = sender()
commit(offsets, replyTo ! _)
commitAggregate ++= offsets
commitSenders = commitSenders :+ sender()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thing to consider would be to request an extra poll by setting up a "delayed poll in flight" as the RequestMessages do.

Copy link
Contributor

@seglo seglo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

@ennru
Copy link
Member Author

ennru commented Aug 16, 2019

Pushed an update so that the order in which the offsets are received doesn't matter anymore. With this we can increase the parallelism in the Committer and switch to mapAsyncUnordered.

Copy link
Contributor

@2m 2m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Liking the simplicity of this.

@ennru
Copy link
Member Author

ennru commented Aug 16, 2019

Failure was #832

@ennru ennru merged commit b148f0a into akka:master Aug 16, 2019
@ennru ennru deleted the ennru/collect-commits branch August 16, 2019 14:02
ennru added a commit to ennru/alpakka-kafka that referenced this pull request Nov 21, 2019
seglo pushed a commit that referenced this pull request Nov 21, 2019
* Commit refreshing: fix bug introduced #862
* Run RetentionPeriodSpec against Kafka 2.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants