Skip to content

Commit

Permalink
Send rebalance events immediately (fixes akka#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Oct 8, 2018
1 parent eb95fb3 commit ccf8d39
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,33 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assignedTps =>
tps ++= assignedTps
log.log(partitionLogLevel, "Assigned partitions: {}. All partitions: {}", assignedTps, tps)
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor.ref)
}
requestMessages()
}

val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revokedTps =>
tps --= revokedTps
log.log(partitionLogLevel, "Revoked partitions: {}. All partitions: {}", revokedTps, tps)
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor.ref)
}
}

def rebalanceListener: KafkaConsumerActor.ListenerCallbacks =
KafkaConsumerActor.ListenerCallbacks(partitionAssignedCB.invoke, partitionRevokedCB.invoke)
KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor.ref)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor.ref)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
}
}
)

subscription match {
case TopicSubscription(topics, _) =>
Expand Down

0 comments on commit ccf8d39

Please sign in to comment.