Skip to content

Commit

Permalink
feat(metadata): fix data race when close and reopen coordinator parti…
Browse files Browse the repository at this point in the history
…tion (#1255)

* feat(metadata): fix data race when closing and reopening a coordinator partition

Signed-off-by: SSpirits <[email protected]>

* fix(metadata): fix potential data race

Signed-off-by: SSpirits <[email protected]>

* feat(spotbugs): make spotbugs happy

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored May 14, 2024
1 parent 35ec4ee commit a1667a3
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 44 deletions.
25 changes: 21 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager
Expand Down Expand Up @@ -2682,7 +2682,15 @@ class ReplicaManager(val config: KafkaConfig,
}

/**
 * Synchronously apply a topics delta.
 *
 * Delegates to [[asyncApplyDelta]] with a no-op per-partition callback and blocks
 * until the asynchronous application has completed.
 *
 * @param delta    the topics delta to apply
 * @param newImage the new metadata image the delta leads to
 */
def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
  // Note: the stale two-argument call left over from the diff is removed;
  // only the three-argument form (with callback) exists now.
  asyncApplyDelta(delta, newImage, (_, _) => {}).get()
}

/**
 * Resolve the [[TopicDelta]] for the given topic name, if any.
 *
 * Returns Some only when the topic exists in the new image, the supplied delta is
 * non-null, and that delta actually carries a change for the topic's id.
 */
def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = {
  for {
    topicImage  <- Option(newImage.topics().getTopic(topicName))
    topicsDelta <- Option(delta)
    topicDelta  <- Option(topicsDelta.changedTopic(topicImage.id()))
  } yield topicDelta
}

// AutoMQ for Kafka inject start
Expand All @@ -2692,7 +2700,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param delta The delta to apply.
* @param newImage The new metadata image.
*/
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage): CompletableFuture[Void] = {
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)

Expand All @@ -2709,7 +2717,7 @@ class ReplicaManager(val config: KafkaConfig,
def doPartitionDeletion(): Unit = {
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
deletes.forKeyValue((tp, _) => {
val opCf = doPartitionDeletionAsyncLocked(tp)
val opCf = doPartitionDeletionAsyncLocked(tp, delta, newImage, callback)
opCfList.add(opCf)
})
}
Expand Down Expand Up @@ -2738,6 +2746,8 @@ class ReplicaManager(val config: KafkaConfig,
val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]()
leader += (tp -> info)
applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, leader)
// Apply the delta before elect leader.
getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
} catch {
case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t)
} finally {
Expand Down Expand Up @@ -2788,6 +2798,10 @@ class ReplicaManager(val config: KafkaConfig,
* @return A future which completes when the partition has been deleted.
*/
// Convenience overload: delete a partition when no delta/image context is available
// at the call site, so no topic-delta callback can (or should) be propagated.
private def doPartitionDeletionAsyncLocked(tp: TopicPartition): CompletableFuture[Void] = {
  val noopCallback: (TopicDelta, Int) => Unit = (_, _) => {}
  doPartitionDeletionAsyncLocked(tp, null, null, noopCallback)
}

private def doPartitionDeletionAsyncLocked(tp: TopicPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
partitionOpMap.put(tp, opCf)
Expand All @@ -2806,6 +2820,9 @@ class ReplicaManager(val config: KafkaConfig,
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
}
}
if (newImage != null && delta != null) {
getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
}
} finally {
opCf.complete(null)
partitionOpMap.remove(tp, opCf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package kafka.server.metadata

import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
Expand All @@ -27,11 +25,13 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.ThreadUtils
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsDelta, TopicsImage}
import org.apache.kafka.image._
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler

import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable


Expand Down Expand Up @@ -233,51 +233,72 @@ class BrokerMetadataPublisher(
}

/**
 * Apply a topics delta: hand it to the replica manager and, from the replica
 * manager's per-partition callback, update the group/transaction coordinators and
 * propagate topic deletions.
 *
 * Running the coordinator updates inside the per-partition callback orders them with
 * the partition open/close operations, which avoids the data race between closing
 * and reopening a coordinator partition.
 *
 * @param deltaName   human-readable delta name used in fault messages
 * @param topicsDelta the topics portion of the metadata delta
 * @param delta       the full metadata delta
 * @param newImage    the metadata image resulting from the delta
 */
def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta, newImage: MetadataImage): Unit = {
  // Callback invoked for each changed partition of each changed topic.
  def callback(topicDelta: TopicDelta, partition: Int): Unit = {
    if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
      try {
        // Handle the case where the group metadata topic was deleted:
        // resign this partition if this broker was its leader.
        if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
          val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition)
          if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
            groupCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch))
          }
        }

        // Update the group coordinator with local leadership changes.
        updateCoordinator(
          topicDelta,
          partition,
          groupCoordinator.onElection,
          (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, leaderEpochOpt)
        )
      } catch {
        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
          s"coordinator with local changes in $deltaName", t)
      }
    }

    if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
      try {
        // Handle the case where the transaction state topic was deleted:
        // resign this partition if this broker was its leader.
        if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
          val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition)
          if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
            // FIX: resign the *transaction* coordinator here; the previous code
            // resigned the group coordinator (copy-paste error from the branch above).
            txnCoordinator.onResignation(partition, Option(partitionRegistration.leaderEpoch))
          }
        }

        // Update the transaction coordinator with local leadership changes.
        updateCoordinator(
          topicDelta,
          partition,
          txnCoordinator.onElection,
          txnCoordinator.onResignation)
      } catch {
        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
          s"coordinator with local changes in $deltaName", t)
      }
    }

    try {
      // Notify the group coordinator about this partition if its topic was deleted.
      if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) {
        groupCoordinator.handleDeletedPartitions(
          mutable.Seq.apply(new TopicPartition(topicDelta.name(), partition)), RequestLocal.NoCaching)
      }
    } catch {
      case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
        s"coordinator with deleted partitions in $deltaName", t)
    }
  }

  // Notify the replica manager about changes to topics.
  val cf = replicaManager.asyncApplyDelta(topicsDelta, newImage, callback)
  // FIX: whenComplete has no (action, executor) overload — whenCompleteAsync is the
  // variant that accepts an Executor.
  cf.whenCompleteAsync((_, ex) => {
    if (ex != null) {
      metadataPublishingFaultHandler.handleFault("Error applying topics " + s"delta in ${deltaName}", ex)
    }
  }, partitionOpCallbackExecutor)
}

override def publishedOffset: Long = publishedOffsetAtomic.get()
Expand Down Expand Up @@ -331,6 +352,28 @@ class BrokerMetadataPublisher(
}
}

// AutoMQ inject start
/**
 * Drive a coordinator's election/resignation hooks for a single partition, based on
 * this broker's local changes within the given topic delta.
 *
 * @param topicDelta  delta for the coordinator's backing topic
 * @param partition   partition index to examine
 * @param election    invoked with (partition, leaderEpoch) when this broker becomes leader
 * @param resignation invoked with (partition, leaderEpochOpt) on delete or follower transition
 */
def updateCoordinator(
  topicDelta: TopicDelta,
  partition: Int,
  election: (Int, Int) => Unit,
  resignation: (Int, Option[Int]) => Unit
): Unit = {
  val tp = new TopicPartition(topicDelta.name(), partition)
  val localChanges = topicDelta.localChanges(brokerId)

  // Locally deleted replica: resign with no epoch available.
  if (localChanges.deletes.contains(tp)) {
    resignation(partition, None)
  }
  // Became leader: elect with the new leader epoch.
  Option(localChanges.leaders.get(tp)).foreach { info =>
    election(partition, info.partition.leaderEpoch)
  }
  // Became follower: resign, passing along the new leader epoch.
  Option(localChanges.followers().get(tp)).foreach { info =>
    resignation(partition, Some(info.partition.leaderEpoch))
  }
}
// AutoMQ inject end

private def initializeManagers(): Unit = {
try {
// Start log manager, which will perform (potentially lengthy)
Expand Down
5 changes: 5 additions & 0 deletions gradle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
</Or>
</Match>

<!-- NOTE(review): suppresses SpotBugs' UPM_UNCALLED_PRIVATE_METHOD check for
     ReplicaManager — presumably for private overloads whose call sites SpotBugs
     cannot resolve; confirm the suppression is still needed when those methods change. -->
<Match>
<Class name="kafka.server.ReplicaManager"/>
<Bug pattern="UPM_UNCALLED_PRIVATE_METHOD"/>
</Match>

<!-- false positive in Java 11, related to https://github.com/spotbugs/spotbugs/issues/756 but more complex -->
<Match>
<Class name="org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream"/>
Expand Down

0 comments on commit a1667a3

Please sign in to comment.