Skip to content

Commit

Permalink
fix(core): ensure partition closed before metadata listener close (#1068
Browse files Browse the repository at this point in the history
)

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Mar 31, 2024
1 parent b5fe31c commit 79781b4
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,17 @@ class BrokerServer(
}
}
lifecycleManager.beginShutdown()

// AutoMQ for Kafka inject start
// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown:
// 1. after lifecycleManager start shutdown to trigger partitions gracefully reassign.
// 2. before metadataListener start close to ensure S3Stream can read the latest metadata.
if (replicaManager != null) {
CoreUtils.swallow(replicaManager.awaitAllPartitionShutdown(), this)
}
// AutoMQ for Kafka inject end

// Stop socket server to stop accepting any more connections and requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null) {
Expand All @@ -675,14 +686,6 @@ class BrokerServer(
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)

// AutoMQ for Kafka inject start
// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown before metadataListener.close()
if (replicaManager != null) {
CoreUtils.swallow(replicaManager.awaitAllPartitionShutdown(), this)
}
// AutoMQ for Kafka inject end

/**
* We must shutdown the scheduler early because otherwise, the scheduler could touch other
* resources that might have been shutdown and cause exceptions.
Expand Down

0 comments on commit 79781b4

Please sign in to comment.