diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e0d4e4e89e..3737871f10 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2724,6 +2724,19 @@ class ReplicaManager(val config: KafkaConfig, doPartitionDeletion() + // Clean up the consumption offset for the deleted partitions that do not belong to the current broker. + delta.deletedTopicIds().forEach { id => + val topicImage = delta.image().getTopic(id) + topicImage.partitions().entrySet().forEach { entry => { + val partitionId = entry.getKey + val partition = entry.getValue + if (partition.leader != config.nodeId) { + callback(new TopicPartition(topicImage.name(), partitionId)) + } + } + } + } + } // Handle partitions which we are now the leader or follower for.