Rebalance event listeners don't respect ConsumerRebalanceListener contract #539
Comments
Thanks for raising this issue. Upon more investigation I see that asynchronous execution starts even earlier. When the Apache Kafka client calls the listener, it continues further code execution as soon as the callback method completes, which is very soon, because the only thing the Alpakka Kafka listener does is dispatch an asynchronous message to run the graph stage callback.
And then, when the callback is executed, there is additional asynchronous execution initiated by the message sent to the listener actor, which you have pointed out. Now, in most cases, when the stream is running without async boundaries, everything runs on a single graph interpreter, which handles messages in order, so the messages dispatched by the listener are handled in the order they were sent. However, if there are multiple consumer sources running on the same stream in their own async islands, then different graph interpreters will be running on the different async islands, which means that one graph interpreter can execute both callbacks before another interpreter has processed its own, so the messages can arrive in the wrong order. |
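To illustrate the fire-and-forget dispatch described above, here is a minimal sketch. It is not the actual Alpakka Kafka internals; the handler functions and the use of a plain `ExecutionContext` in place of the graph stage's async callback are assumptions for the example.

```scala
import java.util.{Collection => JCollection}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// Illustrative only: the real listener dispatches to a graph stage's async callback,
// but the shape of the problem is the same. The Kafka callbacks merely schedule work
// and return immediately, so the client continues with the rebalance right away.
final class FireAndForgetRebalanceListener(
    handleRevoked: Set[TopicPartition] => Unit,
    handleAssigned: Set[TopicPartition] => Unit
)(implicit ec: ExecutionContext)
    extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    ec.execute(() => handleRevoked(partitions.asScala.toSet)) // returns immediately

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    ec.execute(() => handleAssigned(partitions.asScala.toSet)) // returns immediately
}
```

Because both callbacks return as soon as the work is scheduled, the Kafka client proceeds with the rebalance and subsequent polls before the stream has reacted.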
Hey. Was this triaged by any chance? If not, I could use some pointers / guidance on how to approach this. |
I'm already knee-deep into that code and know exactly why these messages come in the wrong order. I'll give it a try next week. |
There is no backwards-compatible way to turn these signals sent on revoking and on assignment into something semi-synchronous. |
Reopening as #595 did not address this. |
@ennru the use-case would be something like microbatching. Consider consumer A doing

```scala
Consumer
  .committableSource(settings, Subscriptions.topics("aaa"))
  .groupedWithin(N, 10.seconds)
  .mapAsync(P)(xs => store(xs))
```

where topic "aaa" is consumed by a group of such consumers. At some point in time the process hosting consumer A is restarted, triggering a rebalance while a batch is still being accumulated; by the time the deadline for the commit is reached, the partitions have been reassigned and the new assignee has already started re-consuming the same records.

I've given this some more thought and I can't say that as a user I have a perfect solution for this. If I had to make a choice, the safest option would be to discard (uncommitted) messages that are in the batch (and clear the batch) before they are passed to the next stage. |
Thank you for this detailed explanation. Doesn't one of the Consumer settings help with this? There is no way to clear in-flight messages already in the stream. |
It's not just about new consumers in the group. If you have 2+ consumers periodically restarting with this type of batching, without a way to reliably commit the batch before revocation there is no way to prevent double-consuming. |
I mean, if we had a way to commit before reassignment, users would still have the choice of e.g. resorting to this:

```scala
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import akka.kafka.ConsumerMessage.{CommittableOffset, GroupTopicPartition}
import org.apache.kafka.common.TopicPartition

// assuming this API
trait AsyncRebalanceListener {
  def onRevoke(pts: Set[TopicPartition]): Future[Unit]
  def onAssign(pts: Set[TopicPartition]): Future[Unit]
}

// disregarding mutability and concurrency here
val offsets = mutable.Map.empty[GroupTopicPartition, CommittableOffset]

val rebalanceListener = new AsyncRebalanceListener {
  def onRevoke(pts: Set[TopicPartition]): Future[Unit] =
    Future
      .traverse(offsets.values)(_.commitScaladsl())
      .map(_ => ())

  def onAssign(pts: Set[TopicPartition]): Future[Unit] =
    Future.successful(())
}

src
  .addRebalanceListener(rebalanceListener)
  .map { msg =>
    offsets.update(msg.offset.groupTopicPartition, msg.offset)
    msg
  }
  .grouped(N)
  .mapAsync(A)(actuallyConsume)
  .mapAsync(B)(commitNormally)
```

Delivery semantics in this case are different from the default (I think this means at-most-once for batches coinciding with rebalances and at-least-once in any other case), but this is anyway better than nothing. |
Yes, I guess you are right. What we need to add is a callback API that is not based on actor messaging. |
Looking into this a bit more I believe there is no good way to allow for user-defined callback methods for this.
The callbacks are invoked on the thread that calls `poll`, which will be the consumer actor's thread. It is a bad idea to hand out callbacks on that thread in the first place; what is worse is that you can't access anything interesting (like asking for a topic position or committing), as messages to the actor won't be handled while the thread is in the user code. Wrapping it in futures won't help, as we'd still need to block until completion to guarantee the callback order. |
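A sketch of the problem just described, assuming a hypothetical commit facade that is backed by the consumer actor (none of these names are real Alpakka Kafka APIs):

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import akka.Done
import org.apache.kafka.common.TopicPartition

object BlockedCommitExample {
  // Hypothetical commit facade whose requests are messages to the consumer actor.
  trait ActorBackedCommitter {
    def commit(offsets: Map[TopicPartition, Long]): Future[Done]
  }

  // If user code running inside the rebalance callback tries to commit, the request becomes
  // a message to the consumer actor. But the actor's thread is currently executing this very
  // callback and cannot handle any messages, so waiting for the result cannot succeed.
  def onRevokeCallback(committer: ActorBackedCommitter,
                       offsets: Map[TopicPartition, Long]): Unit = {
    val done: Future[Done] = committer.commit(offsets)
    Await.result(done, 10.seconds) // will only ever time out
  }
}
```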
While digging around for #539 I noticed that the manual subscription case classes allow setting the actor to receive rebalance notifications. Manual subscriptions never rebalance, so I deprecated those methods and made them no-ops for now.
Yes, I think we need to add an additional API with blocking handlers. It may not be ideal, but the design of the Kafka consumer API doesn't leave much choice. |
@seglo that would be really great. |
Hey @seglo what's the status of that? I'm eager to help but need some guidance where to start. |
@szymonm I've been sidetracked. I think this may actually get handled in #705, but I'd need to check. I'm curious to learn more about the issue you're having though. The transactional sink/flow should already handle blocking for you in a transactional commit. This ticket is discussing consumer group commit event handler semantics. |
The problem we have is very similar to what @tkroman has described in #539 (comment). IIUC, the root cause of the problem is that the Alpakka rebalance listener does not wait for the records in the stream to be processed and committed. This could be done by exposing blocking callbacks. I think that #705 should solve the problem, because in #705 the whole stream is restarted when rebalancing happens, so all in-flight elements are not dropped. However, it also introduces a somewhat different model, where we have a stream per partition. We expect to have 1000s of partitions processed by a single node, so that could lead to high memory demand. |
Created a test that demonstrates the problem: #739. In the test, I create transactional streams that copy data from an input to an output queue. Due to multiple short-lived consumers, Kafka triggers partition rebalancing. This results in data duplication because consumers don't wait for offset commits to flush before fetching the offsets to read from. |
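For reference, a transactional copy stream of the kind described might look roughly like this. This is not the actual test code from #739; topic names, bootstrap servers, group id, and the transactional id are placeholders.

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Transactional
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object TransactionalCopy extends App {
  implicit val system: ActorSystem = ActorSystem("copy")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("copy-group")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Copies records from "input" to "output" within Kafka transactions,
  // committing the consumed offsets as part of each transaction.
  val control =
    Transactional
      .source(consumerSettings, Subscriptions.topics("input"))
      .map { msg =>
        ProducerMessage.single(
          new ProducerRecord[String, String]("output", msg.record.key, msg.record.value),
          msg.partitionOffset // ties the offset commit to the transaction
        )
      }
      .toMat(Transactional.sink(producerSettings, "copy-stream-transactional-id"))(Keep.left)
      .run()
}
```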
Fyi, I was able to fix the issue that made the test fail, and it was actually not due to rebalancing but to the way we commit offsets. |
I see. Thanks for the PR! |
What's worse is that after a rebalance, the consumer that has in-flight messages from a revoked partition can actually commit them without error. (Although I've seen exceptions that say exactly that, something like "cannot commit because partition was revoked", I can't really reproduce them during normal rebalancing when a consumer is being stopped gracefully, e.g. a consumer node redeploy: even 5 seconds after revocation it's still able to commit normally.) We have business logic where for each message there's an idempotent processing part with at-least-once processing guarantees and a non-idempotent part where we need at-most-once. We wanted to commit between these two stages to protect the second stage from processing by a new consumer, but it doesn't seem to work. External offset management is an option, but are there maybe some settings I'm not seeing that would help without an external database? I'm using … |
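The pipeline shape described above might look roughly like this. It is only a sketch: `idempotentPart`, `nonIdempotentPart`, the topic name, and the settings are placeholders, and, as the comment explains, the commit in the middle does not actually protect the second stage across a rebalance.

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object CommitBetweenStages extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")

  // Placeholder business logic.
  def idempotentPart(record: ConsumerRecord[String, String]): Future[Unit] = Future.unit
  def nonIdempotentPart(record: ConsumerRecord[String, String]): Future[Unit] = Future.unit

  Consumer
    .committableSource(settings, Subscriptions.topics("topic"))
    // at-least-once is acceptable here, the processing is idempotent
    .mapAsync(1)(msg => idempotentPart(msg.record).map(_ => msg))
    // commit before the non-idempotent part, hoping to get at-most-once for it
    .mapAsync(1)(msg => msg.committableOffset.commitScaladsl().map(_ => msg))
    // needs at-most-once, but a rebalance can still hand these records to another consumer
    .mapAsync(1)(msg => nonIdempotentPart(msg.record))
    .runWith(Sink.ignore)
}
```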
Assuming the semantics of the stream are at-least-once, is there a way to simply discard all the messages or restart the stream from within the rebalance listener? I have fully described the issue I am facing here: #750 |
@tkroman I wonder if you have faced the following: when a rebalance happens and a partition is revoked while you have a bunch of in-flight messages to commit, you get a CommitFailedException upon committing them because the partition was revoked. I wonder if a safe thing to do would be to simply resume on CommitFailedException, provided that the semantics are at-least-once. This way, whoever picked up the partition will keep on processing them. |
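The "just resume" idea could be expressed with a stream supervision strategy along these lines. This is a sketch; whether the failure actually surfaces as a `CommitFailedException` or wrapped in another exception depends on how the commit is performed.

```scala
import akka.stream.{ActorAttributes, Attributes, Supervision}
import org.apache.kafka.clients.consumer.CommitFailedException

object ResumeOnCommitFailed {
  // Resume (drop the failed element) when a commit fails because the partition was revoked;
  // the records will be re-processed by whichever consumer now owns the partition.
  val decider: Supervision.Decider = {
    case _: CommitFailedException => Supervision.Resume
    case _                        => Supervision.Stop
  }

  // To be applied to the committing stream, e.g.:
  //   stream.withAttributes(ActorAttributes.supervisionStrategy(decider))
  val attributes: Attributes = ActorAttributes.supervisionStrategy(decider)
}
```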
That was my original point, that there should be a way to hold the revocation off until I handle commits, because the contract is that revocation won't happen until its handler returns. There are multiple subclasses of this issue which are recoverable, but there is no way to prevent the current behavior for now.
|
Agreed. Internally, we tried making the revoke listener blocking, so that it blocks until all messages are emitted by the source. So additionally we need to add a mechanism for making sure that the transaction commit has completed before the rebalance proceeds. |
## Purpose

Commit 1: Make use of the fact that the rebalance listener `WrappedAutoPausedListener` is called on the thread that called `poll`, which is the actor thread, so it is safe to touch actor state. The messages sent from it before are turned into method calls.

Commit 2: Isolate commit-refreshing into a class holding the data structures and deadline logic.

Commit 3: When commit-refreshing is switched off (the default), no data needs to be collected at all. Depending on the `commit-refresh-interval` setting, a no-op implementation might be used.

## Background Context

I'm investigating #750 and rebalance handling for #539 and found this to be a good step on the way. Since Kafka 2.1, commit-refreshing is not required anymore, as [KAFKA-4682](https://issues.apache.org/jira/browse/KAFKA-4682) is resolved.
Since you are working on it @ennru, let me share my thoughts (no solution yet). It does not look right to add an analogous … In the Java world we would express the necessity to block in … If we wanted to make only minimal changes, I would suggest adding a boolean flag to … |
Yes, that makes sense. But I still wonder what the user will be able to do in the callback, as the actor can't respond. Do we need to hand out a reference to the underlying consumer? |
I've now explored an API that would allow reacting synchronously to rebalance events. I want such an API to allow for external offset handling while still using Kafka's rebalancing. For that it is important to be able to block on both callbacks and, more importantly, get access to the underlying consumer on the same thread that called poll. I'll push my work in progress soon. |
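A hypothetical shape for such a synchronous API, for illustration only (this is not the design actually explored in #761):

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Callbacks invoked synchronously on the thread that called poll, with access to the
// consumer so that offsets can be committed or positions looked up before the rebalance
// continues. Blocking here blocks the poll loop, so handlers must be quick.
trait SynchronousRebalanceHandler {
  def onRevoke(revoked: Set[TopicPartition], consumer: Consumer[_, _]): Unit
  def onAssign(assigned: Set[TopicPartition], consumer: Consumer[_, _]): Unit
}
```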
In #761 I explore how a synchronous handler for partition changes could work. Please have a look. |
This is not forgotten, I just opened #949 to address this. |
Contract:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-
Quote:
This guarantees that revoked partitions' records get the chance to be taken care of: e.g. we can commit them, etc.
However, here:
https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala#L72
user callbacks are invoked asynchronously via the `notifyUserOnAssign` method, which does not wait for the completion of the user's handler. This opens the possibility of a "race condition" where consuming from newly assigned partitions starts before the callbacks (e.g. ensuring offset commits) are finished. An idea of a fix would be something along these lines:
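A minimal sketch of that idea (hypothetical names; a `Future`-returning handler that the listener waits for is an assumption, not existing Alpakka Kafka code):

```scala
import java.util.{Collection => JCollection}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// Hypothetical user-facing handler that can finish async work (e.g. committing offsets)
// before the rebalance is allowed to proceed.
trait UserRebalanceHandler {
  def onAssign(assigned: Set[TopicPartition]): Future[Unit]
  def onRevoke(revoked: Set[TopicPartition]): Future[Unit]
}

// The listener only returns to the Kafka client once the user's handler has completed,
// so consuming from the newly assigned partitions cannot start earlier.
final class WaitingRebalanceListener(handler: UserRebalanceHandler, timeout: FiniteDuration)
    extends ConsumerRebalanceListener {

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    Await.result(handler.onAssign(partitions.asScala.toSet), timeout)

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    Await.result(handler.onRevoke(partitions.asScala.toSet), timeout)
}
```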
However, I'm not familiar enough with custom graph stages to make this into a functional implementation (since this becomes a `Future`, how do I incorporate that into an async callback?), so if maintainers see this as a valid concern (and a fix approach), I could use some help.