diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 506c8d333363d..12ea18182d17e 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -1219,6 +1219,7 @@ dependencies = [
  "serde_json",
  "thiserror",
  "time",
+ "tokio",
  "tracing",
  "uuid",
 ]
diff --git a/rust/common/kafka/Cargo.toml b/rust/common/kafka/Cargo.toml
index 715e7bc04265a..b2bfddd0088c8 100644
--- a/rust/common/kafka/Cargo.toml
+++ b/rust/common/kafka/Cargo.toml
@@ -18,3 +18,4 @@ tracing = { workspace = true }
 uuid = { workspace = true }
 thiserror = { workspace = true }
 futures = { workspace = true }
+tokio = { workspace = true }
diff --git a/rust/common/kafka/src/kafka_consumer.rs b/rust/common/kafka/src/kafka_consumer.rs
index 0b55027f682d6..79c3be7f986d5 100644
--- a/rust/common/kafka/src/kafka_consumer.rs
+++ b/rust/common/kafka/src/kafka_consumer.rs
@@ -6,6 +6,7 @@ use rdkafka::{
     ClientConfig, Message,
 };
 use serde::de::DeserializeOwned;
+use std::time::Duration;
 
 use crate::config::{ConsumerConfig, KafkaConfig};
 
@@ -97,6 +98,33 @@ impl SingleTopicConsumer {
 
         Ok((payload, offset))
     }
+
+    pub async fn json_recv_batch<T>(
+        &self,
+        max: usize,
+        timeout: Duration,
+    ) -> Vec<Result<(T, Offset), RecvErr>>
+    where
+        T: DeserializeOwned,
+    {
+        let mut results = Vec::with_capacity(max);
+
+        tokio::select! {
+            _ = tokio::time::sleep(timeout) => {},
+            _ = async {
+                while results.len() < max {
+                    let result = self.json_recv::<T>().await;
+                    let was_err = result.is_err();
+                    results.push(result);
+                    if was_err {
+                        break; // Exit early on error; it may indicate a Kafka-level failure
+                    }
+                }
+            } => {}
+        }
+
+        results
+    }
 }
 
 pub struct Offset {
diff --git a/rust/cymbal/src/config.rs b/rust/cymbal/src/config.rs
index 23968d02d1c4a..2b0cdaf20a50a 100644
--- a/rust/cymbal/src/config.rs
+++ b/rust/cymbal/src/config.rs
@@ -86,6 +86,12 @@ pub struct Config {
     // Maximum number of lines of pre and post context to get per frame
     #[envconfig(default = "15")]
     pub context_line_count: usize,
+
+    #[envconfig(default = "1000")]
+    pub max_events_per_batch: usize,
+
+    #[envconfig(default = "10")]
+    pub max_event_batch_wait_seconds: u64,
 }
 
 impl Config {
diff --git a/rust/cymbal/src/hack/kafka.rs b/rust/cymbal/src/hack/kafka.rs
index c57f27d80b0bd..cb26faede2165 100644
--- a/rust/cymbal/src/hack/kafka.rs
+++ b/rust/cymbal/src/hack/kafka.rs
@@ -9,7 +9,10 @@ use rdkafka::{
 };
 use serde::{de::DeserializeOwned, Serialize};
 use serde_json::error::Error as SerdeError;
-use std::sync::{Arc, Weak};
+use std::{
+    sync::{Arc, Weak},
+    time::Duration,
+};
 use thiserror::Error;
 use tracing::{debug, error, info};
 
@@ -145,6 +148,33 @@ impl SingleTopicConsumer {
 
         Ok((payload, offset))
     }
+
+    pub async fn json_recv_batch<T>(
+        &self,
+        max: usize,
+        timeout: Duration,
+    ) -> Vec<Result<(T, Offset), RecvErr>>
+    where
+        T: DeserializeOwned,
+    {
+        let mut results = Vec::with_capacity(max);
+
+        tokio::select! {
+            _ = tokio::time::sleep(timeout) => {},
+            _ = async {
+                while results.len() < max {
+                    let result = self.json_recv::<T>().await;
+                    let was_err = result.is_err();
+                    results.push(result);
+                    if was_err {
+                        break; // Exit early on error; it may indicate a Kafka-level failure
+                    }
+                }
+            } => {}
+        }
+
+        results
+    }
 }
 
 pub struct Offset {
diff --git a/rust/cymbal/src/main.rs b/rust/cymbal/src/main.rs
index aeef86329462f..2146c4f46ee6d 100644
--- a/rust/cymbal/src/main.rs
+++ b/rust/cymbal/src/main.rs
@@ -58,46 +58,64 @@ async fn main() {
     start_health_liveness_server(&config, context.clone());
 
+    let batch_wait_time = std::time::Duration::from_secs(config.max_event_batch_wait_seconds);
+    let batch_size = config.max_events_per_batch;
+
     loop {
         let whole_loop = common_metrics::timing_guard(MAIN_LOOP_TIME, &[]);
         context.worker_liveness.report_healthy().await;
         // Just grab the event as a serde_json::Value and immediately drop it,
         // we can work out a real type for it later (once we're deployed etc)
-        let (event, offset): (ClickHouseEvent, _) = match context.kafka_consumer.json_recv().await {
-            Ok(r) => r,
-            Err(RecvErr::Kafka(e)) => {
-                panic!("Kafka error: {}", e)
-            }
-            Err(err) => {
-                // If we failed to parse the message, or it was empty, just log and continue, our
-                // consumer has already stored the offset for us.
-                metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
-                error!("Error receiving message: {:?}", err);
-                continue;
-            }
-        };
-        metrics::counter!(EVENT_RECEIVED).increment(1);
-
-        let event = match handle_event(context.clone(), event).await {
-            Ok(e) => e,
-            Err(e) => {
-                error!("Error handling event: {:?}", e);
-                // If we get an unhandled error, it means we have some logical error in the code, or a
-                // dependency is down, and we should just fall over.
-                panic!("Unhandled error: {:?}", e);
-            }
-        };
+        let received: Vec<Result<(ClickHouseEvent, _), _>> = context
+            .kafka_consumer
+            .json_recv_batch(batch_size, batch_wait_time)
+            .await;
+
+        let mut output = Vec::with_capacity(received.len());
+        let mut offsets = Vec::with_capacity(received.len());
+        for message in received {
+            let (event, offset) = match message {
+                Ok(r) => r,
+                Err(RecvErr::Kafka(e)) => {
+                    panic!("Kafka error: {}", e)
+                }
+                Err(err) => {
+                    // If we failed to parse the message, or it was empty, just log and continue, our
+                    // consumer has already stored the offset for us.
+                    metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
+                    error!("Error receiving message: {:?}", err);
+                    continue;
+                }
+            };
+
+            metrics::counter!(EVENT_RECEIVED).increment(1);
+
+            let event = match handle_event(context.clone(), event).await {
+                Ok(e) => e,
+                Err(e) => {
+                    error!("Error handling event: {:?}", e);
+                    // If we get an unhandled error, it means we have some logical error in the code, or a
+                    // dependency is down, and we should just fall over.
+                    panic!("Unhandled error: {:?}", e);
+                }
+            };
+
+            output.push(event);
+            offsets.push(offset);
+        }
 
         send_keyed_iter_to_kafka(
             &context.kafka_producer,
             &context.config.events_topic,
             |ev| Some(ev.uuid.to_string()),
-            &[event],
+            &output,
         )
         .await
         .expect("Failed to send event to Kafka");
 
-        offset.store().unwrap();
+        for offset in offsets {
+            offset.store().unwrap();
+        }
 
         metrics::counter!(STACK_PROCESSED).increment(1);
         whole_loop.label("finished", "true").fin();
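The heart of this change is `json_recv_batch`, which races a draining loop against a deadline inside `tokio::select!`: callers get at most `max` messages, never wait longer than `timeout`, and receive whatever was collected so far if an error shows up. Below is a minimal, self-contained sketch of that pattern, not the PR's code: `recv_one` is a hypothetical stand-in for `SingleTopicConsumer::json_recv`, and the timings are purely illustrative.

```rust
use std::time::Duration;

// Hypothetical stand-in for SingleTopicConsumer::json_recv::<T>():
// yields one message every 50 ms and never fails.
async fn recv_one(i: usize) -> Result<usize, String> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(i)
}

// Collect up to `max` results, but never block longer than `timeout` overall.
async fn recv_batch(max: usize, timeout: Duration) -> Vec<Result<usize, String>> {
    let mut results = Vec::with_capacity(max);

    tokio::select! {
        // Arm 1: the overall batch deadline.
        _ = tokio::time::sleep(timeout) => {},
        // Arm 2: keep pulling until the batch is full or an error occurs.
        _ = async {
            let mut i = 0;
            while results.len() < max {
                let result = recv_one(i).await;
                let was_err = result.is_err();
                results.push(result);
                if was_err {
                    break; // stop at the first error, as json_recv_batch does
                }
                i += 1;
            }
        } => {},
    }

    // Whatever arrived before the deadline; possibly fewer than `max`.
    results
}

#[tokio::main]
async fn main() {
    let batch = recv_batch(100, Duration::from_millis(220)).await;
    // With ~50 ms per message and a 220 ms deadline, roughly four items arrive.
    println!("collected {} messages", batch.len());
}
```

The two new config fields bound the same trade-off from the operator's side: `max_events_per_batch` caps batch size and `max_event_batch_wait_seconds` caps added latency; with envconfig's usual field-name mapping they would presumably be set via `MAX_EVENTS_PER_BATCH` and `MAX_EVENT_BATCH_WAIT_SECONDS`.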