diff --git a/.github/workflows/rust-docker-build.yml b/.github/workflows/rust-docker-build.yml index c13879481c5d1..a94b6fcd59b3d 100644 --- a/.github/workflows/rust-docker-build.yml +++ b/.github/workflows/rust-docker-build.yml @@ -31,6 +31,8 @@ jobs: dockerfile: ./rust/Dockerfile - image: property-defs-rs dockerfile: ./rust/Dockerfile + - image: error-tracking-rs + dockerfile: ./rust/Dockerfile runs-on: depot-ubuntu-22.04-4 permissions: id-token: write # allow issuing OIDC tokens for this workflow run @@ -46,6 +48,7 @@ jobs: hook-janitor_digest: ${{ steps.digest.outputs.hook-janitor_digest }} hook-worker_digest: ${{ steps.digest.outputs.hook-worker_digest }} hook-migrator_digest: ${{ steps.digest.outputs.hook-migrator_digest }} + error-tracking-rs_digest: ${{ steps.digest.outputs.error-tracking-rs_digest }} defaults: run: @@ -141,6 +144,10 @@ jobs: values: image: sha: '${{ needs.build.outputs.property-defs-rs_digest }}' + # - release: error-tracking-rs - disabled until a charts in place, for now we just build + # values: + # image: + # sha: '${{ needs.build.outputs.error-tracking-rs_digest }}' - release: hoghooks values: api_image: diff --git a/rust/Cargo.lock b/rust/Cargo.lock index bd3e14f286803..3b382f72d58b4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -621,6 +621,7 @@ dependencies = [ "base64 0.22.0", "bytes", "common-alloc", + "common-types", "envconfig", "flate2", "futures", @@ -746,6 +747,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-types" +version = "0.1.0" +dependencies = [ + "serde", + "time", + "uuid", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1082,6 +1092,25 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-tracking" +version = "0.1.0" +dependencies = [ + "axum 0.7.5", + "common-alloc", + "common-kafka", + "common-metrics", + "common-types", + "envconfig", + "health", + "metrics", + "rdkafka", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -2875,13 +2904,13 @@ dependencies = [ "axum 0.7.5", "chrono", "common-alloc", + "common-kafka", "common-metrics", "envconfig", "futures", "health", "metrics", "quick_cache", - "rdkafka", "serde", "serde_json", "serve-metrics", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5c30dd1a8cf46..18a33a9b41185 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -8,6 +8,7 @@ members = [ "common/metrics", "common/dns", "common/alloc", + "common/types", "feature-flags", "hook-api", "hook-common", @@ -17,6 +18,7 @@ members = [ "cyclotron-node", "cyclotron-janitor", "cyclotron-fetch", + "error-tracking", ] [workspace.lints.rust] diff --git a/rust/capture/Cargo.toml b/rust/capture/Cargo.toml index 7b50fe760b742..5cee6be0fb01d 100644 --- a/rust/capture/Cargo.toml +++ b/rust/capture/Cargo.toml @@ -18,6 +18,7 @@ flate2 = { workspace = true } governor = { workspace = true } health = { path = "../common/health" } common-alloc = { path = "../common/alloc" } +common-types = { path = "../common/types" } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } opentelemetry = { workspace = true } diff --git a/rust/capture/src/api.rs b/rust/capture/src/api.rs index c7441d97f5c42..b3c93c5462c6d 100644 --- a/rust/capture/src/api.rs +++ b/rust/capture/src/api.rs @@ -2,8 +2,6 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use time::OffsetDateTime; -use uuid::Uuid; use crate::token::InvalidTokenReason; @@ -94,37 +92,3 @@ impl IntoResponse for CaptureError { .into_response() } } - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum DataType { - AnalyticsMain, - AnalyticsHistorical, - ClientIngestionWarning, - HeatmapMain, - ExceptionMain, - SnapshotMain, -} -#[derive(Clone, Debug, Serialize, Eq, PartialEq)] -pub struct ProcessedEvent { - #[serde(skip_serializing)] - pub data_type: DataType, - pub uuid: Uuid, - pub distinct_id: String, - pub ip: String, - pub data: String, - pub now: String, - #[serde( - with = "time::serde::rfc3339::option", - skip_serializing_if = "Option::is_none" - )] - pub sent_at: Option, - pub token: String, - #[serde(skip_serializing)] - pub session_id: Option, -} - -impl ProcessedEvent { - pub fn key(&self) -> String { - format!("{}:{}", self.token, self.distinct_id) - } -} diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index 2d189abefa6b3..093ab6b02da99 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -1,5 +1,7 @@ use crate::limiters::redis::RedisLimiter; +use crate::v0_request::{DataType, ProcessedEvent}; use async_trait::async_trait; + use health::HealthHandle; use metrics::{counter, gauge, histogram}; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; @@ -12,7 +14,7 @@ use tokio::task::JoinSet; use tracing::log::{debug, error, info}; use tracing::{info_span, instrument, Instrument}; -use crate::api::{CaptureError, DataType, ProcessedEvent}; +use crate::api::CaptureError; use crate::config::KafkaConfig; use crate::limiters::overflow::OverflowLimiter; use crate::prometheus::report_dropped_events; @@ -195,15 +197,17 @@ impl KafkaSink { } async fn kafka_send(&self, event: ProcessedEvent) -> Result { + let (event, metadata) = (event.event, event.metadata); + let payload = serde_json::to_string(&event).map_err(|e| { error!("failed to serialize event: {}", e); CaptureError::NonRetryableSinkError })?; let token = event.token.clone(); - let data_type = event.data_type; + let data_type = metadata.data_type; let event_key = event.key(); - let session_id = event.session_id.clone(); + let session_id = metadata.session_id.clone(); drop(event); // Events can be EXTREMELY memory hungry @@ -349,12 +353,14 @@ impl Event for KafkaSink { #[cfg(test)] mod tests { - use crate::api::{CaptureError, DataType, ProcessedEvent}; + use crate::api::CaptureError; use crate::config; use crate::limiters::overflow::OverflowLimiter; use crate::sinks::kafka::KafkaSink; use crate::sinks::Event; use crate::utils::uuid_v7; + use crate::v0_request::{DataType, ProcessedEvent, ProcessedEventMetadata}; + use common_types::CapturedEvent; use health::HealthRegistry; use rand::distributions::Alphanumeric; use rand::Rng; @@ -405,8 +411,7 @@ mod tests { // We test different cases in a single test to amortize the startup cost of the producer. let (cluster, sink) = start_on_mocked_sink(Some(3000000)).await; - let event: ProcessedEvent = ProcessedEvent { - data_type: DataType::AnalyticsMain, + let event: CapturedEvent = CapturedEvent { uuid: uuid_v7(), distinct_id: "id1".to_string(), ip: "".to_string(), @@ -414,9 +419,18 @@ mod tests { now: "".to_string(), sent_at: None, token: "token1".to_string(), + }; + + let metadata = ProcessedEventMetadata { + data_type: DataType::AnalyticsMain, session_id: None, }; + let event = ProcessedEvent { + event, + metadata: metadata.clone(), + }; + // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster for _ in 0..20 { if sink.send(event.clone()).await.is_ok() { @@ -438,8 +452,7 @@ mod tests { .take(2_000_000) .map(char::from) .collect(); - let big_event: ProcessedEvent = ProcessedEvent { - data_type: DataType::AnalyticsMain, + let captured = CapturedEvent { uuid: uuid_v7(), distinct_id: "id1".to_string(), ip: "".to_string(), @@ -447,8 +460,13 @@ mod tests { now: "".to_string(), sent_at: None, token: "token1".to_string(), - session_id: None, }; + + let big_event = ProcessedEvent { + event: captured, + metadata: metadata.clone(), + }; + sink.send(big_event) .await .expect("failed to send event larger than default max size"); @@ -459,17 +477,20 @@ mod tests { .take(4_000_000) .map(char::from) .collect(); - let big_event: ProcessedEvent = ProcessedEvent { - data_type: DataType::AnalyticsMain, - uuid: uuid_v7(), - distinct_id: "id1".to_string(), - ip: "".to_string(), - data: big_data, - now: "".to_string(), - sent_at: None, - token: "token1".to_string(), - session_id: None, + + let big_event = ProcessedEvent { + event: CapturedEvent { + uuid: uuid_v7(), + distinct_id: "id1".to_string(), + ip: "".to_string(), + data: big_data, + now: "".to_string(), + sent_at: None, + token: "token1".to_string(), + }, + metadata: metadata.clone(), }; + match sink.send(big_event).await { Err(CaptureError::EventTooBig) => {} // Expected Err(err) => panic!("wrong error code {}", err), diff --git a/rust/capture/src/sinks/mod.rs b/rust/capture/src/sinks/mod.rs index bedbcbc8df69d..8b88ec57b65e7 100644 --- a/rust/capture/src/sinks/mod.rs +++ b/rust/capture/src/sinks/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use crate::api::{CaptureError, ProcessedEvent}; +use crate::{api::CaptureError, v0_request::ProcessedEvent}; pub mod kafka; pub mod print; diff --git a/rust/capture/src/sinks/print.rs b/rust/capture/src/sinks/print.rs index 7845a3d039b56..b883ebf67ba2a 100644 --- a/rust/capture/src/sinks/print.rs +++ b/rust/capture/src/sinks/print.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; + use metrics::{counter, histogram}; use tracing::log::info; -use crate::api::{CaptureError, ProcessedEvent}; +use crate::api::CaptureError; use crate::sinks::Event; +use crate::v0_request::ProcessedEvent; pub struct PrintSink {} diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs index e01b9cbf5bc22..e21e3a5c94520 100644 --- a/rust/capture/src/v0_endpoint.rs +++ b/rust/capture/src/v0_endpoint.rs @@ -8,15 +8,18 @@ use axum::extract::{MatchedPath, Query, State}; use axum::http::{HeaderMap, Method}; use axum_client_ip::InsecureClientIp; use base64::Engine; +use common_types::CapturedEvent; use metrics::counter; use serde_json::json; use serde_json::Value; use tracing::instrument; use crate::prometheus::report_dropped_events; -use crate::v0_request::{Compression, ProcessingContext, RawRequest}; +use crate::v0_request::{ + Compression, DataType, ProcessedEvent, ProcessedEventMetadata, ProcessingContext, RawRequest, +}; use crate::{ - api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent}, + api::{CaptureError, CaptureResponse, CaptureResponseCode}, router, sinks, utils::uuid_v7, v0_request::{EventFormData, EventQuery, RawEvent}, @@ -277,8 +280,12 @@ pub fn process_single_event( CaptureError::NonRetryableSinkError })?; - Ok(ProcessedEvent { + let metadata = ProcessedEventMetadata { data_type, + session_id: None, + }; + + let event = CapturedEvent { uuid: event.uuid.unwrap_or_else(uuid_v7), distinct_id: event.extract_distinct_id()?, ip: context.client_ip.clone(), @@ -286,8 +293,8 @@ pub fn process_single_event( now: context.now.clone(), sent_at: context.sent_at, token: context.token.clone(), - session_id: None, - }) + }; + Ok(ProcessedEvent { metadata, event }) } #[instrument(skip_all, fields(events = events.len()))] @@ -351,8 +358,17 @@ pub async fn process_replay_events<'a>( } } - let event = ProcessedEvent { + let metadata = ProcessedEventMetadata { data_type: DataType::SnapshotMain, + session_id: Some( + session_id + .as_str() + .ok_or(CaptureError::InvalidSessionId)? + .to_string(), + ), + }; + + let event = CapturedEvent { uuid, distinct_id: distinct_id.clone(), ip: context.client_ip.clone(), @@ -370,13 +386,7 @@ pub async fn process_replay_events<'a>( now: context.now.clone(), sent_at: context.sent_at, token: context.token.clone(), - session_id: Some( - session_id - .as_str() - .ok_or(CaptureError::InvalidSessionId)? - .to_string(), - ), }; - sink.send(event).await + sink.send(ProcessedEvent { metadata, event }).await } diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs index c7e62ec3d06fb..e3ea92c7ae53e 100644 --- a/rust/capture/src/v0_request.rs +++ b/rust/capture/src/v0_request.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::io::prelude::*; use bytes::{Buf, Bytes}; +use common_types::CapturedEvent; use flate2::read::GzDecoder; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; @@ -283,6 +284,28 @@ pub struct ProcessingContext { pub historical_migration: bool, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum DataType { + AnalyticsMain, + AnalyticsHistorical, + ClientIngestionWarning, + HeatmapMain, + ExceptionMain, + SnapshotMain, +} + +#[derive(Debug, Clone)] +pub struct ProcessedEvent { + pub metadata: ProcessedEventMetadata, + pub event: CapturedEvent, +} + +#[derive(Debug, Clone)] +pub struct ProcessedEventMetadata { + pub data_type: DataType, + pub session_id: Option, +} + #[cfg(test)] mod tests { use crate::token::InvalidTokenReason; diff --git a/rust/capture/tests/django_compat.rs b/rust/capture/tests/django_compat.rs index f6509750ddf78..67be98629cf11 100644 --- a/rust/capture/tests/django_compat.rs +++ b/rust/capture/tests/django_compat.rs @@ -4,13 +4,14 @@ use axum::http::StatusCode; use axum_test_helper::TestClient; use base64::engine::general_purpose; use base64::Engine; -use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent}; +use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode}; use capture::config::CaptureMode; use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY}; use capture::redis::MockRedisClient; use capture::router::router; use capture::sinks::Event; use capture::time::TimeSource; +use capture::v0_request::{DataType, ProcessedEvent}; use health::HealthRegistry; use serde::Deserialize; use serde_json::{json, Value}; @@ -161,9 +162,9 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { { // Ensure the data type matches if case.historical_migration { - assert_eq!(DataType::AnalyticsHistorical, message.data_type); + assert_eq!(DataType::AnalyticsHistorical, message.metadata.data_type); } else { - assert_eq!(DataType::AnalyticsMain, message.data_type); + assert_eq!(DataType::AnalyticsMain, message.metadata.data_type); } // Normalizing the expected event to align with known django->rust inconsistencies @@ -193,7 +194,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { object.remove("library_version"); } - let found_props: Value = serde_json::from_str(&message.data)?; + let found_props: Value = serde_json::from_str(&message.event.data)?; let match_config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict); if let Err(e) = @@ -205,7 +206,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { ); mismatches += 1; } else { - *expected_data = json!(&message.data) + *expected_data = json!(&message.event.data) } } @@ -221,7 +222,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { let match_config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict); if let Err(e) = - assert_json_matches_no_panic(&json!(expected), &json!(message), match_config) + assert_json_matches_no_panic(&json!(expected), &json!(message.event), match_config) { println!( "record mismatch at line {}, event {}: {}", diff --git a/rust/common/kafka/src/config.rs b/rust/common/kafka/src/config.rs index 8154a1d3fb7c5..81ef7402ab429 100644 --- a/rust/common/kafka/src/config.rs +++ b/rust/common/kafka/src/config.rs @@ -20,3 +20,23 @@ pub struct KafkaConfig { #[envconfig(default = "localhost:9092")] pub kafka_hosts: String, } + +#[derive(Envconfig, Clone)] +pub struct ConsumerConfig { + pub kafka_consumer_group: String, + pub kafka_consumer_topic: String, +} + +impl ConsumerConfig { + /// Because the consumer config is application specific, we + /// can't set good defaults in the derive macro, so we expose a way + /// for users to set them here before init'ing their main config struct + pub fn set_defaults(consumer_group: &str, consumer_topic: &str) { + if std::env::var("KAFKA_CONSUMER_GROUP").is_err() { + std::env::set_var("KAFKA_CONSUMER_GROUP", consumer_group); + }; + if std::env::var("KAFKA_CONSUMER_TOPIC").is_err() { + std::env::set_var("KAFKA_CONSUMER_TOPIC", consumer_topic); + }; + } +} diff --git a/rust/common/kafka/src/kafka_consumer.rs b/rust/common/kafka/src/kafka_consumer.rs new file mode 100644 index 0000000000000..0b55027f682d6 --- /dev/null +++ b/rust/common/kafka/src/kafka_consumer.rs @@ -0,0 +1,116 @@ +use std::sync::{Arc, Weak}; + +use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + error::KafkaError, + ClientConfig, Message, +}; +use serde::de::DeserializeOwned; + +use crate::config::{ConsumerConfig, KafkaConfig}; + +#[derive(Clone)] +pub struct SingleTopicConsumer { + inner: Arc, +} + +struct Inner { + consumer: StreamConsumer, + topic: String, +} + +#[derive(Debug, thiserror::Error)] +pub enum RecvErr { + #[error("Kafka error: {0}")] + Kafka(#[from] KafkaError), + #[error("Serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("Received empty payload")] + Empty, +} + +#[derive(Debug, thiserror::Error)] +pub enum OffsetErr { + #[error("Kafka error: {0}")] + Kafka(#[from] KafkaError), + #[error("Consumer gone")] + Gone, +} + +impl SingleTopicConsumer { + pub fn new( + common_config: KafkaConfig, + consumer_config: ConsumerConfig, + ) -> Result { + let mut client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &common_config.kafka_hosts) + .set("statistics.interval.ms", "10000") + .set("group.id", consumer_config.kafka_consumer_group); + + client_config.set("enable.auto.offset.store", "false"); + + if common_config.kafka_tls { + client_config + .set("security.protocol", "ssl") + .set("enable.ssl.certificate.verification", "false"); + }; + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[consumer_config.kafka_consumer_topic.as_str()])?; + + let inner = Inner { + consumer, + topic: consumer_config.kafka_consumer_topic, + }; + Ok(Self { + inner: Arc::new(inner), + }) + } + + pub async fn json_recv(&self) -> Result<(T, Offset), RecvErr> + where + T: DeserializeOwned, + { + let message = self.inner.consumer.recv().await?; + + let offset = Offset { + handle: Arc::downgrade(&self.inner), + partition: message.partition(), + offset: message.offset(), + }; + + let Some(payload) = message.payload() else { + // We auto-store poison pills, panicking on failure + offset.store().unwrap(); + return Err(RecvErr::Empty); + }; + + let payload = match serde_json::from_slice(payload) { + Ok(p) => p, + Err(e) => { + // We auto-store poison pills, panicking on failure + offset.store().unwrap(); + return Err(RecvErr::Serde(e)); + } + }; + + Ok((payload, offset)) + } +} + +pub struct Offset { + handle: Weak, + partition: i32, + offset: i64, +} + +impl Offset { + pub fn store(self) -> Result<(), OffsetErr> { + let inner = self.handle.upgrade().ok_or(OffsetErr::Gone)?; + inner + .consumer + .store_offset(&inner.topic, self.partition, self.offset)?; + Ok(()) + } +} diff --git a/rust/common/kafka/src/lib.rs b/rust/common/kafka/src/lib.rs index f506de579d88c..0f39a9504d116 100644 --- a/rust/common/kafka/src/lib.rs +++ b/rust/common/kafka/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod kafka_consumer; pub mod kafka_messages; pub mod kafka_producer; pub mod test; diff --git a/rust/common/types/Cargo.toml b/rust/common/types/Cargo.toml new file mode 100644 index 0000000000000..4346b543d038c --- /dev/null +++ b/rust/common/types/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "common-types" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +serde = { workspace = true } +uuid = { workspace = true } +time = {workspace = true } \ No newline at end of file diff --git a/rust/common/types/README.md b/rust/common/types/README.md new file mode 100644 index 0000000000000..97004f3882fe6 --- /dev/null +++ b/rust/common/types/README.md @@ -0,0 +1,3 @@ +# Common types + +For types used across our projects, like events, persons, etc. Each time you go to copy a type from somewhere, put it here instead. \ No newline at end of file diff --git a/rust/common/types/src/event.rs b/rust/common/types/src/event.rs new file mode 100644 index 0000000000000..9a3c95c3a8e02 --- /dev/null +++ b/rust/common/types/src/event.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +// The event type that capture produces +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct CapturedEvent { + pub uuid: Uuid, + pub distinct_id: String, + pub ip: String, + pub data: String, + pub now: String, + #[serde( + with = "time::serde::rfc3339::option", + skip_serializing_if = "Option::is_none" + )] + pub sent_at: Option, + pub token: String, +} + +impl CapturedEvent { + pub fn key(&self) -> String { + format!("{}:{}", self.token, self.distinct_id) + } +} diff --git a/rust/common/types/src/lib.rs b/rust/common/types/src/lib.rs new file mode 100644 index 0000000000000..a9df0571b4782 --- /dev/null +++ b/rust/common/types/src/lib.rs @@ -0,0 +1,4 @@ +mod event; + +// Events +pub use event::CapturedEvent; diff --git a/rust/error-tracking/Cargo.toml b/rust/error-tracking/Cargo.toml new file mode 100644 index 0000000000000..732f2bfb648c7 --- /dev/null +++ b/rust/error-tracking/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "error-tracking" +version = "0.1.0" +edition = "2021" + +[dependencies] +rdkafka = { workspace = true } +tokio = { workspace = true } +envconfig = {workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +health = { path = "../common/health" } +axum = { workspace = true } +metrics = { workspace = true } +common-metrics = { path = "../common/metrics" } +common-alloc = { path = "../common/alloc" } +common-types = { path = "../common/types" } +common-kafka = { path = "../common/kafka" } +thiserror = { workspace = true } + +[lints] +workspace = true diff --git a/rust/error-tracking/README.md b/rust/error-tracking/README.md new file mode 100644 index 0000000000000..b78d5e2192c03 --- /dev/null +++ b/rust/error-tracking/README.md @@ -0,0 +1,3 @@ +# Error tracking + +You throw 'em, we catch 'em. diff --git a/rust/error-tracking/src/app_context.rs b/rust/error-tracking/src/app_context.rs new file mode 100644 index 0000000000000..c4d83cdfcabf2 --- /dev/null +++ b/rust/error-tracking/src/app_context.rs @@ -0,0 +1,35 @@ +use std::time::Duration; + +use common_kafka::kafka_consumer::SingleTopicConsumer; +use health::{HealthHandle, HealthRegistry}; +use tracing::info; + +use crate::{config::Config, error::Error}; + +pub struct AppContext { + pub health_registry: HealthRegistry, + pub worker_liveness: HealthHandle, + pub consumer: SingleTopicConsumer, +} + +impl AppContext { + pub async fn new(config: &Config) -> Result { + let health_registry = HealthRegistry::new("liveness"); + let worker_liveness = health_registry + .register("worker".to_string(), Duration::from_secs(60)) + .await; + + let consumer = SingleTopicConsumer::new(config.kafka.clone(), config.consumer.clone())?; + + info!( + "AppContext initialized, subscribed to topic {}", + config.consumer.kafka_consumer_topic + ); + + Ok(Self { + health_registry, + worker_liveness, + consumer, + }) + } +} diff --git a/rust/error-tracking/src/config.rs b/rust/error-tracking/src/config.rs new file mode 100644 index 0000000000000..1e530b6e1722f --- /dev/null +++ b/rust/error-tracking/src/config.rs @@ -0,0 +1,24 @@ +use common_kafka::config::{ConsumerConfig, KafkaConfig}; +use envconfig::Envconfig; + +#[derive(Envconfig, Clone)] +pub struct Config { + #[envconfig(from = "BIND_HOST", default = "::")] + pub host: String, + + #[envconfig(from = "BIND_PORT", default = "3301")] + pub port: u16, + + #[envconfig(nested = true)] + pub kafka: KafkaConfig, + + #[envconfig(nested = true)] + pub consumer: ConsumerConfig, +} + +impl Config { + pub fn init_with_defaults() -> Result { + ConsumerConfig::set_defaults("error-tracking-rs", "exceptions_ingestions"); + Self::init_from_env() + } +} diff --git a/rust/error-tracking/src/error.rs b/rust/error-tracking/src/error.rs new file mode 100644 index 0000000000000..f8a6d90d32930 --- /dev/null +++ b/rust/error-tracking/src/error.rs @@ -0,0 +1,10 @@ +use rdkafka::error::KafkaError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Config error: {0}")] + ConfigError(#[from] envconfig::Error), + #[error("Kafka error: {0}")] + KafkaError(#[from] KafkaError), +} diff --git a/rust/error-tracking/src/lib.rs b/rust/error-tracking/src/lib.rs new file mode 100644 index 0000000000000..806fb49e9cef5 --- /dev/null +++ b/rust/error-tracking/src/lib.rs @@ -0,0 +1,3 @@ +pub mod app_context; +pub mod config; +pub mod error; diff --git a/rust/error-tracking/src/main.rs b/rust/error-tracking/src/main.rs new file mode 100644 index 0000000000000..148c508e02906 --- /dev/null +++ b/rust/error-tracking/src/main.rs @@ -0,0 +1,68 @@ +use std::{future::ready, sync::Arc}; + +use axum::{routing::get, Router}; +use common_metrics::{serve, setup_metrics_routes}; +use common_types::CapturedEvent; +use envconfig::Envconfig; +use error_tracking::{app_context::AppContext, config::Config, error::Error}; +use tokio::task::JoinHandle; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; + +common_alloc::used!(); + +fn setup_tracing() { + let log_layer: tracing_subscriber::filter::Filtered< + tracing_subscriber::fmt::Layer, + EnvFilter, + tracing_subscriber::Registry, + > = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()); + tracing_subscriber::registry().with(log_layer).init(); +} + +pub async fn index() -> &'static str { + "error tracking service" +} + +fn start_health_liveness_server(config: &Config, context: Arc) -> JoinHandle<()> { + let config = config.clone(); + let router = Router::new() + .route("/", get(index)) + .route("/_readiness", get(index)) + .route( + "/_liveness", + get(move || ready(context.health_registry.get_status())), + ); + let router = setup_metrics_routes(router); + let bind = format!("{}:{}", config.host, config.port); + tokio::task::spawn(async move { + serve(router, &bind) + .await + .expect("failed to start serving metrics"); + }) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + setup_tracing(); + info!("Starting up..."); + + let config = Config::init_from_env()?; + let context = Arc::new(AppContext::new(&config).await?); + + start_health_liveness_server(&config, context.clone()); + + loop { + context.worker_liveness.report_healthy().await; + let (_, offset): (CapturedEvent, _) = match context.consumer.json_recv().await { + Ok(r) => r, + Err(err) => { + metrics::counter!("error_tracking_errors").increment(1); + error!("Error receiving message: {:?}", err); + continue; + } + }; + offset.store().unwrap(); + metrics::counter!("error_tracking_events_received").increment(1); + } +} diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml index f0ec58d5a6fac..1321a81ae98fd 100644 --- a/rust/property-defs-rs/Cargo.toml +++ b/rust/property-defs-rs/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] serde = { workspace = true } serde_json = { workspace = true } -rdkafka = { workspace = true } tokio = { workspace = true } envconfig = {workspace = true } tracing = { workspace = true } @@ -22,6 +21,7 @@ chrono = { workspace = true } quick_cache = { workspace = true } common-metrics = { path = "../common/metrics" } common-alloc = { path = "../common/alloc" } +common-kafka = { path = "../common/kafka" } ahash = { workspace = true } uuid = { workspace = true } diff --git a/rust/property-defs-rs/src/bin/benchmark_1million.rs b/rust/property-defs-rs/src/bin/benchmark_1million.rs deleted file mode 100644 index cbb5b1f74d75b..0000000000000 --- a/rust/property-defs-rs/src/bin/benchmark_1million.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; - -use property_defs_rs::types::{Event, Update}; -use quick_cache::sync::Cache; -use tokio::sync::mpsc::{ - self, - error::{TryRecvError, TrySendError}, -}; - -// This is a bad hack to just copy the function like this, but I'll refactor later -async fn spawn_producer_loop( - mut consumer: mpsc::Receiver, - channel: mpsc::Sender, - shared_cache: Arc>, - skip_threshold: usize, - compaction_batch_size: usize, - total_updates_received: Arc, -) { - let mut batch = ahash::AHashSet::with_capacity(compaction_batch_size); - let mut last_send = tokio::time::Instant::now(); - loop { - let event = match consumer.try_recv() { - Ok(event) => event, - Err(TryRecvError::Empty) => { - println!("Empty"); - consumer.recv().await.unwrap() - } - Err(TryRecvError::Disconnected) => { - return; - } - }; - - let updates = event.into_updates(skip_threshold); - total_updates_received.fetch_add(updates.len(), std::sync::atomic::Ordering::Relaxed); - - for update in updates { - if batch.contains(&update) { - continue; - } - batch.insert(update); - - if batch.len() >= compaction_batch_size || last_send.elapsed() > Duration::from_secs(10) - { - last_send = tokio::time::Instant::now(); - for update in batch.drain() { - if shared_cache.get(&update).is_some() { - continue; - } - shared_cache.insert(update.clone(), ()); - match channel.try_send(update) { - Ok(_) => {} - Err(TrySendError::Full(update)) => { - println!("Worker blocked"); - channel.send(update).await.unwrap(); - } - Err(e) => { - panic!("Coordinator send failed: {:?}", e); - } - } - } - } - } - } -} - -const EVENT_COUNT: usize = 1_000_000; -const COMPACTION_BATCH_SIZE: usize = 10_000; -const SKIP_THRESHOLD: usize = 10_000; -const CACHE_SIZE: usize = 5_000_000; -const CHANNEL_SIZE: usize = 50_000; - -#[tokio::main] -async fn main() { - let (in_tx, in_rx) = mpsc::channel(CHANNEL_SIZE); - let (out_tx, mut out_rx) = mpsc::channel(CHANNEL_SIZE); - let cache = Arc::new(Cache::new(CACHE_SIZE)); - let total_updates_received = Arc::new(std::sync::atomic::AtomicUsize::new(0)); - - let test_handle = tokio::spawn(spawn_producer_loop( - in_rx, - out_tx, - cache.clone(), - SKIP_THRESHOLD, - COMPACTION_BATCH_SIZE, - total_updates_received.clone(), - )); - - let test_events = (0..EVENT_COUNT) - .map(generate_test_event) - .collect::>(); - - let total_updates_issued: Arc = - Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_updates_issued_mv = total_updates_issued.clone(); - let return_handle = tokio::spawn(async move { - let mut batch = Vec::with_capacity(CHANNEL_SIZE); - while out_rx.recv_many(&mut batch, CHANNEL_SIZE).await > 0 { - total_updates_issued_mv.fetch_add(batch.len(), std::sync::atomic::Ordering::Relaxed); - batch.clear() - } - }); - - let sender_handle = tokio::spawn(async move { - for event in test_events { - in_tx.send(event).await.unwrap(); - } - }); - - // Give that a second to run - tokio::time::sleep(Duration::from_secs(1)).await; - - let start = tokio::time::Instant::now(); - test_handle.await.unwrap(); - let elapsed = start.elapsed(); - println!( - "Processed {} events in {}s, {} events/s, issued {} updates, {} total updates ({} ratio)", - EVENT_COUNT, - elapsed.as_secs_f64(), - EVENT_COUNT as f64 / elapsed.as_secs_f64(), - total_updates_issued.load(std::sync::atomic::Ordering::Relaxed), - total_updates_received.load(std::sync::atomic::Ordering::Relaxed), - total_updates_issued.load(std::sync::atomic::Ordering::Relaxed) as f64 - / total_updates_received.load(std::sync::atomic::Ordering::Relaxed) as f64 - ); - - sender_handle.await.unwrap(); - return_handle.await.unwrap(); -} - -// This generates "random" events, in a world where we have N teams, each sending 8 different events, each with 100 properties -// That means we have N * 8 * 100 = N*800 EventProperties, as well as N*8 event definitions and N*100 properties -// in the universe of possible updates to generate. Setting N to 1000 gives 800_000 possible EventProperties, -// 8000 event definitions and 100_000 properties. -fn generate_test_event(seed: usize) -> Event { - let team_id = (seed % 1000) as i32; - let event_name = format!("test_event_{}", seed % 8); // Imagine each team sends about 8 different events - let properties: HashMap = - (0..100) // The average event has 100 properties - .map(|i| (format!("key_{}", i), format!("val_{}", i))) - .collect(); - - Event { - team_id, - event: event_name, - properties: Some(serde_json::to_string(&properties).unwrap()), - } -} diff --git a/rust/property-defs-rs/src/bin/generate_test_data.rs b/rust/property-defs-rs/src/bin/generate_test_data.rs deleted file mode 100644 index f8f226082f048..0000000000000 --- a/rust/property-defs-rs/src/bin/generate_test_data.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::collections::HashMap; - -use envconfig::Envconfig; -use property_defs_rs::{config::Config, types::Event}; -use rdkafka::{ - producer::{FutureProducer, FutureRecord}, - ClientConfig, -}; - -fn generate_test_event(seed: usize) -> Event { - let team_id = (seed % 100) as i32; - let event_name = format!("test_event_{}", seed % 8); - let prop_key = format!("prop_{}", seed % 1000); - let properties: HashMap = - (0..100) // The average event has 100 properties - .map(|i| (prop_key.clone(), format!("val_{}", i))) - .collect(); - - Event { - team_id, - event: event_name, - properties: Some(serde_json::to_string(&properties).unwrap()), - } -} - -// A simple kafka producer that pushes a million events into a topic -#[tokio::main] -async fn main() -> Result<(), Box> { - let config = Config::init_from_env()?; - let kafka_config: ClientConfig = (&config.kafka).into(); - let producer: FutureProducer = kafka_config.create()?; - let topic = config.kafka.event_topic.as_str(); - - let mut acks = Vec::with_capacity(1_000_000); - for i in 0..10_000_000 { - let event = generate_test_event(i); - let key = event.team_id.to_string(); - let payload = serde_json::to_string(&event)?; - let record = FutureRecord { - topic, - key: Some(&key), - payload: Some(&payload), - partition: None, - timestamp: None, - headers: None, - }; - let ack = producer.send_result(record).unwrap(); - acks.push(ack); - - if i % 1000 == 0 { - println!("Sent {} events", i); - } - } - - let mut i = 0; - for ack in acks { - ack.await?.unwrap(); - i += 1; - if i % 1000 == 0 { - println!("Received ack for {} events", i); - } - } - Ok(()) -} diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs index 41494a2eab6ba..f343a61556624 100644 --- a/rust/property-defs-rs/src/config.rs +++ b/rust/property-defs-rs/src/config.rs @@ -1,7 +1,7 @@ use std::{num::ParseIntError, str::FromStr}; +use common_kafka::config::{ConsumerConfig, KafkaConfig}; use envconfig::Envconfig; -use rdkafka::ClientConfig; #[derive(Envconfig, Clone)] pub struct Config { @@ -14,6 +14,9 @@ pub struct Config { #[envconfig(nested = true)] pub kafka: KafkaConfig, + #[envconfig(nested = true)] + pub consumer: ConsumerConfig, + #[envconfig(default = "10")] pub max_concurrent_transactions: usize, @@ -106,38 +109,6 @@ pub struct Config { pub filter_mode: TeamFilterMode, } -#[derive(Envconfig, Clone)] -pub struct KafkaConfig { - #[envconfig(default = "kafka:9092")] - pub kafka_hosts: String, - #[envconfig(default = "clickhouse_events_json")] - pub event_topic: String, - #[envconfig(default = "false")] - pub kafka_tls: bool, - #[envconfig(default = "false")] - pub verify_ssl_certificate: bool, - #[envconfig(default = "property-definitions-rs")] - pub consumer_group: String, -} - -impl From<&KafkaConfig> for ClientConfig { - fn from(config: &KafkaConfig) -> Self { - let mut client_config = ClientConfig::new(); - client_config - .set("bootstrap.servers", &config.kafka_hosts) - .set("statistics.interval.ms", "10000") - .set("group.id", config.consumer_group.clone()); - - if config.kafka_tls { - client_config.set("security.protocol", "ssl").set( - "enable.ssl.certificate.verification", - config.verify_ssl_certificate.to_string(), - ); - }; - client_config - } -} - #[derive(Clone)] pub struct TeamList { pub teams: Vec, @@ -188,3 +159,10 @@ impl TeamFilterMode { } } } + +impl Config { + pub fn init_with_defaults() -> Result { + ConsumerConfig::set_defaults("property-defs-rs", "clickhouse_events_json"); + Config::init_from_env() + } +} diff --git a/rust/property-defs-rs/src/lib.rs b/rust/property-defs-rs/src/lib.rs index b871a5a97738d..7c639d72efa90 100644 --- a/rust/property-defs-rs/src/lib.rs +++ b/rust/property-defs-rs/src/lib.rs @@ -1,41 +1,4 @@ -use metrics_consts::{EMPTY_EVENTS, EVENT_PARSE_ERROR}; -use rdkafka::{message::BorrowedMessage, Message}; -use tracing::warn; -use types::Event; - pub mod app_context; pub mod config; pub mod metrics_consts; pub mod types; - -// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains -// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now. -pub fn message_to_event(msg: BorrowedMessage) -> Option { - let Some(payload) = msg.payload() else { - warn!("Received empty event"); - metrics::counter!(EMPTY_EVENTS).increment(1); - return None; - }; - - let event = serde_json::from_slice::(payload); - let event = match event { - Ok(e) => e, - Err(e) => { - metrics::counter!(EVENT_PARSE_ERROR).increment(1); - warn!("Failed to parse event: {:?}", e); - return None; - } - }; - Some(event) -} - -pub fn retain_from(buffer: &mut Vec, from: usize, predicate: impl Fn(&T) -> bool) { - let mut i = from; - while i < buffer.len() { - if !predicate(&buffer[i]) { - buffer.swap_remove(i); - } else { - i += 1; - } - } -} diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index 21604910369a3..fb8beba248240 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -2,25 +2,22 @@ use std::{sync::Arc, time::Duration}; use ahash::AHashSet; use axum::{routing::get, Router}; -use envconfig::Envconfig; +use common_kafka::kafka_consumer::{RecvErr, SingleTopicConsumer}; + use futures::future::ready; use property_defs_rs::{ app_context::AppContext, config::{Config, TeamFilterMode, TeamList}, - message_to_event, metrics_consts::{ - BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EVENTS_RECEIVED, FORCED_SMALL_BATCH, - ISSUE_FAILED, PERMIT_WAIT_TIME, RECV_DEQUEUED, SKIPPED_DUE_TO_TEAM_FILTER, - TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, - UPDATE_ISSUE_TIME, WORKER_BLOCKED, + BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EMPTY_EVENTS, EVENTS_RECEIVED, + EVENT_PARSE_ERROR, FORCED_SMALL_BATCH, ISSUE_FAILED, PERMIT_WAIT_TIME, RECV_DEQUEUED, + SKIPPED_DUE_TO_TEAM_FILTER, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE, + UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED, }, - types::Update, + types::{Event, Update}, }; use quick_cache::sync::Cache; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - ClientConfig, -}; + use serve_metrics::{serve, setup_metrics_routes}; use tokio::{ sync::{ @@ -66,7 +63,7 @@ fn start_health_liveness_server(config: &Config, context: Arc) -> Jo } async fn spawn_producer_loop( - consumer: Arc, + consumer: SingleTopicConsumer, channel: mpsc::Sender, shared_cache: Arc>, skip_threshold: usize, @@ -77,15 +74,26 @@ async fn spawn_producer_loop( let mut batch = AHashSet::with_capacity(compaction_batch_size); let mut last_send = tokio::time::Instant::now(); loop { - let message = consumer - .recv() - .await - .expect("TODO - workers panic on kafka recv fail"); - - let Some(event) = message_to_event(message) else { - continue; + let (event, offset): (Event, _) = match consumer.json_recv().await { + Ok(r) => r, + Err(RecvErr::Empty) => { + warn!("Received empty event"); + metrics::counter!(EMPTY_EVENTS).increment(1); + continue; + } + Err(RecvErr::Serde(e)) => { + metrics::counter!(EVENT_PARSE_ERROR).increment(1); + warn!("Failed to parse event: {:?}", e); + continue; + } + Err(RecvErr::Kafka(e)) => { + panic!("Kafka error: {:?}", e); // We just panic if we fail to recv from kafka, if it's down, we're down + } }; + // Panicking on offset store failure, same reasoning as the panic above - if kafka's down, we're down + offset.store().expect("Failed to store offset"); + if !team_filter_mode.should_process(&team_list.teams, event.team_id) { metrics::counter!(SKIPPED_DUE_TO_TEAM_FILTER).increment(1); continue; @@ -143,17 +151,16 @@ async fn main() -> Result<(), Box> { setup_tracing(); info!("Starting up..."); - let config = Config::init_from_env()?; - - let kafka_config: ClientConfig = (&config.kafka).into(); + let config = Config::init_with_defaults()?; - let consumer: Arc = Arc::new(kafka_config.create()?); + let consumer = SingleTopicConsumer::new(config.kafka.clone(), config.consumer.clone())?; let context = Arc::new(AppContext::new(&config).await?); - consumer.subscribe(&[config.kafka.event_topic.as_str()])?; - - info!("Subscribed to topic: {}", config.kafka.event_topic); + info!( + "Subscribed to topic: {}", + config.consumer.kafka_consumer_topic + ); start_health_liveness_server(&config, context.clone());