Accumulate and commit offsets at poll time #851
Conversation
I created this PR as a draft since it's incomplete. I need some feedback about the direction it's going before proceeding.
I started digging into your PR now. Would you please rebase it on `master` to get the …
Started digging into this. Some comments.
```scala
  */
  private def maybeCommit(): Unit =
    if (!rebalanceInProgress && commitStash.nonEmpty) {
      val combinedStash = commitStash.flatMap(_.offsets).toMap
```
Is there any reason why `commitStash` isn't kept as a `Map[TopicPartition, OffsetAndMetadata]`?
Not really. The `commitStash` just maintains a list of all the `Commit` messages it receives and processes them into the map at commit time. Is there a reason to maintain the stash as a `Map` between commits?
It is a little more compact in memory to use one map, instead of keeping all the maps in the `Commit` messages; we need to flatten those anyway.
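For illustration, a minimal sketch of that suggestion (the `commitStash` name follows the discussion; everything else is assumed): fold each incoming `Commit`'s offsets into a single map as it arrives, so nothing needs to be flattened at commit time.

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object CommitStashSketch {
  // One combined map instead of a list of stashed Commit messages.
  private var commitStash = Map.empty[TopicPartition, OffsetAndMetadata]

  // Called for every incoming Commit message; a later offset for a
  // partition simply overwrites the earlier one.
  def stash(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit =
    commitStash ++= offsets

  // Called at poll time: hand back the combined offsets and reset.
  def drain(): Map[TopicPartition, OffsetAndMetadata] = {
    val combined = commitStash
    commitStash = Map.empty
    combined
  }
}
```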
Ok, I'll do that.
```
@@ -338,6 +327,7 @@ import scala.util.control.NonFatal
      commitRefreshing.committed(offsets)

    case Stop =>
      maybeCommit()
```
This is great. But would it possibly need to be a `commitSync`?
That's a good question. If commits are in progress then the `KafkaConsumerActor` will start using the `stopping` behaviour, but that behaviour will only do up to 1 more poll (and possibly handle some or all remaining commit callbacks) and then stop the actor. I think I assumed the polling would continue until all commit callbacks have returned, but since that's not the case I agree that doing a `commitSync` would make more sense.

Do you know what the purpose of allowing 1 final poll when in `stopping` is?
It is just that: collect replies to any outstanding commits.
And that solution is actually better now that I've played around with it, as it doesn't block.
Yes, I misread part of the condition that checks if there are still commits in progress. So it will poll until all callbacks have returned.

```scala
if (stopInProgress && commitsInProgress == 0) {
```

So to be clear, you're fine with this implementation as-is?
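To make the agreed-upon behaviour concrete, here is a heavily simplified, hypothetical model of the stop sequence (message, field, and method names mirror the discussion above; the real `KafkaConsumerActor` is more involved and the callback bookkeeping is elided):

```scala
import akka.actor.Actor

case object Stop
case object Poll

class StoppingSketch extends Actor {
  private var stopInProgress = false
  private var commitsInProgress = 0

  private def maybeCommit(): Unit = () // flush stashed offsets via commitAsync
  private def poll(): Unit = ()        // poll() also fires pending commit callbacks

  def receive: Receive = {
    case Stop =>
      maybeCommit()         // one last asynchronous commit of the stash
      stopInProgress = true // switch to the "stopping" behaviour
    case Poll =>
      poll()
      // Keep polling until every commit callback has returned, then stop;
      // this is why a blocking commitSync isn't needed.
      if (stopInProgress && commitsInProgress == 0) context.stop(self)
  }
}
```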
```
@@ -222,10 +222,13 @@ object Consumer {
   * Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
   * before being emitted downstream.
   */
  // TODO: this should probably be deprecated since it can no longer be guaranteed
  // we should guide the user to using auto commit interval setting of the Consumer to do this instead
  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
```
This could be replaced with a plain source with auto-committing enabled at the Kafka level via `enable.auto.commit=true`.
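For reference, roughly what that replacement would look like with the existing Alpakka Kafka API (the broker address, group id, topic, and 5-second interval are placeholder values):

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object AutoCommitExample {
  implicit val system: ActorSystem = ActorSystem("example")

  // Let the Kafka client commit on its own schedule instead of atMostOnceSource.
  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")

  val source = Consumer.plainSource(settings, Subscriptions.topics("topic1"))
}
```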
Agreed.
Do you want to do that in this PR?
```scala
if (exception == null) {
  self ! Committed(offsets.asScala.toMap)
} else {
  log.error("Kafka commit failed", exception)
```
What failures do we expect here? I guess the stream should fail, or retry committing?
The current Alpakka Kafka behaviour is to return a failure as the reply to the committer:

```scala
if (exception != null) sendReply(Status.Failure(exception))
```

IIUC it is up to the user to handle the `Failure` of the `Future` that was returned by the commit. In `Committer.batchFlow` it would just return the `Failure` and kill the stream. We can change this to fail the stream to remain consistent.

WRT retrying commits: in cases where we can retry sending the commit we'll need to be careful we don't end up committing out of order. I recall discussing this with someone (maybe the Alpakka team?). It's discussed in Kafka: The Definitive Guide and summarized in this SO post:
https://stackoverflow.com/questions/53240589/kafka-commitasync-retries-with-commit-order
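The guard from that discussion can be sketched as follows (plain Kafka consumer code, not Alpakka; names are made up): tag every `commitAsync` with a monotonically increasing sequence number and only retry when no newer attempt has started, so retries cannot commit out of order. Note this must run on the consumer thread, since commit callbacks fire during `poll()`.

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

object RetrySafeCommit {
  private var attempt = 0L // bumped for every commit attempt

  def commitWithRetry(consumer: KafkaConsumer[_, _],
                      offsets: JMap[TopicPartition, OffsetAndMetadata]): Unit = {
    attempt += 1
    val thisAttempt = attempt
    consumer.commitAsync(offsets, new OffsetCommitCallback {
      override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit =
        // Retry only if no newer commit was started since this attempt,
        // so a stale retry can never overwrite a newer offset.
        if (exception != null && thisAttempt == attempt)
          commitWithRetry(consumer, offsets)
    })
  }
}
```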
```scala
if (offsets.isEmpty)
  Future.successful(Done)
else {
override def commitScaladsl(): Future[Done] = Future.successful {
```
This should instead return a singleton instance of `Future[Done]`.
I'm not sure what you mean. Something like this?

```scala
override def commitScaladsl(): Future[Done] = {
  commit()
  Future.successful(Done)
}
```

Or do you mean assign `Future.successful(Done)` to a static member and just return that all the time?
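If the latter is what's meant, a minimal sketch (names are illustrative, not the actual Alpakka internals):

```scala
import akka.Done
import scala.concurrent.Future

object Commits {
  // Allocated once; every commit call returns this same completed future.
  val FutureDone: Future[Done] = Future.successful(Done)
}

// Inside the Committable implementation it would then read:
//   override def commitScaladsl(): Future[Done] = {
//     commit()
//     Commits.FutureDone
//   }
```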
```scala
    b
  }

  private def batch(settings: CommitterSettings): Flow[Committable, CommittableOffsetBatch, NotUsed] =
```
Any reason for this separate method?
It was a remnant from earlier work I did. It's not necessary any more; I'll remove it.
(Force-pushed from 999901f to 422f3d2.)
@ennru Thanks for reviewing. I rebased.
I'm playing around with just implementing #849 and I believe we gain many of the benefits this solution has. That smaller scope would allow us to release it as a patch or minor version.
Purpose

This PR proposes 2 major changes to the offset committing, centred on accumulating commit requests and committing them at most once per `akka.kafka.consumer.poll-interval` (inspired by comments in Commit only once per poll-interval #849 and Smarter committer flow #850).

Changes

- Commit requests sent to the `KafkaConsumerActor` are accumulated instead of being committed immediately.
- The `KafkaConsumerActor` will at poll time (every `akka.kafka.consumer.poll-interval`) merge all commit requests and perform an asynchronous commit before fetching records.
- Return an `akka.kafka.CommitTimeoutException` from the commit callback when the round trip takes longer than `akka.kafka.consumer.commit-timeout`. This exception was previously thrown by the ask timeout handler in the `KafkaAsyncConsumerCommitterRef`.
- Deprecate `Committable.commitScaladsl` and `Committable.commitJavadsl` in favour of `Committable.commit`, because we no longer need to return a `Future` or `CompletableFuture`.
- Remove `parallelism` from `CommitterSettings`.

More Proposed Changes

- Remove `CommitterSettings` altogether? We can change all `Committer` flows to `groupedWithin` using the `akka.kafka.consumer.poll-interval`. I can't think of a good reason to commit more frequently than this interval since commits will not be sent immediately to Kafka. (A sketch follows below.)
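A sketch of what that simplification could look like, assuming `CommittableOffsetBatch` can be built from a sequence of committables (the 50 ms value mirrors the documented `poll-interval` default; the batch-size cap is purely illustrative):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.stream.scaladsl.Flow

object BatchByPollInterval {
  val pollInterval = 50.millis // akka.kafka.consumer.poll-interval
  val maxBatch = 1000          // illustrative upper bound per batch

  // Group committables over one poll interval and merge them into a batch,
  // with no CommitterSettings involved.
  val flow: Flow[Committable, CommittableOffsetBatch, NotUsed] =
    Flow[Committable]
      .groupedWithin(maxBatch, pollInterval)
      .map(CommittableOffsetBatch(_))
}
```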