diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 59dbd0969bbfb..c5d8bd79df454 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -61,6 +61,19 @@ impl ConnectorParams { } } +pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + let s: String = de::Deserialize::deserialize(deserializer)?; + s.parse().map_err(|_| { + de::Error::invalid_value( + de::Unexpected::Str(&s), + &"integer greater than or equal to 0", + ) + }) +} + pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result where D: de::Deserializer<'de>, diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 5e44e8c0bea8c..968821c6b4dcd 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,7 +35,9 @@ use super::{ }; use crate::common::KafkaCommon; use crate::sink::{datum_to_json_object, record_to_json, Result}; -use crate::{deserialize_bool_from_string, deserialize_duration_from_string}; +use crate::{ + deserialize_bool_from_string, deserialize_duration_from_string, deserialize_u32_from_string, +}; pub const KAFKA_SINK: &str = "kafka"; @@ -55,6 +57,10 @@ const fn _default_use_transaction() -> bool { true } +const fn _default_force_append_only() -> bool { + false +} + #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct KafkaConfig { @@ -66,8 +72,11 @@ pub struct KafkaConfig { pub r#type: String, // accept "append-only", "debezium", or "upsert" - #[serde(default)] - pub force_append_only: Option, + #[serde( + default = "_default_force_append_only", + deserialize_with = "deserialize_bool_from_string" + )] + pub force_append_only: bool, pub identifier: String, @@ -78,7 +87,11 @@ pub struct KafkaConfig { )] pub timeout: Duration, - #[serde(rename = "properties.retry.max", default = "_default_max_retries")] + #[serde( + rename = "properties.retry.max", + default = "_default_max_retries", + deserialize_with = "deserialize_u32_from_string" + )] pub max_retry_num: u32, #[serde( @@ -89,8 +102,8 @@ pub struct KafkaConfig { pub retry_interval: Duration, #[serde( - deserialize_with = "deserialize_bool_from_string", - default = "_default_use_transaction" + default = "_default_use_transaction", + deserialize_with = "deserialize_bool_from_string" )] pub use_transaction: bool, } @@ -565,17 +578,74 @@ mod test { "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), "topic".to_string() => "test".to_string(), "type".to_string() => "append-only".to_string(), + "force_append_only".to_string() => "true".to_string(), "use_transaction".to_string() => "False".to_string(), "properties.security.protocol".to_string() => "SASL".to_string(), "properties.sasl.mechanism".to_string() => "SASL".to_string(), "properties.sasl.username".to_string() => "test".to_string(), "properties.sasl.password".to_string() => "test".to_string(), "identifier".to_string() => "test_sink_1".to_string(), - "properties.timeout".to_string() => "5s".to_string(), + "properties.timeout".to_string() => "10s".to_string(), + "properties.retry.max".to_string() => "20".to_string(), + "properties.retry.interval".to_string() => "500ms".to_string(), }; - let config = KafkaConfig::from_hashmap(properties).unwrap(); - println!("{:?}", config); + assert_eq!(config.common.brokers, "localhost:9092"); + assert_eq!(config.common.topic, "test"); + assert_eq!(config.r#type, "append-only"); + assert!(config.force_append_only); + assert!(!config.use_transaction); + assert_eq!(config.timeout, Duration::from_secs(10)); + assert_eq!(config.max_retry_num, 20); + assert_eq!(config.retry_interval, Duration::from_millis(500)); + + // Optional fields eliminated. + let properties: HashMap = hashmap! { + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "upsert".to_string(), + "identifier".to_string() => "test_sink_2".to_string(), + }; + let config = KafkaConfig::from_hashmap(properties).unwrap(); + assert!(!config.force_append_only); + assert!(config.use_transaction); + assert_eq!(config.timeout, Duration::from_secs(5)); + assert_eq!(config.max_retry_num, 3); + assert_eq!(config.retry_interval, Duration::from_millis(100)); + + // Invalid u32 input. + let properties: HashMap = hashmap! { + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "upsert".to_string(), + "identifier".to_string() => "test_sink_3".to_string(), + "properties.retry.max".to_string() => "-20".to_string(), // error! + }; + assert!(KafkaConfig::from_hashmap(properties).is_err()); + + // Invalid bool input. + let properties: HashMap = hashmap! { + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "upsert".to_string(), + "identifier".to_string() => "test_sink_4".to_string(), + "force_append_only".to_string() => "yes".to_string(), // error! + }; + assert!(KafkaConfig::from_hashmap(properties).is_err()); + + // Invalid duration input. + let properties: HashMap = hashmap! { + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "upsert".to_string(), + "identifier".to_string() => "test_sink_5".to_string(), + "properties.retry.interval".to_string() => "500minutes".to_string(), // error! + }; + assert!(KafkaConfig::from_hashmap(properties).is_err()); } #[ignore]