From b1a7ad3f31e9ee75b3392cd5aeb77f7d6c404f04 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 15 Sep 2021 11:34:27 +0100 Subject: [PATCH 01/19] Adding support for SASL auth in kafka --- kiln_lib/src/kafka.rs | 166 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 143 insertions(+), 23 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index 23365dc1..a7df8ab6 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -6,9 +6,6 @@ use rdkafka::error::KafkaError; use rdkafka::producer::future_producer::FutureProducer; use std::fmt::Display; -#[derive(Debug, Clone)] -pub struct KafkaBootstrapTlsConfig(Vec); - #[derive(Debug)] pub enum ValidationFailureReason { Missing, @@ -16,6 +13,19 @@ pub enum ValidationFailureReason { CouldNotBeParsed, } +#[derive(Debug,Clone)] +pub struct KafkaAuthConfig { + auth_required: bool, + username: Option, + password: Option +} + +#[derive(Debug, Clone)] +pub struct KafkaBootstrapConfig { + tls_config: Vec, + auth_config: KafkaAuthConfig +} + impl Display for ValidationFailureReason { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -44,7 +54,7 @@ pub enum KafkaConfigError { TlsTrustStore, } -pub fn get_bootstrap_config(vars: &mut I) -> Result +pub fn get_bootstrap_config(vars: &mut I) -> Result where I: Iterator, { @@ -110,11 +120,33 @@ where } }?; - Ok(KafkaBootstrapTlsConfig(kafka_bootstrap_tls)) + let kafka_auth_config = match local_vars.iter().find(|var| var.0 == "ENABLE_KAFKA_AUTH") { + None => KafkaAuthConfig { auth_required: false, username: None, password: None}, + Some(_) => + match local_vars.iter().find(|var| var.0 == "KAFKA_SASL_AUTH_USERNAME") { + None => + return Err(KafkaConfigError::OptionalValueValidationFailure { + var: "KAFKA_SASL_AUTH_USERNAME".into(), + reason: ValidationFailureReason::Missing, + }), + Some(u) => + match local_vars.iter().find(|var| var.0 == "KAFKA_SASL_AUTH_PASSWORD") { + None => + return Err(KafkaConfigError::OptionalValueValidationFailure { + var: "KAFKA_SASL_AUTH_PASSWORD".into(), + reason: ValidationFailureReason::Missing, + }), + Some(p) => + KafkaAuthConfig { auth_required: true, username: Some(u.1.to_owned()), password: Some(p.1.to_owned())} + } + } + }; + + Ok(KafkaBootstrapConfig { tls_config: kafka_bootstrap_tls, auth_config: kafka_auth_config}) } pub fn build_kafka_producer( - config: KafkaBootstrapTlsConfig, + config: KafkaBootstrapConfig, ) -> Result { let cert_probe_result = openssl_probe::probe(); let cert_location = match cert_probe_result { @@ -122,20 +154,38 @@ pub fn build_kafka_producer( ProbeResult { cert_dir, .. } if cert_dir.is_some() => Ok(cert_dir), _ => Err(KafkaConfigError::TlsTrustStore), }?; - - ClientConfig::new() - .set("metadata.broker.list", &config.0.join(",")) - .set("compression.type", "gzip") - .set("security.protocol", "SSL") - .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") - .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) - .set("message.max.bytes", "10000000") - .create() - .map_err(|err| err.into()) + if config.auth_config.auth_required { + println!("Creating auth config for producer"); + ClientConfig::new() + .set("metadata.broker.list", &config.tls_config.join(",")) + .set("compression.type", "gzip") + .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") + .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) + .set("message.max.bytes", "10000000") + .set("sasl.mechanism", "PLAIN") + .set("security.protocol","SASL_SSL") + .set("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule", format!("required \ + username=\"{}\" \ + password=\"{}\"", config.auth_config.username.unwrap(), config.auth_config.password.unwrap()) + ) + .create() + .map_err(|err| err.into()) + } else { + println!("Creating non-auth config for producer"); + ClientConfig::new() + .set("metadata.broker.list", &config.tls_config.join(",")) + .set("compression.type", "gzip") + .set("security.protocol", "SSL") + .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") + .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) + .set("message.max.bytes", "10000000") + .create() + .map_err(|err| err.into()) + } } pub fn build_kafka_consumer( - config: KafkaBootstrapTlsConfig, + config: KafkaBootstrapConfig, consumer_group_name: String, ) -> Result { let cert_probe_result = openssl_probe::probe(); @@ -145,8 +195,26 @@ pub fn build_kafka_consumer( _ => Err(KafkaConfigError::TlsTrustStore), }?; - ClientConfig::new() - .set("metadata.broker.list", &config.0.join(",")) + if config.auth_config.auth_required { + println!("Creating auth config for producer"); + ClientConfig::new() + .set("metadata.broker.list", &config.tls_config.join(",")) + .set("compression.type", "gzip") + .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") + .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) + .set("fetch.message.max.bytes", "10000000") + .set("sasl.mechanism", "PLAIN") + .set("security.protocol","SASL_SSL") + .set("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule", format!("required \ + username=\"{}\" \ + password=\"{}\"", config.auth_config.username.unwrap(), config.auth_config.password.unwrap()) + ) + .create() + .map_err(|err| err.into()) + } else { + println!("Creating non-auth config for producer"); + ClientConfig::new() + .set("metadata.broker.list", &config.tls_config.join(",")) .set("group.id", &consumer_group_name) .set("compression.type", "gzip") .set("security.protocol", "SSL") @@ -155,6 +223,7 @@ pub fn build_kafka_consumer( .set("fetch.message.max.bytes", "10000000") .create() .map_err(|err| err.into()) + } } #[cfg(test)] @@ -165,7 +234,8 @@ mod tests { #[tokio::test] async fn creating_kafka_producer_does_not_return_a_client_config_error() { let config = - KafkaBootstrapTlsConfig(vec!["host1:1234".to_string(), "host2:1234".to_string()]); + KafkaBootstrapConfig{ tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], + auth_config: KafkaAuthConfig { auth_required: false, username: None, password: None}}; build_kafka_producer(config).unwrap(); } @@ -173,7 +243,8 @@ mod tests { #[tokio::test] async fn creating_kafka_consumer_does_not_return_a_client_config_error() { let config = - KafkaBootstrapTlsConfig(vec!["host1:1234".to_string(), "host2:1234".to_string()]); + KafkaBootstrapConfig{ tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], + auth_config: KafkaAuthConfig { auth_required: false, username: None, password: None}}; build_kafka_consumer(config, "TestConsumerGroup".to_string()).unwrap(); } @@ -190,7 +261,7 @@ mod tests { let actual = get_bootstrap_config(&mut fake_vars).expect("expected Ok(_) value"); - assert_eq!(actual.0, expected); + assert_eq!(actual.tls_config, expected); } #[test] @@ -247,7 +318,7 @@ mod tests { let actual = get_bootstrap_config(&mut fake_vars).expect("expected Ok(_) value"); - assert_eq!(actual.0, expected) + assert_eq!(actual.tls_config, expected) } #[test] @@ -290,4 +361,53 @@ mod tests { "Optional environment variable DISABLE_KAFKA_DOMAIN_VALIDATION failed validation because value could not be parsed" ) } + + + #[test] + fn get_bootstrap_config_returns_error_auth_enabled_but_username_unset() { + let hostname = "my.kafka.host.example.com:1234".to_owned(); + let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned())].into_iter(); + + let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); + + assert_eq!( + actual.to_string(), + "Optional environment variable KAFKA_SASL_AUTH_USERNAME failed validation because value is missing" + ) + } + + + #[test] + fn get_bootstrap_config_returns_error_auth_enabled_but_password_unset() { + let hostname = "my.kafka.host.example.com:1234".to_owned(); + let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(),"admin".to_owned())].into_iter(); + + let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); + + assert_eq!( + actual.to_string(), + "Optional environment variable KAFKA_SASL_AUTH_PASSWORD failed validation because value is missing" + ) + } + + #[test] + fn get_bootstrap_config_returns_correct_auth_config() { + let hostname = "my.kafka.host.example.com:1234".to_owned(); + let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLSa.to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(),"admin".to_owned()), + ("KAFKA_SASL_AUTH_PASSWORD".to_owned(),"admin\-password".to_owned()), + ].into_iter(); + + let actual = get_bootstrap_config(&mut fake_vars).expect("No errors should be returned when values are set correctly"); + + assert_eq!(actual.auth_config.auth_required,true); + assert_eq!(actual.auth_config.username.unwrap(),"admin".to_string()); + assert_eq!(actual.auth_config.password.unwrap(),"admin\-password".to_string()); + } + + } From a1d560f8ee00a87abd4a7d534ede36e5038cbf36 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 15 Sep 2021 11:43:21 +0100 Subject: [PATCH 02/19] Fix typo in kafka.rs --- kiln_lib/src/kafka.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index a7df8ab6..a206984b 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -396,17 +396,17 @@ mod tests { #[test] fn get_bootstrap_config_returns_correct_auth_config() { let hostname = "my.kafka.host.example.com:1234".to_owned(); - let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLSa.to_owned(), hostname.clone()), + let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned()), ("KAFKA_SASL_AUTH_USERNAME".to_owned(),"admin".to_owned()), - ("KAFKA_SASL_AUTH_PASSWORD".to_owned(),"admin\-password".to_owned()), + ("KAFKA_SASL_AUTH_PASSWORD".to_owned(),"adminpassword".to_owned()), ].into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect("No errors should be returned when values are set correctly"); assert_eq!(actual.auth_config.auth_required,true); assert_eq!(actual.auth_config.username.unwrap(),"admin".to_string()); - assert_eq!(actual.auth_config.password.unwrap(),"admin\-password".to_string()); + assert_eq!(actual.auth_config.password.unwrap(),"adminpassword".to_string()); } From 164a47d1c1b2183c81c1a4d8bb4272c6f175b686 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 15 Sep 2021 15:54:58 +0100 Subject: [PATCH 03/19] Make auth_required field public for testing --- kiln_lib/src/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index a206984b..d176c21e 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -15,7 +15,7 @@ pub enum ValidationFailureReason { #[derive(Debug,Clone)] pub struct KafkaAuthConfig { - auth_required: bool, + pub auth_required: bool, username: Option, password: Option } From 72e098c1bf2502dace985a2965706d4f9e853938 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 15 Sep 2021 15:58:48 +0100 Subject: [PATCH 04/19] Make auth_config field public for testing --- kiln_lib/src/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index d176c21e..abf4b0b2 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -23,7 +23,7 @@ pub struct KafkaAuthConfig { #[derive(Debug, Clone)] pub struct KafkaBootstrapConfig { tls_config: Vec, - auth_config: KafkaAuthConfig +pub auth_config: KafkaAuthConfig } impl Display for ValidationFailureReason { From ca7e56fdf03c93bd89a94a16f9822b5d86e8f2ee Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Fri, 17 Sep 2021 10:30:28 +0100 Subject: [PATCH 05/19] librdkafka doesn't accept sasl.jaas.config; replace with correct config params --- kiln_lib/src/kafka.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index abf4b0b2..d97d3b96 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -162,12 +162,10 @@ pub fn build_kafka_producer( .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) .set("message.max.bytes", "10000000") - .set("sasl.mechanism", "PLAIN") .set("security.protocol","SASL_SSL") - .set("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule", format!("required \ - username=\"{}\" \ - password=\"{}\"", config.auth_config.username.unwrap(), config.auth_config.password.unwrap()) - ) + .set("sasl.mechanism", "PLAIN") + .set("sasl.username", config.auth_config.username.unwrap()) + .set("sasl.password", config.auth_config.password.unwrap()) .create() .map_err(|err| err.into()) } else { @@ -205,10 +203,8 @@ pub fn build_kafka_consumer( .set("fetch.message.max.bytes", "10000000") .set("sasl.mechanism", "PLAIN") .set("security.protocol","SASL_SSL") - .set("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule", format!("required \ - username=\"{}\" \ - password=\"{}\"", config.auth_config.username.unwrap(), config.auth_config.password.unwrap()) - ) + .set("sasl.username", config.auth_config.username.unwrap()) + .set("sasl.password", config.auth_config.password.unwrap()) .create() .map_err(|err| err.into()) } else { From 0e21af89395d6b88c2ef2e32f2d5e2a8c0cb6108 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Mon, 20 Sep 2021 16:33:57 +0100 Subject: [PATCH 06/19] Add missing group ID to consumer config --- kiln_lib/src/kafka.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index d97d3b96..2002d879 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -198,6 +198,7 @@ pub fn build_kafka_consumer( ClientConfig::new() .set("metadata.broker.list", &config.tls_config.join(",")) .set("compression.type", "gzip") + .set("group.id", &consumer_group_name) .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) .set("fetch.message.max.bytes", "10000000") From 8841dd89256d75f37527c558c6274129680ba8f7 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 13:42:41 +0100 Subject: [PATCH 07/19] Added docker compose file for testing kafka auth --- docker-compose-auth.yml | 90 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 docker-compose-auth.yml diff --git a/docker-compose-auth.yml b/docker-compose-auth.yml new file mode 100644 index 00000000..f91edef1 --- /dev/null +++ b/docker-compose-auth.yml @@ -0,0 +1,90 @@ +version: '2' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + environment: + SERVER_JVMFLAGS: -Djava.security.auth.login.config=/opt/zookeeper_jaas.conf + -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider + -Dzookeeper.authProvider.2=org.apache.zookeeper.server.auth.DigestAuthenticationProvider + -DjaasLoginRenew=3600000 + -DrequireClientAuthScheme=sasl + volumes: + - ./zookeeper_jaas.conf:/opt/zookeeper_jaas.conf + + kafka: + image: wurstmeister/kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + hostname: kafka + environment: + KAFKA_ADVERTISED_HOSTNAME: kafka + KAFKA_LISTENERS: "SASL_SSL://kafka:9092" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" + KAFKA_SSL_TLS_VERSION: TLSv1.2 + KAFKA_SSL_PROTOCOL: TLSv1.2 + KAFKA_SSL_ENABLED_PROTOCOLS: TLSv1.2 + KAFKA_SSL_SECURE_RANDOM_IMPLEMENTATION: NativePRNG + KAFKA_SSL_CIPHER_SUITES: TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + KAFKA_SSL_KEYSTORE_LOCATION: /tls/kafka.keystore.jks + KAFKA_SSL_KEYSTORE_PASSWORD: password + KAFKA_SSL_KEY_PASSWORD: password + KAFKA_SSL_TRUSTSTORE_LOCATION: /tls/kafka.truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: password + KAFKA_CREATE_TOPICS: "ToolReports:6:1,DependencyEvents:6:1" + KAFKA_MESSAGE_MAX_BYTES: 10000000 + KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 + KAFKA_SECURITY_PROTOCOL: SASL_SSL + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/broker-jaas.conf" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./tls:/tls + - ./broker-jaas.conf:/etc/kafka/broker-jaas.conf + data-collector: + depends_on: + - kafka + image: kiln/data-collector:git-latest + ports: + - "8081:8080" + environment: + - KAFKA_BOOTSTRAP_TLS=kafka:9092 + - ENABLE_KAFKA_AUTH=true + - KAFKA_SASL_AUTH_USERNAME=admin + - KAFKA_SASL_AUTH_PASSWORD=admin-secret + - DISABLE_KAFKA_DOMAIN_VALIDATION=true + volumes: + - ./tls:/tls + report-parser: + depends_on: + - kafka + image: kiln/report-parser:git-latest + environment: + - KAFKA_BOOTSTRAP_TLS=kafka:9092 + - ENABLE_KAFKA_AUTH=true + - KAFKA_SASL_AUTH_USERNAME=admin + - KAFKA_SASL_AUTH_PASSWORD=admin-secret + - DISABLE_KAFKA_DOMAIN_VALIDATION=true + volumes: + - ./tls:/tls + slack-connector: + depends_on: + - kafka + image: kiln/slack-connector:git-latest + environment: + - KAFKA_BOOTSTRAP_TLS=kafka:9092 + - RUST_LOG=info + - DISABLE_KAFKA_DOMAIN_VALIDATION=true + - ENABLE_KAFKA_AUTH=true + - KAFKA_SASL_AUTH_USERNAME=admin + - KAFKA_SASL_AUTH_PASSWORD=admin + - OAUTH2_TOKEN + - SLACK_CHANNEL_ID + volumes: + - ./tls:/tls From 99aa6e48a391b57a81e91a53edc028d5f9ef6428 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 14:13:36 +0100 Subject: [PATCH 08/19] Remove print statements and fix formatting --- kiln_lib/src/kafka.rs | 145 ++++++++++++++++++++++++++---------------- 1 file changed, 89 insertions(+), 56 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index 2002d879..ed5006ab 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -13,17 +13,17 @@ pub enum ValidationFailureReason { CouldNotBeParsed, } -#[derive(Debug,Clone)] -pub struct KafkaAuthConfig { +#[derive(Debug, Clone)] +pub struct KafkaAuthConfig { pub auth_required: bool, username: Option, - password: Option -} + password: Option, +} #[derive(Debug, Clone)] -pub struct KafkaBootstrapConfig { +pub struct KafkaBootstrapConfig { tls_config: Vec, -pub auth_config: KafkaAuthConfig + pub auth_config: KafkaAuthConfig, } impl Display for ValidationFailureReason { @@ -121,28 +121,44 @@ where }?; let kafka_auth_config = match local_vars.iter().find(|var| var.0 == "ENABLE_KAFKA_AUTH") { - None => KafkaAuthConfig { auth_required: false, username: None, password: None}, - Some(_) => - match local_vars.iter().find(|var| var.0 == "KAFKA_SASL_AUTH_USERNAME") { - None => - return Err(KafkaConfigError::OptionalValueValidationFailure { + None => KafkaAuthConfig { + auth_required: false, + username: None, + password: None, + }, + Some(_) => match local_vars + .iter() + .find(|var| var.0 == "KAFKA_SASL_AUTH_USERNAME") + { + None => { + return Err(KafkaConfigError::OptionalValueValidationFailure { var: "KAFKA_SASL_AUTH_USERNAME".into(), reason: ValidationFailureReason::Missing, - }), - Some(u) => - match local_vars.iter().find(|var| var.0 == "KAFKA_SASL_AUTH_PASSWORD") { - None => - return Err(KafkaConfigError::OptionalValueValidationFailure { - var: "KAFKA_SASL_AUTH_PASSWORD".into(), - reason: ValidationFailureReason::Missing, - }), - Some(p) => - KafkaAuthConfig { auth_required: true, username: Some(u.1.to_owned()), password: Some(p.1.to_owned())} - } + }) } + Some(u) => match local_vars + .iter() + .find(|var| var.0 == "KAFKA_SASL_AUTH_PASSWORD") + { + None => { + return Err(KafkaConfigError::OptionalValueValidationFailure { + var: "KAFKA_SASL_AUTH_PASSWORD".into(), + reason: ValidationFailureReason::Missing, + }) + } + Some(p) => KafkaAuthConfig { + auth_required: true, + username: Some(u.1.to_owned()), + password: Some(p.1.to_owned()), + }, + }, + }, }; - - Ok(KafkaBootstrapConfig { tls_config: kafka_bootstrap_tls, auth_config: kafka_auth_config}) + + Ok(KafkaBootstrapConfig { + tls_config: kafka_bootstrap_tls, + auth_config: kafka_auth_config, + }) } pub fn build_kafka_producer( @@ -155,7 +171,6 @@ pub fn build_kafka_producer( _ => Err(KafkaConfigError::TlsTrustStore), }?; if config.auth_config.auth_required { - println!("Creating auth config for producer"); ClientConfig::new() .set("metadata.broker.list", &config.tls_config.join(",")) .set("compression.type", "gzip") @@ -168,8 +183,7 @@ pub fn build_kafka_producer( .set("sasl.password", config.auth_config.password.unwrap()) .create() .map_err(|err| err.into()) - } else { - println!("Creating non-auth config for producer"); + } else { ClientConfig::new() .set("metadata.broker.list", &config.tls_config.join(",")) .set("compression.type", "gzip") @@ -179,7 +193,7 @@ pub fn build_kafka_producer( .set("message.max.bytes", "10000000") .create() .map_err(|err| err.into()) - } + } } pub fn build_kafka_consumer( @@ -193,8 +207,7 @@ pub fn build_kafka_consumer( _ => Err(KafkaConfigError::TlsTrustStore), }?; - if config.auth_config.auth_required { - println!("Creating auth config for producer"); + if config.auth_config.auth_required { ClientConfig::new() .set("metadata.broker.list", &config.tls_config.join(",")) .set("compression.type", "gzip") @@ -208,8 +221,7 @@ pub fn build_kafka_consumer( .set("sasl.password", config.auth_config.password.unwrap()) .create() .map_err(|err| err.into()) - } else { - println!("Creating non-auth config for producer"); + } else { ClientConfig::new() .set("metadata.broker.list", &config.tls_config.join(",")) .set("group.id", &consumer_group_name) @@ -230,18 +242,28 @@ mod tests { #[allow(unused_must_use)] #[tokio::test] async fn creating_kafka_producer_does_not_return_a_client_config_error() { - let config = - KafkaBootstrapConfig{ tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], - auth_config: KafkaAuthConfig { auth_required: false, username: None, password: None}}; + let config = KafkaBootstrapConfig { + tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], + auth_config: KafkaAuthConfig { + auth_required: false, + username: None, + password: None, + }, + }; build_kafka_producer(config).unwrap(); } #[allow(unused_must_use)] #[tokio::test] async fn creating_kafka_consumer_does_not_return_a_client_config_error() { - let config = - KafkaBootstrapConfig{ tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], - auth_config: KafkaAuthConfig { auth_required: false, username: None, password: None}}; + let config = KafkaBootstrapConfig { + tls_config: vec!["host1:1234".to_string(), "host2:1234".to_string()], + auth_config: KafkaAuthConfig { + auth_required: false, + username: None, + password: None, + }, + }; build_kafka_consumer(config, "TestConsumerGroup".to_string()).unwrap(); } @@ -359,12 +381,14 @@ mod tests { ) } - #[test] fn get_bootstrap_config_returns_error_auth_enabled_but_username_unset() { let hostname = "my.kafka.host.example.com:1234".to_owned(); - let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), - ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned())].into_iter(); + let mut fake_vars = vec![ + ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), + ] + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); @@ -374,13 +398,15 @@ mod tests { ) } - #[test] fn get_bootstrap_config_returns_error_auth_enabled_but_password_unset() { let hostname = "my.kafka.host.example.com:1234".to_owned(); - let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), - ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned()), - ("KAFKA_SASL_AUTH_USERNAME".to_owned(),"admin".to_owned())].into_iter(); + let mut fake_vars = vec![ + ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(), "admin".to_owned()), + ] + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); @@ -393,18 +419,25 @@ mod tests { #[test] fn get_bootstrap_config_returns_correct_auth_config() { let hostname = "my.kafka.host.example.com:1234".to_owned(); - let mut fake_vars = vec![("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), - ("ENABLE_KAFKA_AUTH".to_owned(),"true".to_owned()), - ("KAFKA_SASL_AUTH_USERNAME".to_owned(),"admin".to_owned()), - ("KAFKA_SASL_AUTH_PASSWORD".to_owned(),"adminpassword".to_owned()), - ].into_iter(); + let mut fake_vars = vec![ + ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(), "admin".to_owned()), + ( + "KAFKA_SASL_AUTH_PASSWORD".to_owned(), + "adminpassword".to_owned(), + ), + ] + .into_iter(); - let actual = get_bootstrap_config(&mut fake_vars).expect("No errors should be returned when values are set correctly"); + let actual = get_bootstrap_config(&mut fake_vars) + .expect("No errors should be returned when values are set correctly"); - assert_eq!(actual.auth_config.auth_required,true); - assert_eq!(actual.auth_config.username.unwrap(),"admin".to_string()); - assert_eq!(actual.auth_config.password.unwrap(),"adminpassword".to_string()); + assert_eq!(actual.auth_config.auth_required, true); + assert_eq!(actual.auth_config.username.unwrap(), "admin".to_string()); + assert_eq!( + actual.auth_config.password.unwrap(), + "adminpassword".to_string() + ); } - - } From 2e0a003f9b08bc80a337167897c32a58580860c0 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 14:50:13 +0100 Subject: [PATCH 09/19] Temporarily downgrade version to git2 to get the CI/CD build to pass --- data-forwarder/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-forwarder/Cargo.toml b/data-forwarder/Cargo.toml index a3d65865..c56b104e 100644 --- a/data-forwarder/Cargo.toml +++ b/data-forwarder/Cargo.toml @@ -8,7 +8,7 @@ kiln_lib = { git = "https://github.com/simplybusiness/Kiln", features = [ "json" clap = "2" chrono = "0.4" reqwest = { version = "0.11", features = [ "blocking", "json",] } -git2 = "0.13" +git2 = "0.13.17" uuid = { version = "0.8", features = [ "v4",] } openssl-probe = "0.1.4" toml = "0.5" From c545e61e308f240009f84e1ad911197fafdb5915 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 15:00:13 +0100 Subject: [PATCH 10/19] Change git2 version --- data-forwarder/Cargo.lock | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-forwarder/Cargo.lock b/data-forwarder/Cargo.lock index 8c34d01d..5afb9a63 100644 --- a/data-forwarder/Cargo.lock +++ b/data-forwarder/Cargo.lock @@ -491,11 +491,12 @@ dependencies = [ [[package]] name = "kiln_lib" version = "0.5.0" -source = "git+https://github.com/simplybusiness/Kiln?branch=main#a91159e65c69eb9f4a9b59c4f4351d36c1fbedef" +source = "git+https://github.com/simplybusiness/Kiln?branch=main#884e96059622c72e99254ac737bee25aee964adf" dependencies = [ "anyhow", "chrono", "hex", + "mime", "openssl-probe", "regex", "ring", From bd2d96dd2b89f808e0540f9eb4fc8f9fde46e306 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 15:32:45 +0100 Subject: [PATCH 11/19] Downgrade git2 to 0.12 --- data-forwarder/Cargo.lock | 8 ++++---- data-forwarder/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/data-forwarder/Cargo.lock b/data-forwarder/Cargo.lock index 5afb9a63..a320c1aa 100644 --- a/data-forwarder/Cargo.lock +++ b/data-forwarder/Cargo.lock @@ -310,9 +310,9 @@ checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce" [[package]] name = "git2" -version = "0.13.19" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17929de7239dea9f68aa14f94b2ab4974e7b24c1314275ffcc12a7758172fa18" +checksum = "26e07ef27260a78f7e8d218ebac2c72f2c4db50493741b190b6e8eade1da7c68" dependencies = [ "bitflags", "libc", @@ -522,9 +522,9 @@ checksum = "ba4aede83fc3617411dc6993bc8c70919750c1c257c6ca6a502aed6e0e2394ae" [[package]] name = "libgit2-sys" -version = "0.12.20+1.1.0" +version = "0.11.0+0.99.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2f09917e00b9ad194ae72072bb5ada2cca16d8171a43e91ddba2afbb02664b" +checksum = "4d5d1459353d397a029fb18862166338de938e6be976606bd056cf8f1a912ecf" dependencies = [ "cc", "libc", diff --git a/data-forwarder/Cargo.toml b/data-forwarder/Cargo.toml index c56b104e..5c31bedc 100644 --- a/data-forwarder/Cargo.toml +++ b/data-forwarder/Cargo.toml @@ -8,7 +8,7 @@ kiln_lib = { git = "https://github.com/simplybusiness/Kiln", features = [ "json" clap = "2" chrono = "0.4" reqwest = { version = "0.11", features = [ "blocking", "json",] } -git2 = "0.13.17" +git2 = "0.12" uuid = { version = "0.8", features = [ "v4",] } openssl-probe = "0.1.4" toml = "0.5" From 101f31e1ed0849ed738e484c6301ed915792c1f0 Mon Sep 17 00:00:00 2001 From: DM-sb <55985798+DM-sb@users.noreply.github.com> Date: Wed, 22 Sep 2021 16:06:41 +0100 Subject: [PATCH 12/19] Update the data-collector docs to mention authentication --- data-collector/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-collector/README.md b/data-collector/README.md index 90c1906c..ba01c272 100644 --- a/data-collector/README.md +++ b/data-collector/README.md @@ -17,6 +17,8 @@ By default, this component will validate that hosts in the `KAFKA_BOOTSTRAP_TLS` If your Kafka cluster uses TLS certificates issued by a private Certificate Authority, you will need to provide the CA Certificate in PEM format so that certificate validation can be performed when connecting to the Kafka cluster. You should do this by including the CA certificate in PEM format in the `/tls` directory of the container, probably through a volume mount. +At present Kiln supports authentication between brokers and producers/consumers using the SASL_PLAIN mechanism. Authentication is optional and configured by setting the `ENABLE_KAFKA_AUTH` environment variable. If this variable is set, you also need to supply the username and password for authentication using `AFKA_SASL_AUTH_USERNAME` and `AFKA_SASL_AUTH_PASSWORD` environment variables respectively. + ## Request & Response Documentation You shouldn't generally need to make manual requests to the data-collector, instead prefer to use the ToolReport struct from kiln_lib and serialise that to JSON before sending to the data-collector. If you do need to make a manual request to the data-collector, see [docs/request-response.md](docs/request-response.md). From 7e0381aef609676ec1d804744f32c080672688c6 Mon Sep 17 00:00:00 2001 From: DM-sb <55985798+DM-sb@users.noreply.github.com> Date: Wed, 22 Sep 2021 16:07:18 +0100 Subject: [PATCH 13/19] fix typo --- data-collector/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-collector/README.md b/data-collector/README.md index ba01c272..33ea8bb5 100644 --- a/data-collector/README.md +++ b/data-collector/README.md @@ -17,7 +17,7 @@ By default, this component will validate that hosts in the `KAFKA_BOOTSTRAP_TLS` If your Kafka cluster uses TLS certificates issued by a private Certificate Authority, you will need to provide the CA Certificate in PEM format so that certificate validation can be performed when connecting to the Kafka cluster. You should do this by including the CA certificate in PEM format in the `/tls` directory of the container, probably through a volume mount. -At present Kiln supports authentication between brokers and producers/consumers using the SASL_PLAIN mechanism. Authentication is optional and configured by setting the `ENABLE_KAFKA_AUTH` environment variable. If this variable is set, you also need to supply the username and password for authentication using `AFKA_SASL_AUTH_USERNAME` and `AFKA_SASL_AUTH_PASSWORD` environment variables respectively. +At present Kiln supports authentication between brokers and producers/consumers using the SASL_PLAIN mechanism. Authentication is optional and configured by setting the `ENABLE_KAFKA_AUTH` environment variable. If this variable is set, you also need to supply the username and password for authentication using `KAFKA_SASL_AUTH_USERNAME` and `KAFKA_SASL_AUTH_PASSWORD` environment variables respectively. ## Request & Response Documentation From 68107146cc55a61e86e1cda0e24b1c9d7c748375 Mon Sep 17 00:00:00 2001 From: DM-sb <55985798+DM-sb@users.noreply.github.com> Date: Wed, 22 Sep 2021 16:08:17 +0100 Subject: [PATCH 14/19] Update the report-parser docs to mention authentication --- report-parser/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/report-parser/README.md b/report-parser/README.md index 074eb335..6631a73a 100644 --- a/report-parser/README.md +++ b/report-parser/README.md @@ -17,4 +17,6 @@ By default, this component will validate that hosts in the `KAFKA_BOOTSTRAP_TLS` If your Kafka cluster uses TLS certificates issued by a private Certificate Authority, you will need to provide the CA Certificate in PEM format so that certificate validation can be performed when connecting to the Kafka cluster. You should do this by including the CA certificate in PEM format in the `/tls` directory of the container, probably through a volume mount. +At present Kiln supports authentication between brokers and producers/consumers using the SASL_PLAIN mechanism. Authentication is optional and configured by setting the `ENABLE_KAFKA_AUTH` environment variable. If this variable is set, you also need to supply the username and password for authentication using `KAFKA_SASL_AUTH_USERNAME` and `KAFKA_SASL_AUTH_PASSWORD` environment variables respectively. + If you want to provide an alternative URL for downloading NIST NVD data, this can be configured by starting the report-paser with the `NVD_BASE_URL` environment variable set to the URL of your NVD mirror. From f17c99e9378cc4be9610489212bdd1dc4eda8a24 Mon Sep 17 00:00:00 2001 From: DM-sb <55985798+DM-sb@users.noreply.github.com> Date: Wed, 22 Sep 2021 16:08:52 +0100 Subject: [PATCH 15/19] Update the slack connector docs to mention authentication --- slack-connector/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/slack-connector/README.md b/slack-connector/README.md index 5b8dcbe2..904927e2 100644 --- a/slack-connector/README.md +++ b/slack-connector/README.md @@ -22,3 +22,6 @@ If your Kafka cluster uses TLS certificates issued by a private Certificate Auth You will also need the Channel ID for the Slack Channel you want to route notifications to. This can be found by opening Slack in a web browser and loading the channel you want Kiln to send notifications to. The last components of the URL path will contain the channel ID and will begin with a 'C'. This is supplied to the connector using the `SLACK_CHANNEL_ID` environment variable. Lastly, you will need to supply the OAuth2 access token you created earlier as the `OAUTH2_TOKEN` environment variable. This value is a secret and should be handled accordingly to avoid accidental disclosure in shell history, logs etc. Unfortunately the topic of secrets management is out of the scope of this documentation. + +At present Kiln supports authentication between brokers and producers/consumers using the SASL_PLAIN mechanism. Authentication is optional and configured by setting the `ENABLE_KAFKA_AUTH` environment variable. If this variable is set, you also need to supply the username and password for authentication using `KAFKA_SASL_AUTH_USERNAME` and `KAFKA_SASL_AUTH_PASSWORD` environment variables respectively. + From ecaf3d6af523fc11cf80f6350371d1cf306d63c1 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Wed, 22 Sep 2021 16:12:18 +0100 Subject: [PATCH 16/19] Added the JAAS config file for broker and zookeeper to enable testing auth --- broker-jaas.conf | 13 +++++++++++++ zookeeper_jaas.conf | 6 ++++++ 2 files changed, 19 insertions(+) create mode 100644 broker-jaas.conf create mode 100644 zookeeper_jaas.conf diff --git a/broker-jaas.conf b/broker-jaas.conf new file mode 100644 index 00000000..dd800453 --- /dev/null +++ b/broker-jaas.conf @@ -0,0 +1,13 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret"; + }; + +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret"; + }; diff --git a/zookeeper_jaas.conf b/zookeeper_jaas.conf new file mode 100644 index 00000000..2649160f --- /dev/null +++ b/zookeeper_jaas.conf @@ -0,0 +1,6 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_admin="admin-secret"; +}; + + From f9d6a6b4a053dfd9c832c86569b13d9be933e292 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Thu, 23 Sep 2021 09:59:45 +0100 Subject: [PATCH 17/19] Remove pub fields --- kiln_lib/src/kafka.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index ed5006ab..470d00b6 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -15,7 +15,7 @@ pub enum ValidationFailureReason { #[derive(Debug, Clone)] pub struct KafkaAuthConfig { - pub auth_required: bool, + auth_required: bool, username: Option, password: Option, } @@ -23,7 +23,7 @@ pub struct KafkaAuthConfig { #[derive(Debug, Clone)] pub struct KafkaBootstrapConfig { tls_config: Vec, - pub auth_config: KafkaAuthConfig, + auth_config: KafkaAuthConfig, } impl Display for ValidationFailureReason { From ed25407ca0a81014a694d79a12cb61e41cfce824 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Thu, 23 Sep 2021 13:06:37 +0100 Subject: [PATCH 18/19] Refactored repeated code into closure and added additional tests --- kiln_lib/src/kafka.rs | 165 ++++++++++++++++++++++++++---------------- 1 file changed, 104 insertions(+), 61 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index 470d00b6..b37021c2 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -62,26 +62,26 @@ where let disable_kafka_domain_validation = match local_vars .iter() .find(|var| var.0 == "DISABLE_KAFKA_DOMAIN_VALIDATION") - { - None => Ok(false), - Some(var) => { - if var.1.is_empty() { - return Err(KafkaConfigError::OptionalValueValidationFailure { - var: "DISABLE_KAFKA_DOMAIN_VALIDATION".into(), - reason: ValidationFailureReason::PresentButEmpty, - }); - } else { - match var.1.as_ref() { - "true" => Ok(true), - "false" => Ok(false), - _ => Err(KafkaConfigError::OptionalValueValidationFailure { + { + None => Ok(false), + Some(var) => { + if var.1.is_empty() { + return Err(KafkaConfigError::OptionalValueValidationFailure { var: "DISABLE_KAFKA_DOMAIN_VALIDATION".into(), - reason: ValidationFailureReason::CouldNotBeParsed, - }), + reason: ValidationFailureReason::PresentButEmpty, + }); + } else { + match var.1.as_ref() { + "true" => Ok(true), + "false" => Ok(false), + _ => Err(KafkaConfigError::OptionalValueValidationFailure { + var: "DISABLE_KAFKA_DOMAIN_VALIDATION".into(), + reason: ValidationFailureReason::CouldNotBeParsed, + }), + } } } - } - }?; + }?; let kafka_bootstrap_tls = match local_vars.iter().find(|var| var.0 == "KAFKA_BOOTSTRAP_TLS") { None => Err(KafkaConfigError::RequiredValueValidationFailure { @@ -120,38 +120,40 @@ where } }?; - let kafka_auth_config = match local_vars.iter().find(|var| var.0 == "ENABLE_KAFKA_AUTH") { - None => KafkaAuthConfig { - auth_required: false, - username: None, - password: None, - }, - Some(_) => match local_vars - .iter() - .find(|var| var.0 == "KAFKA_SASL_AUTH_USERNAME") - { - None => { - return Err(KafkaConfigError::OptionalValueValidationFailure { - var: "KAFKA_SASL_AUTH_USERNAME".into(), - reason: ValidationFailureReason::Missing, - }) - } - Some(u) => match local_vars - .iter() - .find(|var| var.0 == "KAFKA_SASL_AUTH_PASSWORD") - { - None => { - return Err(KafkaConfigError::OptionalValueValidationFailure { - var: "KAFKA_SASL_AUTH_PASSWORD".into(), + let check_config_var = |x: String| + match local_vars.iter().find(|var| var.0 == x) { + None => + Err(KafkaConfigError::OptionalValueValidationFailure { + var: x, reason: ValidationFailureReason::Missing, - }) - } - Some(p) => KafkaAuthConfig { - auth_required: true, - username: Some(u.1.to_owned()), - password: Some(p.1.to_owned()), - }, + }), + Some(v) => + if v.1.is_empty(){ + Err(KafkaConfigError::OptionalValueValidationFailure { + var: x, + reason: ValidationFailureReason::PresentButEmpty, + }) + } else { + Ok(Some(v.1.to_owned())) + } + }; + + + let kafka_auth_config = match local_vars.iter().find(|var| var.0 == "ENABLE_KAFKA_AUTH") { + None => + KafkaAuthConfig { + auth_required: false, + username: None, + password: None, }, + Some(_) => { + let username = check_config_var("KAFKA_SASL_AUTH_USERNAME".into())?; + let password = check_config_var("KAFKA_SASL_AUTH_PASSWORD".into())?; + KafkaAuthConfig { + auth_required: true, + username: username, + password: password, + } }, }; @@ -223,15 +225,15 @@ pub fn build_kafka_consumer( .map_err(|err| err.into()) } else { ClientConfig::new() - .set("metadata.broker.list", &config.tls_config.join(",")) - .set("group.id", &consumer_group_name) - .set("compression.type", "gzip") - .set("security.protocol", "SSL") - .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") - .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) - .set("fetch.message.max.bytes", "10000000") - .create() - .map_err(|err| err.into()) + .set("metadata.broker.list", &config.tls_config.join(",")) + .set("group.id", &consumer_group_name) + .set("compression.type", "gzip") + .set("security.protocol", "SSL") + .set("ssl.cipher.suites", "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256") + .set("ssl.ca.location", cert_location.unwrap().to_string_lossy()) + .set("fetch.message.max.bytes", "10000000") + .create() + .map_err(|err| err.into()) } } @@ -332,7 +334,7 @@ mod tests { "true".to_owned(), ), ] - .into_iter(); + .into_iter(); let expected = vec![hostname.clone()]; let actual = get_bootstrap_config(&mut fake_vars).expect("expected Ok(_) value"); @@ -372,7 +374,7 @@ mod tests { "DISABLE_KAFKA_DOMAIN_VALIDATION".to_owned(), "blah".to_owned(), )] - .into_iter(); + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); assert_eq!( @@ -388,7 +390,7 @@ mod tests { ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), ] - .into_iter(); + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); @@ -398,6 +400,26 @@ mod tests { ) } + + #[test] + fn get_bootstrap_config_returns_error_auth_enabled_but_username_empty() { + let hostname = "my.kafka.host.example.com:1234".to_owned(); + let mut fake_vars = vec![ + ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(), "".to_owned()), + ] + .into_iter(); + + let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); + + assert_eq!( + actual.to_string(), + "Optional environment variable KAFKA_SASL_AUTH_USERNAME failed validation because value is present but empty" + ) + } + + #[test] fn get_bootstrap_config_returns_error_auth_enabled_but_password_unset() { let hostname = "my.kafka.host.example.com:1234".to_owned(); @@ -406,7 +428,7 @@ mod tests { ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), ("KAFKA_SASL_AUTH_USERNAME".to_owned(), "admin".to_owned()), ] - .into_iter(); + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); @@ -416,6 +438,27 @@ mod tests { ) } + #[test] + fn get_bootstrap_config_returns_error_auth_enabled_but_password_empty() { + let hostname = "my.kafka.host.example.com:1234".to_owned(); + let mut fake_vars = vec![ + ("KAFKA_BOOTSTRAP_TLS".to_owned(), hostname.clone()), + ("ENABLE_KAFKA_AUTH".to_owned(), "true".to_owned()), + ("KAFKA_SASL_AUTH_USERNAME".to_owned(), "admin".to_owned()), + ("KAFKA_SASL_AUTH_PASSWORD".to_owned(), "".to_owned()), + ] + .into_iter(); + + let actual = get_bootstrap_config(&mut fake_vars).expect_err("expected Err(_) value"); + + assert_eq!( + actual.to_string(), + "Optional environment variable KAFKA_SASL_AUTH_PASSWORD failed validation because value is present but empty" + ) + } + + + #[test] fn get_bootstrap_config_returns_correct_auth_config() { let hostname = "my.kafka.host.example.com:1234".to_owned(); @@ -428,7 +471,7 @@ mod tests { "adminpassword".to_owned(), ), ] - .into_iter(); + .into_iter(); let actual = get_bootstrap_config(&mut fake_vars) .expect("No errors should be returned when values are set correctly"); From 40110fd9db8649492159ad6c823e642abe1809c2 Mon Sep 17 00:00:00 2001 From: Divya Muthukumaran Date: Thu, 23 Sep 2021 13:07:37 +0100 Subject: [PATCH 19/19] Format fixes --- kiln_lib/src/kafka.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/kiln_lib/src/kafka.rs b/kiln_lib/src/kafka.rs index b37021c2..616b017a 100644 --- a/kiln_lib/src/kafka.rs +++ b/kiln_lib/src/kafka.rs @@ -123,19 +123,19 @@ where let check_config_var = |x: String| match local_vars.iter().find(|var| var.0 == x) { None => + Err(KafkaConfigError::OptionalValueValidationFailure { + var: x, + reason: ValidationFailureReason::Missing, + }), + Some(v) => + if v.1.is_empty(){ Err(KafkaConfigError::OptionalValueValidationFailure { var: x, - reason: ValidationFailureReason::Missing, - }), - Some(v) => - if v.1.is_empty(){ - Err(KafkaConfigError::OptionalValueValidationFailure { - var: x, - reason: ValidationFailureReason::PresentButEmpty, - }) - } else { - Ok(Some(v.1.to_owned())) - } + reason: ValidationFailureReason::PresentButEmpty, + }) + } else { + Ok(Some(v.1.to_owned())) + } };