diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 56b1bb9d89..8cf3851944 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,7 +57,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{ThreadUtils, Time}
-import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
+import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager
@@ -2682,7 +2682,15 @@ class ReplicaManager(val config: KafkaConfig,
}
def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
- asyncApplyDelta(delta, newImage).get()
+ asyncApplyDelta(delta, newImage, (_, _) => {}).get()
+ }
+
+ def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = {
+ Option(newImage.topics().getTopic(topicName)).flatMap {
+ topicImage => Option(delta).flatMap {
+ topicDelta => Option(topicDelta.changedTopic(topicImage.id()))
+ }
+ }
}
// AutoMQ for Kafka inject start
@@ -2692,7 +2700,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param delta The delta to apply.
* @param newImage The new metadata image.
*/
- def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage): CompletableFuture[Void] = {
+ def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)
@@ -2709,7 +2717,7 @@ class ReplicaManager(val config: KafkaConfig,
def doPartitionDeletion(): Unit = {
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
deletes.forKeyValue((tp, _) => {
- val opCf = doPartitionDeletionAsyncLocked(tp)
+ val opCf = doPartitionDeletionAsyncLocked(tp, delta, newImage, callback)
opCfList.add(opCf)
})
}
@@ -2738,6 +2746,8 @@ class ReplicaManager(val config: KafkaConfig,
val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]()
leader += (tp -> info)
applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, leader)
+ // Apply the delta before elect leader.
+ getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
} catch {
case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t)
} finally {
@@ -2788,6 +2798,10 @@ class ReplicaManager(val config: KafkaConfig,
* @return A future which completes when the partition has been deleted.
*/
private def doPartitionDeletionAsyncLocked(tp: TopicPartition): CompletableFuture[Void] = {
+ doPartitionDeletionAsyncLocked(tp, null, null, (_, _) => {})
+ }
+
+ private def doPartitionDeletionAsyncLocked(tp: TopicPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
partitionOpMap.put(tp, opCf)
@@ -2806,6 +2820,9 @@ class ReplicaManager(val config: KafkaConfig,
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
}
}
+ if (newImage != null && delta != null) {
+ getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
+ }
} finally {
opCf.complete(null)
partitionOpMap.remove(tp, opCf)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 81df709361..338f5fc5b4 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,8 +17,6 @@
package kafka.server.metadata
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
@@ -27,11 +25,13 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.ThreadUtils
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsDelta, TopicsImage}
+import org.apache.kafka.image._
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
@@ -233,51 +233,72 @@ class BrokerMetadataPublisher(
}
def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta, newImage: MetadataImage): Unit = {
- // Notify the replica manager about changes to topics.
- val cf = replicaManager.asyncApplyDelta(topicsDelta, newImage)
- cf.whenCompleteAsync((nil, ex) => {
- if (ex != null) {
- metadataPublishingFaultHandler.handleFault("Error applying topics " + s"delta in ${deltaName}", ex)
- }
- try {
- // Update the group coordinator of local changes
- updateCoordinator(newImage,
- delta,
- Topic.GROUP_METADATA_TOPIC_NAME,
- groupCoordinator.onElection,
- groupCoordinator.onResignation)
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
- s"coordinator with local changes in ${deltaName}", t)
+ // Callback for each topic delta.
+ def callback(topicDelta: TopicDelta, partition: Int): Unit = {
+ if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
+ try {
+ // Handle the case where the group metadata topic was deleted
+ if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
+ val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition)
+ if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
+ groupCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch))
+ }
+ }
+
+ // Update the group coordinator of local changes
+ updateCoordinator(
+ topicDelta,
+ partition,
+ groupCoordinator.onElection,
+ (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, leaderEpochOpt)
+ )
+ } catch {
+ case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
+ s"coordinator with local changes in $deltaName", t)
+ }
}
- try {
- // Update the transaction coordinator of local changes
- updateCoordinator(newImage,
- delta,
- Topic.TRANSACTION_STATE_TOPIC_NAME,
- txnCoordinator.onElection,
- txnCoordinator.onResignation)
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
- s"coordinator with local changes in ${deltaName}", t)
+
+ if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
+ try {
+ // Handle the case where the transaction state topic was deleted
+ if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
+ val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition)
+ if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
+ groupCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch))
+ }
+ }
+
+ // Update the transaction coordinator of local changes
+ updateCoordinator(
+ topicDelta,
+ partition,
+ txnCoordinator.onElection,
+ txnCoordinator.onResignation)
+ } catch {
+ case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
+ s"coordinator with local changes in $deltaName", t)
+ }
}
+
try {
// Notify the group coordinator about deleted topics.
- val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
- topicsDelta.deletedTopicIds().forEach { id =>
- val topicImage = topicsDelta.image().getTopic(id)
- topicImage.partitions().keySet().forEach {
- id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
- }
- }
- if (deletedTopicPartitions.nonEmpty) {
- groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
+ if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) {
+ groupCoordinator.handleDeletedPartitions(
+ mutable.Seq.apply(new TopicPartition(topicDelta.name(), partition)), RequestLocal.NoCaching)
}
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
- s"coordinator with deleted partitions in ${deltaName}", t)
+ s"coordinator with deleted partitions in $deltaName", t)
+ }
+ }
+
+ // Notify the replica manager about changes to topics.
+ val cf = replicaManager.asyncApplyDelta(topicsDelta, newImage, callback)
+ cf.whenComplete((_, ex) => {
+ if (ex != null) {
+ metadataPublishingFaultHandler.handleFault("Error applying topics " + s"delta in ${deltaName}", ex)
}
- }, partitionOpCallbackExecutor)
+ })
}
override def publishedOffset: Long = publishedOffsetAtomic.get()
@@ -331,6 +352,28 @@ class BrokerMetadataPublisher(
}
}
+ // AutoMQ inject start
+ def updateCoordinator(
+ topicDelta: TopicDelta,
+ partition: Int,
+ election: (Int, Int) => Unit,
+ resignation: (Int, Option[Int]) => Unit
+ ): Unit = {
+ val topicPartition = new TopicPartition(topicDelta.name(), partition)
+ val changes = topicDelta.localChanges(brokerId)
+
+ if (changes.deletes.contains(topicPartition)) {
+ resignation(partition, None)
+ }
+ Option(changes.leaders.get(topicPartition)).foreach { partitionInfo =>
+ election(partition, partitionInfo.partition.leaderEpoch)
+ }
+ Option(changes.followers().get(topicPartition)).foreach { partitionInfo =>
+ resignation(partition, Some(partitionInfo.partition.leaderEpoch))
+ }
+ }
+ // AutoMQ inject end
+
private def initializeManagers(): Unit = {
try {
// Start log manager, which will perform (potentially lengthy)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 60925eec21..31d38fc0da 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -97,6 +97,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
+
+
+