diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 56b1bb9d89..8cf3851944 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -57,7 +57,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{ThreadUtils, Time} -import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} +import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager @@ -2682,7 +2682,15 @@ class ReplicaManager(val config: KafkaConfig, } def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { - asyncApplyDelta(delta, newImage).get() + asyncApplyDelta(delta, newImage, (_, _) => {}).get() + } + + def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = { + Option(newImage.topics().getTopic(topicName)).flatMap { + topicImage => Option(delta).flatMap { + topicDelta => Option(topicDelta.changedTopic(topicImage.id())) + } + } } // AutoMQ for Kafka inject start @@ -2692,7 +2700,7 @@ class ReplicaManager(val config: KafkaConfig, * @param delta The delta to apply. * @param newImage The new metadata image. */ - def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage): CompletableFuture[Void] = { + def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = { // Before taking the lock, compute the local changes val localChanges = delta.localChanges(config.nodeId) @@ -2709,7 +2717,7 @@ class ReplicaManager(val config: KafkaConfig, def doPartitionDeletion(): Unit = { stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") deletes.forKeyValue((tp, _) => { - val opCf = doPartitionDeletionAsyncLocked(tp) + val opCf = doPartitionDeletionAsyncLocked(tp, delta, newImage, callback) opCfList.add(opCf) }) } @@ -2738,6 +2746,8 @@ class ReplicaManager(val config: KafkaConfig, val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]() leader += (tp -> info) applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, leader) + // Apply the delta before elect leader. + getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition())) } catch { case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t) } finally { @@ -2788,6 +2798,10 @@ class ReplicaManager(val config: KafkaConfig, * @return A future which completes when the partition has been deleted. */ private def doPartitionDeletionAsyncLocked(tp: TopicPartition): CompletableFuture[Void] = { + doPartitionDeletionAsyncLocked(tp, null, null, (_, _) => {}) + } + + private def doPartitionDeletionAsyncLocked(tp: TopicPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = { val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null)) val opCf = new CompletableFuture[Void]() partitionOpMap.put(tp, opCf) @@ -2806,6 +2820,9 @@ class ReplicaManager(val config: KafkaConfig, s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") } } + if (newImage != null && delta != null) { + getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition())) + } } finally { opCf.complete(null) partitionOpMap.remove(tp, opCf) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 81df709361..338f5fc5b4 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -17,8 +17,6 @@ package kafka.server.metadata -import java.util.Properties -import java.util.concurrent.atomic.AtomicLong import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.{LogManager, UnifiedLog} @@ -27,11 +25,13 @@ import kafka.utils.Logging import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.ThreadUtils -import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsDelta, TopicsImage} +import org.apache.kafka.image._ import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.fault.FaultHandler +import java.util.Properties +import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable @@ -233,51 +233,72 @@ class BrokerMetadataPublisher( } def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta, newImage: MetadataImage): Unit = { - // Notify the replica manager about changes to topics. - val cf = replicaManager.asyncApplyDelta(topicsDelta, newImage) - cf.whenCompleteAsync((nil, ex) => { - if (ex != null) { - metadataPublishingFaultHandler.handleFault("Error applying topics " + s"delta in ${deltaName}", ex) - } - try { - // Update the group coordinator of local changes - updateCoordinator(newImage, - delta, - Topic.GROUP_METADATA_TOPIC_NAME, - groupCoordinator.onElection, - groupCoordinator.onResignation) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + - s"coordinator with local changes in ${deltaName}", t) + // Callback for each topic delta. + def callback(topicDelta: TopicDelta, partition: Int): Unit = { + if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) { + try { + // Handle the case where the group metadata topic was deleted + if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) { + val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition) + if (partitionRegistration != null && partitionRegistration.leader == brokerId) { + groupCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch)) + } + } + + // Update the group coordinator of local changes + updateCoordinator( + topicDelta, + partition, + groupCoordinator.onElection, + (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, leaderEpochOpt) + ) + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + + s"coordinator with local changes in $deltaName", t) + } } - try { - // Update the transaction coordinator of local changes - updateCoordinator(newImage, - delta, - Topic.TRANSACTION_STATE_TOPIC_NAME, - txnCoordinator.onElection, - txnCoordinator.onResignation) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " + - s"coordinator with local changes in ${deltaName}", t) + + if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) { + try { + // Handle the case where the transaction state topic was deleted + if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) { + val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition) + if (partitionRegistration != null && partitionRegistration.leader == brokerId) { + groupCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch)) + } + } + + // Update the transaction coordinator of local changes + updateCoordinator( + topicDelta, + partition, + txnCoordinator.onElection, + txnCoordinator.onResignation) + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " + + s"coordinator with local changes in $deltaName", t) + } } + try { // Notify the group coordinator about deleted topics. - val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]() - topicsDelta.deletedTopicIds().forEach { id => - val topicImage = topicsDelta.image().getTopic(id) - topicImage.partitions().keySet().forEach { - id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id) - } - } - if (deletedTopicPartitions.nonEmpty) { - groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching) + if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) { + groupCoordinator.handleDeletedPartitions( + mutable.Seq.apply(new TopicPartition(topicDelta.name(), partition)), RequestLocal.NoCaching) } } catch { case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + - s"coordinator with deleted partitions in ${deltaName}", t) + s"coordinator with deleted partitions in $deltaName", t) + } + } + + // Notify the replica manager about changes to topics. + val cf = replicaManager.asyncApplyDelta(topicsDelta, newImage, callback) + cf.whenComplete((_, ex) => { + if (ex != null) { + metadataPublishingFaultHandler.handleFault("Error applying topics " + s"delta in ${deltaName}", ex) } - }, partitionOpCallbackExecutor) + }) } override def publishedOffset: Long = publishedOffsetAtomic.get() @@ -331,6 +352,28 @@ class BrokerMetadataPublisher( } } + // AutoMQ inject start + def updateCoordinator( + topicDelta: TopicDelta, + partition: Int, + election: (Int, Int) => Unit, + resignation: (Int, Option[Int]) => Unit + ): Unit = { + val topicPartition = new TopicPartition(topicDelta.name(), partition) + val changes = topicDelta.localChanges(brokerId) + + if (changes.deletes.contains(topicPartition)) { + resignation(partition, None) + } + Option(changes.leaders.get(topicPartition)).foreach { partitionInfo => + election(partition, partitionInfo.partition.leaderEpoch) + } + Option(changes.followers().get(topicPartition)).foreach { partitionInfo => + resignation(partition, Some(partitionInfo.partition.leaderEpoch)) + } + } + // AutoMQ inject end + private def initializeManagers(): Unit = { try { // Start log manager, which will perform (potentially lengthy) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 60925eec21..31d38fc0da 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -97,6 +97,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + +