Skip to content

Commit

Permalink
feat(auto_balancer): support client auth config (#1138)
Browse files Browse the repository at this point in the history
* feat(auto_balancer): support client auth config

Signed-off-by: Shichao Nie <[email protected]>

* test(auto_balancer): fix unit tests

Signed-off-by: Shichao Nie <[email protected]>

---------

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 19, 2024
1 parent 879010b commit de9438d
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 58 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import kafka.autobalancer.common.AutoBalancerThreadFactory;
import kafka.autobalancer.common.types.MetricTypes;
import kafka.autobalancer.config.AutoBalancerConfig;
import kafka.autobalancer.config.AutoBalancerConfigUtils;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics;
Expand Down Expand Up @@ -82,6 +83,7 @@ public class LoadRetriever implements BrokerStatusListener {
private final ScheduledExecutorService mainExecutorService;
private final Set<Integer> brokerIdsInUse;
private final Set<TopicPartition> currentAssignment = new HashSet<>();
private final AutoBalancerControllerConfig config;
private volatile boolean leaderEpochInitialized;
private volatile boolean isLeader;
private volatile Consumer<String, AutoBalancerMetrics> consumer;
Expand All @@ -104,6 +106,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
this.cond = lock.newCondition();
this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main"));
leaderEpochInitialized = false;
this.config = config;
metricReporterTopic = config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG);
metricReporterTopicPartition = config.getInt(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG);
metricReporterTopicRetentionTime = config.getLong(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_RETENTION_MS_CONFIG);
Expand Down Expand Up @@ -150,6 +153,7 @@ protected KafkaConsumer<String, AutoBalancerMetrics> createConsumer(String boots
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
AutoBalancerConfigUtils.addSslConfigs(consumerProps, config);
return new KafkaConsumer<>(consumerProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

package kafka.autobalancer.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;

import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,17 +30,27 @@ public class AutoBalancerConfig extends AbstractConfig {
public static final String AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG = PREFIX + "topic.num.partitions";
public static final String AUTO_BALANCER_METRICS_TOPIC_RETENTION_MS_CONFIG = PREFIX + "topic.retention.ms";
public static final String AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY = PREFIX + "topic.cleanup.policy";
public static final String AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL = PREFIX + "client.auth." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
public static final String AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM = PREFIX + "client.auth." + SaslConfigs.SASL_MECHANISM;
public static final String AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG = PREFIX + "client.auth." + SaslConfigs.SASL_JAAS_CONFIG;
/* Default values */
public static final String DEFAULT_AUTO_BALANCER_TOPIC = "__auto_balancer_metrics";
public static final Integer DEFAULT_AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS = 1;
public static final long DEFAULT_AUTO_BALANCER_METRICS_TOPIC_RETENTION_MS = TimeUnit.MINUTES.toMillis(30);
public static final String DEFAULT_AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY = String.join(",", TopicConfig.CLEANUP_POLICY_DELETE);
public static final String DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
public static final String DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM = SaslConfigs.DEFAULT_SASL_MECHANISM;
public static final Password DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG = null;
/* Documents */
private static final String AUTO_BALANCER_TOPIC_DOC = "The topic to which Auto Balancer metrics reporter "
+ "should send messages";
private static final String AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_DOC = "The number of partitions of Auto Balancer metrics topic";
private static final String AUTO_BALANCER_METRICS_TOPIC_RETENTION_MS_DOC = TopicConfig.RETENTION_MS_DOC;
public static final String AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY_DOC = TopicConfig.CLEANUP_POLICY_DOC;
public static final String AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
public static final String AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM_DOC = SaslConfigs.SASL_MECHANISM_DOC;
public static final String AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG_DOC = SaslConfigs.SASL_JAAS_CONFIG_DOC;


static {
CONFIG = new ConfigDef()
Expand All @@ -60,7 +73,22 @@ public class AutoBalancerConfig extends AbstractConfig {
ConfigDef.Type.STRING,
DEFAULT_AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY,
ConfigDef.Importance.HIGH,
AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY_DOC);
AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY_DOC)
.define(AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL,
ConfigDef.Type.STRING,
DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL,
ConfigDef.Importance.MEDIUM,
AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL_DOC)
.define(AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM,
ConfigDef.Type.STRING,
DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM,
ConfigDef.Importance.MEDIUM,
AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM_DOC)
.define(AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG,
ConfigDef.Type.PASSWORD,
DEFAULT_AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG,
ConfigDef.Importance.MEDIUM,
AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG_DOC);
}

public AutoBalancerConfig(Map<?, ?> originals, boolean doLogs) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;

import java.util.Properties;

/**
* This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils.
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
*/
public class AutoBalancerConfigUtils {

/**
* Parse AdminClient configs based on the given {@link AutoBalancerConfig configs}.
*
* @param clientConfigs Configs that will be return with SSL configs.
* @param configs Configs to be used for parsing AdminClient SSL configs.
*/
public static void addSslConfigs(Properties clientConfigs, AutoBalancerConfig configs) {
// Add security protocol (if specified).
try {
String securityProtocol = configs.getString(AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL);
clientConfigs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
setStringConfigIfExists(configs, clientConfigs, AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL,
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
setStringConfigIfExists(configs, clientConfigs, AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM,
SaslConfigs.SASL_MECHANISM);
setPasswordConfigIfExists(configs, clientConfigs, AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG,
SaslConfigs.SASL_JAAS_CONFIG);
} catch (ConfigException ce) {
// let it go.
}
}

private static void setPasswordConfigIfExists(AutoBalancerConfig configs, Properties props, String name, String originalName) {
try {
Password pwd = configs.getPassword(name);
if (pwd != null) {
props.put(originalName, pwd);
}
} catch (ConfigException ce) {
// let it go.
}
}

private static void setStringConfigIfExists(AutoBalancerConfig configs, Properties props, String name, String originalName) {
try {
props.put(originalName, configs.getString(name));
} catch (ConfigException ce) {
// let it go.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static String config(String baseConfigName) {
public static Properties parseProducerConfigs(Map<String, ?> configMap) {
Properties props = new Properties();
for (Map.Entry<String, ?> entry : configMap.entrySet()) {
if (entry.getKey().startsWith(PREFIX)) {
if (entry.getKey().startsWith(PREFIX) && CONFIGS.contains(entry.getKey())) {
props.put(entry.getKey().replace(PREFIX, ""), entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.yammer.metrics.core.MetricsRegistryListener;
import kafka.autobalancer.common.types.MetricTypes;
import kafka.autobalancer.common.types.RawMetricTypes;
import kafka.autobalancer.config.AutoBalancerConfigUtils;
import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig;
import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics;
import kafka.autobalancer.metricsreporter.metric.MetricSerde;
Expand Down Expand Up @@ -48,6 +49,7 @@

/**
* This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
*/
public class AutoBalancerMetricsReporter implements MetricsRegistryListener, MetricsReporter, Runnable {
public static final String DEFAULT_BOOTSTRAP_SERVERS_HOST = "localhost";
Expand Down Expand Up @@ -185,6 +187,7 @@ public void configure(Map<String, ?> rawConfigs) {
setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all");
AutoBalancerConfigUtils.addSslConfigs(producerProps, reporterConfig);

metricsReporterCreateRetries = reporterConfig.getInt(
AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_CREATE_RETRIES_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,11 @@

package kafka.autobalancer.metricsreporter;

import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;

import java.util.Map;
import java.util.Properties;
Expand All @@ -30,6 +25,7 @@

/**
* This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils.
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
*/
public final class AutoBalancerMetricsUtils {

Expand Down Expand Up @@ -91,57 +87,6 @@ public static void closeAdminClientWithTimeout(AdminClient adminClient, long tim
}, timeoutMs);
}

/**
* Parse AdminClient configs based on the given {@link AutoBalancerMetricsReporterConfig configs}.
*
* @param adminClientConfigs Configs that will be return with SSL configs.
* @param configs Configs to be used for parsing AdminClient SSL configs.
* @return AdminClient configs.
*/
public static Properties addSslConfigs(Properties adminClientConfigs, AutoBalancerMetricsReporterConfig configs) {
// Add security protocol (if specified).
try {
String securityProtocol = configs.getString(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
adminClientConfigs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
setStringConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_MECHANISM);
setPasswordConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_JAAS_CONFIG);

// Configure SSL configs (if security protocol is SSL or SASL_SSL)
if (securityProtocol.equals(SecurityProtocol.SSL.name) || securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
}
} catch (ConfigException ce) {
// let it go.
}

return adminClientConfigs;
}

private static void setPasswordConfigIfExists(AutoBalancerMetricsReporterConfig configs, Properties props, String name) {
try {
props.put(name, configs.getPassword(name));
} catch (ConfigException ce) {
// let it go.
}
}

private static void setStringConfigIfExists(AutoBalancerMetricsReporterConfig configs, Properties props, String name) {
try {
props.put(name, configs.getString(name));
} catch (ConfigException ce) {
// let it go.
}
}

/**
* Create a config altering operation if config's current value does not equal to target value.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AutoBalancerConfigTest {

@Test
public void testNoPredefinedConfig() {
Map<String, String> props = new HashMap<>();
props.put(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG, "test-topic");
props.put(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
props.put("some.other.config", "some-value");
AutoBalancerConfig config = new AutoBalancerConfig(props, false);
Assertions.assertEquals("test-topic", config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG));
Assertions.assertEquals(2, config.getInt(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG));
Assertions.assertThrowsExactly(ConfigException.class, () -> config.getString("some.other.config"));
}

@Test
public void testSSLConfig() {
Map<String, Object> props = new HashMap<>();
props.put(AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name);
props.put(AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SASL_MECHANISM, "PLAIN");
Password pwd = new Password("jaas-config");
props.put(AutoBalancerConfig.AUTO_BALANCER_CLIENT_AUTH_SASL_JAAS_CONFIG, pwd);

AutoBalancerMetricsReporterConfig config = new AutoBalancerMetricsReporterConfig(props, false);
Properties clientConfig = new Properties();
AutoBalancerConfigUtils.addSslConfigs(clientConfig, config);
Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT.name, clientConfig.getProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
Assertions.assertEquals("PLAIN", clientConfig.getProperty(SaslConfigs.SASL_MECHANISM));
Assertions.assertEquals(pwd, clientConfig.get(SaslConfigs.SASL_JAAS_CONFIG));

AutoBalancerControllerConfig ctlConfig = new AutoBalancerControllerConfig(props, false);
clientConfig = new Properties();
AutoBalancerConfigUtils.addSslConfigs(clientConfig, ctlConfig);
Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT.name, clientConfig.getProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
Assertions.assertEquals("PLAIN", clientConfig.getProperty(SaslConfigs.SASL_MECHANISM));
Assertions.assertEquals(pwd, clientConfig.get(SaslConfigs.SASL_JAAS_CONFIG));
}
}

0 comments on commit de9438d

Please sign in to comment.