fix(cymbal): work in batches #26796

Merged
merged 9 commits on Dec 11, 2024
1 change: 1 addition & 0 deletions rust/Cargo.lock


1 change: 1 addition & 0 deletions rust/common/kafka/Cargo.toml
@@ -18,3 +18,4 @@ tracing = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
28 changes: 28 additions & 0 deletions rust/common/kafka/src/kafka_consumer.rs
@@ -6,6 +6,7 @@ use rdkafka::{
ClientConfig, Message,
};
use serde::de::DeserializeOwned;
use std::time::Duration;

use crate::config::{ConsumerConfig, KafkaConfig};

@@ -97,6 +98,33 @@ impl SingleTopicConsumer {

Ok((payload, offset))
}

pub async fn json_recv_batch<T>(
&self,
max: usize,
timeout: Duration,
) -> Vec<Result<(T, Offset), RecvErr>>
where
T: DeserializeOwned,
{
let mut results = Vec::with_capacity(max);

tokio::select! {
_ = tokio::time::sleep(timeout) => {},
_ = async {
while results.len() < max {
let result = self.json_recv::<T>().await;
let was_err = result.is_err();
results.push(result);
if was_err {
break; // Exit early on the first error, since it may indicate an underlying Kafka problem rather than just one bad message
}
}
} => {}
}

results
}
}

pub struct Offset {
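For illustration, a minimal sketch of how a caller might drain one batch with the new helper. If fewer than `max` messages arrive before the timeout, the returned Vec simply contains whatever was collected so far (possibly nothing), and an error entry ends the batch early. The event type (serde_json::Value) and the surrounding function are placeholders for this sketch, not something the PR prescribes:

async fn drain_once(consumer: &SingleTopicConsumer) {
    let batch = consumer
        .json_recv_batch::<serde_json::Value>(1000, std::time::Duration::from_secs(10))
        .await;

    for item in batch {
        match item {
            Ok((event, offset)) => {
                // Process the event, then persist the offset as main.rs does below.
                tracing::debug!("event: {}", event);
                offset.store().unwrap();
            }
            Err(err) => {
                // At most one Err can appear, and only as the last entry,
                // because json_recv_batch stops collecting on the first error.
                tracing::error!("recv error: {:?}", err);
            }
        }
    }
}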
6 changes: 6 additions & 0 deletions rust/cymbal/src/config.rs
@@ -86,6 +86,12 @@ pub struct Config {
// Maximum number of lines of pre and post context to get per frame
#[envconfig(default = "15")]
pub context_line_count: usize,

#[envconfig(default = "1000")]
pub max_events_per_batch: usize,

#[envconfig(default = "10")]
pub max_event_batch_wait_seconds: u64,
}

impl Config {
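Taken together, these two settings bound each iteration of cymbal's main loop: at most max_events_per_batch events are pulled per batch, and a partially filled batch is processed anyway once max_event_batch_wait_seconds has elapsed. Assuming envconfig's usual convention of deriving variable names from the uppercased field name (no explicit name is set in these attributes), they would be overridden via the MAX_EVENTS_PER_BATCH and MAX_EVENT_BATCH_WAIT_SECONDS environment variables.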
32 changes: 31 additions & 1 deletion rust/cymbal/src/hack/kafka.rs
@@ -9,7 +9,10 @@ use rdkafka::{
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::error::Error as SerdeError;
use std::sync::{Arc, Weak};
use std::{
sync::{Arc, Weak},
time::Duration,
};
use thiserror::Error;
use tracing::{debug, error, info};

@@ -145,6 +148,33 @@ impl SingleTopicConsumer {

Ok((payload, offset))
}

pub async fn json_recv_batch<T>(
&self,
max: usize,
timeout: Duration,
) -> Vec<Result<(T, Offset), RecvErr>>
where
T: DeserializeOwned,
{
let mut results = Vec::with_capacity(max);

tokio::select! {
_ = tokio::time::sleep(timeout) => {},
_ = async {
while results.len() < max {
let result = self.json_recv::<T>().await;
let was_err = result.is_err();
results.push(result);
if was_err {
break; // Exit early on the first error, since it may indicate an underlying Kafka problem rather than just one bad message
}
}
} => {}
}

results
}
}

pub struct Offset {
70 changes: 44 additions & 26 deletions rust/cymbal/src/main.rs
@@ -58,46 +58,64 @@ async fn main() {

start_health_liveness_server(&config, context.clone());

let batch_wait_time = std::time::Duration::from_secs(config.max_event_batch_wait_seconds);
let batch_size = config.max_events_per_batch;

loop {
let whole_loop = common_metrics::timing_guard(MAIN_LOOP_TIME, &[]);
context.worker_liveness.report_healthy().await;
// Just grab the event as a serde_json::Value and immediately drop it,
// we can work out a real type for it later (once we're deployed etc)
let (event, offset): (ClickHouseEvent, _) = match context.kafka_consumer.json_recv().await {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
panic!("Kafka error: {}", e)
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};
metrics::counter!(EVENT_RECEIVED).increment(1);

let event = match handle_event(context.clone(), event).await {
Ok(e) => e,
Err(e) => {
error!("Error handling event: {:?}", e);
// If we get an unhandled error, it means we have some logical error in the code, or a
// dependency is down, and we should just fall over.
panic!("Unhandled error: {:?}", e);
}
};
let received: Vec<Result<(ClickHouseEvent, _), _>> = context
.kafka_consumer
.json_recv_batch(batch_size, batch_wait_time)
.await;

let mut output = Vec::with_capacity(received.len());
let mut offsets = Vec::with_capacity(received.len());
for message in received {
let (event, offset) = match message {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
panic!("Kafka error: {}", e)
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};

metrics::counter!(EVENT_RECEIVED).increment(1);

let event = match handle_event(context.clone(), event).await {
Ok(e) => e,
Err(e) => {
error!("Error handling event: {:?}", e);
// If we get an unhandled error, it means we have some logical error in the code, or a
// dependency is down, and we should just fall over.
panic!("Unhandled error: {:?}", e);
}
};

output.push(event);
offsets.push(offset);
}

send_keyed_iter_to_kafka(
&context.kafka_producer,
&context.config.events_topic,
|ev| Some(ev.uuid.to_string()),
&[event],
&output,
)
.await
.expect("Failed to send event to Kafka");

offset.store().unwrap();
for offset in offsets {
offset.store().unwrap();
}

metrics::counter!(STACK_PROCESSED).increment(1);
whole_loop.label("finished", "true").fin();