
Accumulate and commit offsets at poll time #851

Closed
wants to merge 2 commits

Conversation

seglo (Contributor) commented Jul 8, 2019

Purpose

This PR proposes two major changes to offset committing.

  1. Decouple back-pressure when committing offsets (background in Allow user to commit offsets without handling response when using committable sources #845)
  2. Send accumulated commit requests asynchronously during the akka.kafka.consumer.poll-interval (inspired by comments in Commit only once per poll-interval #849 and Smarter committer flow #850)

Changes

  • Replace ask pattern with a fire and forget commit request to the KafkaConsumerActor
  • Accumulate commit requests in KafkaConsumerActor
  • At poll time (akka.kafka.consumer.poll-interval), merge all commit requests and perform an asynchronous commit before fetching records (sketched after this list).
  • Throw an akka.kafka.CommitTimeoutException from the commit callback when the round trip takes longer than akka.kafka.consumer.commit-timeout. This exception was previously thrown by the ask timeout handler in the KafkaAsyncConsumerCommitterRef.
  • Commit any outstanding commit requests during graceful shutdown
  • Deprecate Committable.commitScaladsl and Committable.commitJavadsl in favour of Committable.commit because we no longer need to return a Future or CompletableFuture.
  • Remove parallelism from CommitterSettings
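For illustration, a minimal sketch of the accumulate-and-flush cycle described above. The class, method, and message names here are hypothetical stand-ins; the real logic lives in KafkaConsumerActor:

  import scala.jdk.CollectionConverters._
  import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
  import org.apache.kafka.common.TopicPartition

  // Hypothetical message shape: a commit request carrying the offsets to commit.
  final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata])

  // Hypothetical accumulator: requests are fire-and-forget, and the merged
  // commit is sent once per poll-interval, before records are fetched.
  final class CommitAccumulator(consumer: KafkaConsumer[_, _]) {
    private var commitStash: Vector[Commit] = Vector.empty

    def enqueue(commit: Commit): Unit =
      commitStash :+= commit

    def flushBeforePoll(): Unit = {
      if (commitStash.nonEmpty) {
        // Merge all stashed requests; a later offset for a partition wins.
        val combined = commitStash.flatMap(_.offsets).toMap
        commitStash = Vector.empty
        consumer.commitAsync(combined.asJava, new OffsetCommitCallback {
          override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                                  exception: Exception): Unit = {
            // The PR's callback reports failures here and raises
            // CommitTimeoutException when the round trip exceeds commit-timeout.
          }
        })
      }
      // consumer.poll(...) would follow here.
    }
  }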

More Proposed Changes

  • Should we deprecate CommitterSettings altogether? We could change all Committer flows to groupedWithin using the akka.kafka.consumer.poll-interval (see the sketch after this list). I can't think of a good reason to commit more frequently than this interval, since commits will not be sent to Kafka immediately anyway.
  • Several tests are no longer relevant if we adopt this new approach to committing. I've commented them out for now.
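Rough sketch of the groupedWithin idea above, under the assumption that CommitterSettings is deprecated (pollIntervalBatch is a hypothetical name; commit() is the fire-and-forget call this PR proposes):

  import scala.concurrent.duration.FiniteDuration
  import akka.NotUsed
  import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
  import akka.stream.scaladsl.Flow

  // Group everything that arrives within one poll-interval and send a single
  // merged, fire-and-forget commit for the whole window.
  def pollIntervalBatch(pollInterval: FiniteDuration): Flow[Committable, CommittableOffsetBatch, NotUsed] =
    Flow[Committable]
      .groupedWithin(Int.MaxValue, pollInterval)
      .map(_.foldLeft(CommittableOffsetBatch.empty)(_.updated(_)))
      .map { batch =>
        batch.commit() // no Future to wait on any more
        batch
      }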

seglo (author) commented Jul 8, 2019

I created this PR as a draft since it's incomplete. I need some feedback about the direction it's going before proceeding.

ennru (Member) commented Aug 12, 2019

I've started digging into your PR now. Would you please rebase it on master to pick up the RebalanceListener changes?

ennru (Member) left a comment

Started digging into this. Some comments.

private def maybeCommit(): Unit =
  if (!rebalanceInProgress && commitStash.nonEmpty) {
    val combinedStash = commitStash.flatMap(_.offsets).toMap
ennru (Member):

Is there any reason why commitStash isn't kept as Map[TopicPartition, OffsetAndMetadata]?

seglo (author):

Not really. The commitStash just maintains a list of all the Commit messages it receives and flattens them into the map at commit time. Is there a reason to maintain the stash as a Map between commits?

ennru (Member):

It is a little more compact in memory to use one map instead of keeping all the maps in the Commit messages, and we need to flatten those anyway.

seglo (author):

Ok, I'll do that.
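A minimal sketch of that change, reusing the hypothetical Commit shape from the sketch near the top: the stash becomes a single map and each request is merged as it arrives:

  import org.apache.kafka.clients.consumer.OffsetAndMetadata
  import org.apache.kafka.common.TopicPartition

  // One map instead of a list of Commit messages: merging on receipt means a
  // later offset for the same partition overwrites the earlier one, and there
  // is nothing left to flatten at commit time.
  final class MapCommitStash {
    private var stash: Map[TopicPartition, OffsetAndMetadata] = Map.empty

    def add(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit =
      stash ++= offsets

    def drain(): Map[TopicPartition, OffsetAndMetadata] = {
      val combined = stash
      stash = Map.empty
      combined
    }
  }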

@@ -338,6 +327,7 @@ import scala.util.control.NonFatal
      commitRefreshing.committed(offsets)

    case Stop =>
      maybeCommit()
ennru (Member):

This is great. But it would possibly need to be a commitSync?

seglo (author) commented Aug 13, 2019:

That's a good question. If commits are in progress, the KafkaConsumerActor switches to its stopping behaviour, but that behaviour only performs up to one more poll (possibly handling some or all remaining commit callbacks) and then stops the actor. I had assumed polling would continue until all commit callbacks had returned; since that's not the case, I agree that a commitSync would make more sense.
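A hypothetical sketch of that blocking variant, not part of the PR (the consumer and the outstanding offsets are assumed to be in scope):

  import scala.jdk.CollectionConverters._
  import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
  import org.apache.kafka.common.TopicPartition

  // Flush outstanding offsets synchronously before stopping, instead of
  // waiting for commitAsync callbacks to be collected by one more poll.
  def flushOnStop(consumer: KafkaConsumer[_, _],
                  outstanding: Map[TopicPartition, OffsetAndMetadata]): Unit =
    if (outstanding.nonEmpty)
      consumer.commitSync(outstanding.asJava) // blocks until Kafka replies or times out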

Do you know what the purpose is of allowing one final poll while stopping?

ennru (Member):

It is just that: collect replies to any outstanding commits.
And that solution is actually better now that I play around with it, as it doesn't block.

seglo (author):

Yes, I misread part of the condition that checks whether there are still commits in progress. So it will poll until all callbacks have returned.

if (stopInProgress && commitsInProgress == 0) {

So to be clear, you're fine with this implementation as-is?

@@ -222,10 +222,13 @@ object Consumer {
   * Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
   * before being emitted downstream.
   */
  // TODO: this should probably be deprecated since it can no longer be guaranteed
  // we should guide the user to using auto commit interval setting of the Consumer to do this instead
  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
ennru (Member):

This could be replaced with a plain source with auto-committing enabled on Kafka-level via enable.auto.commit=true.
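For example, the replacement could look like this (bootstrap servers, group id, topic, and interval are placeholders):

  import akka.actor.ActorSystem
  import akka.kafka.scaladsl.Consumer
  import akka.kafka.{ConsumerSettings, Subscriptions}
  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.StringDeserializer

  object AutoCommitExample extends App {
    implicit val system: ActorSystem = ActorSystem("auto-commit-example")

    // The Kafka client commits offsets itself on its auto-commit interval,
    // approximating what atMostOnceSource offered.
    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")

    val source = Consumer.plainSource(settings, Subscriptions.topics("topic1"))
  }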

seglo (author):

Agreed.

seglo (author):

Do you want to do that in this PR?

if (exception == null) {
  self ! Committed(offsets.asScala.toMap)
} else {
  log.error("Kafka commit failed", exception)
ennru (Member):

What failures do we expect here? I guess the stream should fail, or retry committing?

seglo (author):

The current Alpakka Kafka behaviour is to return a failure as the reply to the committer.

if (exception != null) sendReply(Status.Failure(exception))

IIUC it is up to the user to handle the Failure of the Future that was returned by the commit. In Committer.batchFlow it would just return the Failure and kill the stream. We can change this to fail the stream to remain consistent.

WRT retrying commits:

In cases where we can retry sending the commit, we'll need to be careful we don't end up committing out of order. I recall discussing this with someone (maybe the Alpakka team?). It's discussed in Kafka: The Definitive Guide and summarized in this SO post:

https://stackoverflow.com/questions/53240589/kafka-commitasync-retries-with-commit-order
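The guard described there (from Kafka: The Definitive Guide, not from this PR) can be sketched like this: tag each commitAsync with a monotonically increasing sequence number and retry a failure only if no newer commit has been issued in the meantime:

  import java.util.{Map => JMap}
  import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
  import org.apache.kafka.common.TopicPartition

  final class GuardedCommitter(consumer: KafkaConsumer[_, _]) {
    // Callbacks run on the polling thread, so a plain var is safe here.
    private var sequence = 0L

    def commitGuarded(offsets: JMap[TopicPartition, OffsetAndMetadata]): Unit = {
      sequence += 1
      val issuedAt = sequence
      consumer.commitAsync(offsets, new OffsetCommitCallback {
        override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata],
                                exception: Exception): Unit =
          // Retry only if this was the most recent commit; retrying a stale
          // commit could rewind offsets that a newer commit already advanced.
          if (exception != null && issuedAt == sequence)
            commitGuarded(offsets)
      })
    }
  }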

if (offsets.isEmpty)
  Future.successful(Done)
else {

override def commitScaladsl(): Future[Done] = Future.successful {
ennru (Member):

This should instead return a singleton instance of Future[Done].

seglo (author):

I'm not sure what you mean. Something like this?

  override def commitScaladsl(): Future[Done] = {
    commit()
    Future.successful(Done)
  }

Or do you mean assign Future.successful(Done) to a static member and just return that all the time?
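If it's the latter, a sketch of that reading (names are hypothetical):

  import scala.concurrent.Future
  import akka.Done

  object Commits {
    // Allocated once; every commit call hands out the same completed Future
    // instead of building a new one per call.
    val FutureDone: Future[Done] = Future.successful(Done)
  }

  // e.g. in the Committable implementation:
  // override def commitScaladsl(): Future[Done] = {
  //   commit()
  //   Commits.FutureDone
  // }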

    b
  }

private def batch(settings: CommitterSettings): Flow[Committable, CommittableOffsetBatch, NotUsed] =
ennru (Member):

Any reason for this separate method?

seglo (author):

It was a remnant from earlier work I did. It's not necessary any more; I'll remove it.

seglo force-pushed the seglo/poll-committer branch from 999901f to 422f3d2 on August 13, 2019 15:44
seglo (author) commented Aug 13, 2019

@ennru Thanks for reviewing. I rebased onto master and added a few small changes based on your feedback.

ennru (Member) left a comment

I'm playing around with just implementing #849 and I believe we gain many of the benefits this solution has. That smaller scope would allow us to release it as a patch or minor version.


ennru (Member) commented Aug 30, 2019

With #862 merged and #874 implementing this a bit less aggressively, we can close this. Thank you for suggesting and exploring it.

ennru closed this on Aug 30, 2019