Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-4075 Supprt mTLS between Storm and ZK #3692

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.zookeeper.auth.user: null
storm.zookeeper.auth.password: null
storm.zookeeper.ssl.enable: false
storm.zookeeper.ssl.hostnameVerification: false
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
Expand Down
5 changes: 5 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,25 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = "storm.zookeeper.topology.auth.scheme";

/** Enable SSL/TLS for ZooKeeper client connection. */
@IsBoolean
public static final String ZK_SSL_ENABLE = "storm.zookeeper.ssl.enable";
purushah marked this conversation as resolved.
Show resolved Hide resolved
/** Keystore location for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PATH = "storm.zookeeper.ssl.keystore.path";
/** Keystore password for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD = "storm.zookeeper.ssl.keystore.password";
/** Truststore location for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH = "storm.zookeeper.ssl.truststore.path";
/** Truststore password for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD = "storm.zookeeper.ssl.truststore.password";
/** Enable or disable hostname verification.*/
@IsBoolean
public static final String STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION = "storm.zookeeper.ssl.hostnameVerification";
/**
* The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk. This is NOT used for
* compressing serialized tuples sent between topologies.
Expand Down
118 changes: 118 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.naming.ConfigurationException;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.shade.org.apache.curator.framework.api.ACLProvider;
import org.apache.storm.shade.org.apache.zookeeper.client.ZKClientConfig;
import org.apache.storm.shade.org.apache.zookeeper.common.ClientX509Util;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorUtils {
public static final Logger LOG = LoggerFactory.getLogger(CuratorUtils.class);
public static final String CLIENT_CNXN
= org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();

public static CuratorFramework newCurator(Map<String, Object> conf, List<String> servers, Object port, String root,
List<ACL> defaultAcl) {
Expand Down Expand Up @@ -84,6 +89,119 @@ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, fina
if (auth != null && auth.scheme != null && auth.payload != null) {
builder.authorization(auth.scheme, auth.payload);
}
boolean sslEnabled = ObjectReader.getBoolean(conf.get(Config.ZK_SSL_ENABLE), false);
if (sslEnabled) {
SslConf sslConf = new SslConf(conf);
ZKClientConfig zkClientConfig = new ZKClientConfig();
try {
setSslConfiguration(zkClientConfig, new ClientX509Util(), sslConf);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
builder.zkClientConfig(zkClientConfig);
}
}

/**
* Configure ZooKeeper Client with SSL/TLS connection.
* @param zkClientConfig ZooKeeper Client configuration
* @param x509Util The X509 utility
* @param sslConf The truststore and keystore configs
*/
private static void setSslConfiguration(ZKClientConfig zkClientConfig,
ClientX509Util x509Util, SslConf sslConf)
throws ConfigurationException {
validateSslConfiguration(sslConf);
LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the "
+ "ZooKeeper server.");
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
sslConf.keystoreLocation,
Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH);
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
sslConf.truststoreLocation,
Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH);

zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
CLIENT_CNXN);
zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
sslConf.keystoreLocation);
zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
sslConf.keystorePassword);
zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
sslConf.truststoreLocation);
zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
sslConf.truststorePassword);
zkClientConfig.setProperty(x509Util.getSslHostnameVerificationEnabledProperty(),
sslConf.hostnameVerification.toString());
}

private static void validateSslConfiguration(SslConf sslConf) throws ConfigurationException {
if (StringUtils.isEmpty(sslConf.getKeystoreLocation())) {
throw new ConfigurationException(
"The keystore location parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(sslConf.getKeystorePassword())) {
throw new ConfigurationException(
"The keystore password parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(sslConf.getTruststoreLocation())) {
throw new ConfigurationException(
"The truststore location parameter is empty for the ZooKeeper client connection" + ".");
}
if (StringUtils.isEmpty(sslConf.getTruststorePassword())) {
throw new ConfigurationException(
"The truststore password parameter is empty for the ZooKeeper client connection" + ".");
}
}

public static SslConf getSslConf(Map<String, Object> conf) {
return new SslConf(conf);
}
/**
* Helper class to contain the Truststore/Keystore paths for the ZK client connection over
* SSL/TLS.
*/

static final class SslConf {
private final String keystoreLocation;
private final String keystorePassword;
private final String truststoreLocation;
private final String truststorePassword;
private final Boolean hostnameVerification;

/**
* Configuration for the ZooKeeper connection when SSL/TLS is enabled.
*
* @param conf configuration map
*/
private SslConf(Map<String, Object> conf) {
keystoreLocation = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH), "");
keystorePassword = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD), "");
truststoreLocation = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH), "");
truststorePassword = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD), "");
hostnameVerification = ObjectReader.getBoolean(conf.get(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION), true);
}

public String getKeystoreLocation() {
return keystoreLocation;
}

public String getKeystorePassword() {
return keystorePassword;
}

public String getTruststoreLocation() {
return truststoreLocation;
}

public String getTruststorePassword() {
return truststorePassword;
}

public Boolean getHostnameVerification() {
return hostnameVerification;
}
}

public static void testSetupBuilder(CuratorFrameworkFactory.Builder
Expand Down
Loading