Skip to content

Commit

Permalink
[collector] fix the issue of reusing the adminClient in the Kafka c…
Browse files Browse the repository at this point in the history
…lient. (#2895)
  • Loading branch information
doveLin0818 authored Dec 23, 2024
1 parent 5d81c32 commit 364de60
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hertzbeat.collector.collect.kafka;

import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand All @@ -32,6 +33,8 @@ public class KafkaConnect extends AbstractConnection<AdminClient> {

private static AdminClient adminClient;

private static String preUrl;

public KafkaConnect(String brokerList) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
Expand All @@ -53,12 +56,13 @@ public void closeConnection() {
}

public static synchronized AdminClient getAdminClient(String brokerList) {
if (adminClient == null) {
if (StringUtils.isBlank(preUrl) || !brokerList.equals(preUrl)) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
adminClient = KafkaAdminClient.create(properties);
preUrl = brokerList;
}
return adminClient;
}

}
}

0 comments on commit 364de60

Please sign in to comment.