From 364de609bf8869755fa29c008e62e522e085145a Mon Sep 17 00:00:00 2001 From: doveLin Date: Mon, 23 Dec 2024 11:44:45 +0800 Subject: [PATCH] [collector] fix the issue of reusing the `adminClient` in the Kafka client. (#2895) --- .../hertzbeat/collector/collect/kafka/KafkaConnect.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java index a898c6824f6..5507d8a3cb3 100644 --- a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java +++ b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java @@ -17,6 +17,7 @@ package org.apache.hertzbeat.collector.collect.kafka; +import org.apache.commons.lang3.StringUtils; import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -32,6 +33,8 @@ public class KafkaConnect extends AbstractConnection { private static AdminClient adminClient; + private static String preUrl; + public KafkaConnect(String brokerList) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); @@ -53,12 +56,13 @@ public void closeConnection() { } public static synchronized AdminClient getAdminClient(String brokerList) { - if (adminClient == null) { + if (StringUtils.isBlank(preUrl) || !brokerList.equals(preUrl)) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); adminClient = KafkaAdminClient.create(properties); + preUrl = brokerList; } return adminClient; } -} +} \ No newline at end of file