
Rebalance event listeners don't respect ConsumerRebalanceListener contract #539

Closed
tkroman opened this issue Jul 17, 2018 · 29 comments · Fixed by #949
@tkroman commented Jul 17, 2018

Contract:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned-java.util.Collection-

Quote:

It is guaranteed that all the processes in a consumer group will execute their onPartitionsRevoked(Collection) callback before any instance executes its onPartitionsAssigned(Collection) callback.

This guarantees that records from revoked partitions get the chance to be taken care of: e.g. we can commit their offsets before another consumer takes over.

However, here:
https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala#L72
user callbacks are invoked asynchronously via the notifyUserOnAssign method, which does not wait for the completion of the user's handler. This opens the possibility of a race condition where consuming from newly assigned partitions starts before the callbacks (e.g. ones ensuring offset commits) have finished.

An idea of a fix would be something along these lines:

-subscription.rebalanceListener.foreach(ref ⇒ ref ! TopicPartitionsAssigned(subscription, set))
+subscription.rebalanceListener
+  .map(ref ⇒ (ref ? TopicPartitionsAssigned(subscription, set)).map(_ => Done))
+  .getOrElse(Future.successful(Done))

However, I'm not familiar enough with custom graph stages to turn this into a working implementation (since this becomes a Future, how do I incorporate that into an async callback?), so if the maintainers see this as a valid concern (and a valid fix approach), I could use some help.

@2m (Contributor) commented Jul 18, 2018

Thanks for raising this issue.

Upon more investigation I see that asynchronous execution starts even earlier. When the Apache Kafka client calls the listener, it continues further code execution as soon as the callback method completes.

https://github.com/apache/kafka/blob/c0518aa65f25317eca0c1da4d350f549d35b8536/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L419

Which is very soon, because the only thing the Alpakka Kafka listener does is dispatch an asynchronous message to run the graph stage callback (by calling the invoke method).

val onRevoke: Set[TopicPartition] ⇒ Unit = set ⇒ partitionRevokedCB.invoke(set)

And then, when the callback is executed, there is additional asynchronous execution initiated by the message sent to the listener actor, which you have pointed out.

Now in most cases, when the stream is running without async boundaries, everything runs on a single graph interpreter, which handles messages in order. Therefore messages dispatched by invoke calls are handled in order, one by one. The same goes for actor messages.

However, if multiple consumer sources run on the same stream in their own async islands, then different graph interpreters run the different islands, which means one graph interpreter can execute both onRevoke and onAssign before the other one does anything. This can be tricky to solve, but should be fixed IMO nevertheless.

@tkroman (Author) commented Aug 8, 2018

Hey. Was this triaged by any chance? If not, I could use some pointers / guidance on how to approach this.

@ennru (Member) commented Oct 5, 2018

I'm already knee-deep in that code and know exactly why these messages arrive in the wrong order. I'll give it a try next week.
@tkroman It would be perfect if you could add a test case to PartitionedSourcesSpec that shows the expected behaviour and is marked with pendingUntilFixed.

@ennru ennru self-assigned this Oct 5, 2018
ennru added a commit to ennru/alpakka-kafka that referenced this issue Oct 8, 2018
@ennru (Member) commented Oct 10, 2018

There is no backwards-compatible way to turn these signals sent on revoking and on assignment into something semi-synchronous.
If that functionality is needed, we'd need to introduce a synchronousRebalanceListener on the subscriptions, that requires the actor to acknowledge the message within a timeout.
Alternatively, that could be a function that is passed in, as used in SubSourceLogic to support fetching offsets from external storage.
What does your use-case look like, exactly?

@2m 2m closed this as completed in 8016194 Oct 10, 2018
@2m (Contributor) commented Oct 10, 2018

Reopening as #595 did not address this.

@2m 2m reopened this Oct 10, 2018
@tkroman (Author) commented Oct 14, 2018

@ennru the use-case would be something like microbatching with e.g. groupedWithin or its variations:

Consider consumer A doing

Consumer
  .committableSource(settings, Subscriptions.topics("aaa"))
  .groupedWithin(N, 10.seconds)
  .mapAsync(P)(xs => store(xs))

where topic aaa has two (or more) partitions, 0 and 1.

Since A is the only consumer, it's subscribed to both 0 and 1.

Then, at some point in time (T0) there is going to be a batch of at most N messages read from 0 / 1 which are not committed yet.

Say messages [0, (N-1)/2) were read from 0 and [(N-1)/2, N-1] were read from 1.

Say the deadline for commit is at TC > T0

Then

Consumer B kicks in at T1 >= T0, T1 < TC

A gets 0 and 1 revoked.

A gets 0 assigned, B gets 1 assigned.

B doesn't know that [0, N-1] have already been read by A, so it starts reading partition 1 from the last committed offset and re-reads half of the messages that A is about to commit.

The process hosting A processes messages [0, N-1] and commits them, while B's process writes half of those as well. Also, if I'm not mistaken, positions aren't preserved across partition reassignment, so the position for 0 is not stored; hence, even though 0 stays with A, the first half of the messages (0 through (N-1)/2) will be re-consumed by A as well.

I've given this some more thought, and I can't say that as a user I have a perfect solution for this.
Even if I had a way to react to partition revocation in a reliable manner (in the sense of the opening post, i.e. "don't assign before revocation is completely handled"), I'm not sure what should happen to the in-flight messages: should the batch be discarded and offsets not committed, or should offsets be committed and the elements in the batch kept?

If I had to make a choice, the safest option would be to discard the (uncommitted) messages in the batch (and clear the batch) before they are passed to the next stage (mapAsync / whatever). I don't know stream internals well enough to know whether this is possible, but that's how it looks in my head.

@ennru (Member) commented Oct 15, 2018

Thank you for this detailed explanation.

Doesn't the Consumer setting .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") solve your problem?

There is no way to clear in-flight messages already in the stream.

@tkroman (Author) commented Oct 19, 2018

It's not just about new consumers in the group: if you have 2+ consumers periodically restarting with this type of batching, then without a way to reliably commit the batch before revocation there is no way to prevent double-consuming.

@tkroman (Author) commented Oct 19, 2018

I mean, if we had a way to commit before reassignment, users would still have the choice of e.g. resorting to this:

// assuming this API
trait AsyncRebalanceListener {
  def onRevoke(pts: Set[TopicPartition]): Future[Unit]
  def onAssign(pts: Set[TopicPartition]): Future[Unit]
}

// disregarding mutability and concurrency here
val offsets = mutable.Map.empty[GroupTopicPartition, CommittableOffset]

val rebalanceListener = new AsyncRebalanceListener {
  def onRevoke(pts: Set[TopicPartition]): Future[Unit] =
    Future
      .traverse(offsets.values)(_.commitScaladsl())
      .map(_ => ())

  def onAssign(pts: Set[TopicPartition]): Future[Unit] =
    Future.successful(())
}

src
  .addRebalanceListener(rebalanceListener)
  .map { msg =>
    offsets.update(msg.offset.groupTopicPartition, msg.offset)
    msg
  }
  .grouped(N)
  .mapAsync(A)(actuallyConsume)
  .mapAsync(B)(commitNormally)

The delivery semantics in this case differ from the default (I think this means at-most-once for batches coinciding with rebalances and at-least-once for every other case), but this is still better than nothing.

@ennru (Member) commented Oct 24, 2018

Yes, I guess you are right. What we need to add is a callback API that is not based on actor messaging.
I'll look into that, but I guess we'll release a 1.0-M1 version prior to that.

@ennru (Member) commented Nov 8, 2018

Looking into this a bit more I believe there is no good way to allow for user-defined callback methods for this.
The Kafka client's ConsumerRebalanceListener states

This callback will only execute in the user thread as part of the Consumer.poll(java.time.Duration) call whenever partition assignment changes.

which will be the KafkaConsumerActor's thread.

It is a bad idea to hand out callbacks on that thread in the first place; what is worse is that you can't access anything interesting (like asking for a topic position or committing), as messages to the actor won't be handled while the thread is in user code.

Wrapping it in futures won't help as we'd still need to block until completion to guarantee the callback order.

2m pushed a commit that referenced this issue Dec 7, 2018
While digging around for #539 I noticed that the manual subscription case classes allow setting the actor to receive rebalance notifications. Manual subscriptions never rebalance, so I deprecated those methods and made them no-ops for now.
@seglo (Contributor) commented Jan 14, 2019

Looking into this a bit more I believe there is no good way to allow for user-defined callback methods for this.
The Kafka client's ConsumerRebalanceListener states

This callback will only execute in the user thread as part of the Consumer.poll(java.time.Duration) call whenever partition assignment changes.

which will be the KafkaConsumerActor's thread.

It is a bad idea to hand out callbacks on that thread in the first place; what is worse is that you can't access anything interesting (like asking for a topic position or committing), as messages to the actor won't be handled while the thread is in user code.

Wrapping it in futures won't help as we'd still need to block until completion to guarantee the callback order.

Yes, I think we need to add an additional API with the blocking handlers. It may not be ideal, but the design of the KafkaConsumer forces our hand here. I could work on a PR if you like.

@tkroman (Author) commented Feb 25, 2019

@seglo that would be really great.

@szymonm (Contributor) commented Mar 4, 2019

Hey @seglo what's the status of that?
Looks like we are hitting this problem with Transactional.source followed by groupedWithin and commits stored in Kafka.

I'm eager to help but need some guidance where to start.

@seglo (Contributor) commented Mar 4, 2019

@szymonm I've been sidetracked. I think this may actually get handled in #705, but I'd need to check.

I'm curious to learn more about the issue you're having though. The transactional sink/flow should already handle blocking for you in a transactional commit. This ticket is discussing consumer group commit event handler semantics.

@szymonm (Contributor) commented Mar 4, 2019

The problem we have is very similar to what @tkroman has described in: #539 (comment)
Except, we use Transactional.source.
Basically, we see data duplication, because after the rebalancing the consumer doesn't retrieve the offsets committed before the rebalancing.

IIUC, the root cause of the problem is that the Alpakka rebalance listener does not wait for the records in the stream to be processed and committed. This could be solved by exposing a blocking onRevoke API that would allow us to manually wait for all the in-flight elements of the stream to flush.
Please correct me if I'm wrong and this should not happen in the Transactional flow case.

I think that #705 should solve the problem, because there the whole stream is restarted when rebalancing happens, so no in-flight elements are dropped. However, it also introduces a somewhat different model, where we have a stream per partition. We expect thousands of partitions to be processed by a single node, so that could lead to high memory demand.

@szymonm (Contributor) commented Mar 4, 2019

Created a test that demonstrates the problem: #739

In the test, I create transactional streams that copy data from an input to an output queue. Due to multiple short-lived consumers, Kafka triggers partition rebalancing. This results in data duplication, because consumers don't wait for offset commits to flush before fetching the offsets to read from.

@szymonm (Contributor) commented Mar 8, 2019

FYI, I was able to fix the issue that made the test fail, and it was actually not due to rebalancing but to the way we commit offsets in the TransactionalProducerStage.
I still think that we need to add a blocking "listener" for onRevoke though.

@seglo (Contributor) commented Mar 8, 2019

I see. Thanks for the PR!

@einholen commented Mar 9, 2019

What's worse is that after a rebalance, a consumer that has in-flight messages from a revoked partition can actually commit them without error. Although I've seen exceptions that say exactly that (something like "cannot commit because partition was revoked"), I can't reproduce them during a normal rebalance where a consumer is stopped gracefully, e.g. a consumer node redeploy: even 5 seconds after revocation it is still able to commit normally.

We have business logic where, for each message, there's an idempotent processing part with at-least-once processing guarantees and a non-idempotent part where we need at-most-once. I wanted to commit between these two stages to protect the second stage from processing by a new consumer, but it doesn't seem to work.

External offset management is an option, but are there maybe some settings I'm not seeing that would help without an external database?

I'm using committablePartitionedSource.

@Maatary commented Mar 16, 2019

Assuming the semantics of the stream are at-least-once, is there a way to simply discard all the messages or restart the stream from within the rebalance listener? I have fully described the issue I am facing in #750.

@Maatary commented Mar 18, 2019

@tkroman I wonder if you have faced the following:

When a rebalance happens and partitions are revoked while you have a bunch of in-flight messages to commit, you get a CommitFailedException upon committing them, because the partition was revoked.

I wonder if a safe thing to do would be to simply resume on CommitFailedException, provided that the semantics are at-least-once. This way, whoever picked up the partition will keep on processing the messages.

@tkroman (Author) commented Mar 18, 2019 via email

@szymonm (Contributor) commented Mar 18, 2019

Agreed.

Internally, we tried making the revoke listener block until all messages were emitted by the Transactional.sink. This is still not enough, as they can be emitted but not yet committed.

So additionally we need a mechanism to make sure that the transaction commit (producer.commitTransaction()) has successfully finished.
Currently, all we have is that demand is stopped while the transaction is being committed. This is not enough in the presence of async boundaries, buffering, etc.

ennru added a commit that referenced this issue Mar 24, 2019
## Purpose

Commit 1: Make use of the fact that the rebalance listener `WrappedAutoPausedListener` is called on the thread that called `poll`, which is the actor thread, so it is safe to touch actor state. The messages it used to send are turned into method calls.

Commit 2: Isolate commit-refreshing into a class holding the data structures and deadline logic.

Commit 3: When commit-refreshing is switched off (the default) no data needs to be collected at all. Depending on the `commit-refresh-interval` setting a no-op implementation might be used.

## Background Context

I'm investigating #750 and rebalance handling for #539 and found this to be a good step on the way.
Since Kafka 2.1 commit-refreshing is not required anymore, as [KAFKA-4682](https://issues.apache.org/jira/browse/KAFKA-4682) is resolved.
@szymonm (Contributor) commented Mar 25, 2019

Since you are working on it @ennru, let me share my thoughts (no solution yet).

It does not look right to add an analogous synchronousRebalanceListener, because:

  1. I expect that messages sent to a listener are "fire and forget". Here we expect some kind of blocking mechanism, which seems contrary to what a typical listener does.
  2. There seems to be no use case for blocking in the onPartitionsAssigned callback.
  3. It will be confusing for users which listener to choose. And if the listener is an actor, it may be hard to debug an issue after forgetting to send a message back from the listener.

In the Java world we would express the necessity to block in onRevoke by providing a future. That would mean that the ListenerCallback awaits the completion of the provided future.

If we wanted to make only minimal changes, I would suggest adding a boolean flag to rebalanceListener creation which, if true, would mean that the callback should wait for a response (like a Done message) from the listener actor.

@ennru (Member) commented Mar 25, 2019

Yes, that makes sense. But I still wonder what the user will be able to do in the callback; as the actor can't respond, do we need to hand out a reference to the KafkaConsumer?

@ennru (Member) commented Mar 27, 2019

I've now explored an API that would allow reacting synchronously to rebalance events.

I want such an API to allow for external offset handling while still using Kafka's rebalancing. For that it is important to be able to block in both callbacks and, more importantly, to get access to the underlying consumer on the same thread that called poll.

I'll push my work in progress soon.

@ennru (Member) commented Jun 24, 2019

In #761 I explore how a synchronous handler for partition changes could work. Please have a look.

@ennru (Member) commented Oct 23, 2019

This is not forgotten; I just opened #949 to address this.
