From f6c85f4b42fccc1bbfcb234a30a6b563adf04e44 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:55:43 -0500 Subject: [PATCH] feat(ourlogs): Allow log ingestion behind a flag (#4448) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds log ingestion (currently only working for the OTel log format) behind a feature flag 'organizations:ourlogs-ingestion'. This PR aims to be the minimum possible to support local and test-org ingestion before we move to dogfooding. ![Screenshot 2025-01-15 at 1 26 49 PM](https://github.com/user-attachments/assets/45f0be20-05d8-45b4-988c-d00327fc2e9c) Other notes: - We need to add two `DataCategory`s because we need to track quantity (for current discarded breadcrumb client outcome tracking) and also bytes for total log bytes ingested, which is one of the quota recommendations. - Eventually we will convert Breadcrumbs into logs as well, very similar to span extraction for spans on the event. How exactly that will work is still being discussed with product and sdk folks. - The name `ourlogs` is an internal name to disambiguate between 'our log product' logs (corny, I know) and internally created logs. User facing strings will be set to 'Log' to avoid exposing implementation details. - Depends on https://github.com/getsentry/sentry/pull/81930 for the ingest feature flag. --- CHANGELOG.md | 1 + Cargo.lock | 21 +- Cargo.toml | 3 +- py/sentry_relay/consts.py | 2 +- relay-cabi/include/relay.h | 4 +- relay-cogs/src/lib.rs | 3 + relay-config/src/config.rs | 8 + relay-dynamic-config/src/feature.rs | 6 +- relay-event-schema/src/processor/attrs.rs | 2 + relay-event-schema/src/processor/traits.rs | 2 + relay-event-schema/src/protocol/mod.rs | 2 + relay-event-schema/src/protocol/ourlog.rs | 243 ++++++++++++++++++ relay-kafka/src/config.rs | 6 +- relay-ourlogs/Cargo.toml | 29 +++ relay-ourlogs/src/lib.rs | 13 + relay-ourlogs/src/ourlog.rs | 204 +++++++++++++++ relay-pii/src/selector.rs | 1 + relay-server/Cargo.toml | 63 ++--- relay-server/src/envelope.rs | 15 ++ relay-server/src/services/outcome.rs | 4 + relay-server/src/services/processor.rs | 48 ++++ relay-server/src/services/processor/event.rs | 2 + relay-server/src/services/processor/ourlog.rs | 105 ++++++++ relay-server/src/services/store.rs | 72 ++++++ relay-server/src/utils/managed_envelope.rs | 15 ++ relay-server/src/utils/rate_limits.rs | 74 ++++++ relay-server/src/utils/sizes.rs | 3 + requirements-dev.txt | 4 +- tests/integration/conftest.py | 1 + tests/integration/fixtures/processing.py | 25 ++ tests/integration/test_ourlogs.py | 118 +++++++++ 31 files changed, 1058 insertions(+), 41 deletions(-) create mode 100644 relay-event-schema/src/protocol/ourlog.rs create mode 100644 relay-ourlogs/Cargo.toml create mode 100644 relay-ourlogs/src/lib.rs create mode 100644 relay-ourlogs/src/ourlog.rs create mode 100644 relay-server/src/services/processor/ourlog.rs create mode 100644 tests/integration/test_ourlogs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ed236fb2a33..1a939124030 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ **Internal**: - Updates performance score calculation on spans and events to also store cdf values as measurements. ([#4438](https://github.com/getsentry/relay/pull/4438)) +- Allow log ingestion behind a flag, only for internal use currently. 
([#4448](https://github.com/getsentry/relay/pull/4448)) ## 24.12.2 diff --git a/Cargo.lock b/Cargo.lock index 8cbfb46291c..ccd506d231e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2854,6 +2854,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand", + "serde_json", "thiserror", ] @@ -3767,6 +3768,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "relay-ourlogs" +version = "24.12.1" +dependencies = [ + "chrono", + "hex", + "insta", + "once_cell", + "opentelemetry-proto", + "relay-event-schema", + "relay-protocol", + "serde_json", +] + [[package]] name = "relay-pattern" version = "25.1.0" @@ -3984,6 +3999,7 @@ dependencies = [ "relay-log", "relay-metrics", "relay-monitors", + "relay-ourlogs", "relay-pii", "relay-profiling", "relay-protocol", @@ -4460,15 +4476,16 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.1.122" +version = "0.1.129" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6636bbc9fda2c104d326386bf8fdcc36d4031bca525a74a970ad8bbecb7570d2" +checksum = "790627715d4ea0e58e252dcb657a44146fde401b5520bbbc3b6500764ef71c86" dependencies = [ "jsonschema", "serde", "serde_json", "serde_yaml", "thiserror", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 356c267556c..a5bd8f4454b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ relay-kafka = { path = "relay-kafka" } relay-log = { path = "relay-log" } relay-metrics = { path = "relay-metrics" } relay-monitors = { path = "relay-monitors" } +relay-ourlogs = { path = "relay-ourlogs" } relay-pattern = { path = "relay-pattern" } relay-pii = { path = "relay-pii" } relay-profiling = { path = "relay-profiling" } @@ -150,7 +151,7 @@ reqwest = "0.12.9" rmp-serde = "1.3.0" sentry = "0.34.0" sentry-core = "0.34.0" -sentry-kafka-schemas = { version = "0.1.122", default-features = false } +sentry-kafka-schemas = { version = "0.1.129", default-features = false } sentry-release-parser = { version = "1.3.2", default-features = false } sentry-types = "0.34.0" semver = "1.0.23" diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index 825f7e0a119..b841567ac3d 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,7 +8,7 @@ class DataCategory(IntEnum): - # begin generated + # start generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index db50243b8f0..a191829ed6a 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -143,14 +143,14 @@ enum RelayDataCategory { */ RELAY_DATA_CATEGORY_ATTACHMENT_ITEM = 22, /** - * LogCount + * LogItem * * This is the category for logs for which we store the count log events for users for measuring * missing breadcrumbs, and count of logs for rate limiting purposes. */ RELAY_DATA_CATEGORY_LOG_ITEM = 23, /** - * LogBytes + * LogByte * * This is the category for logs for which we store log event total bytes for users. */ diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index 46c13bf46b1..d29f8fa392b 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -117,6 +117,8 @@ pub enum AppFeature { Transactions, /// Errors. Errors, + /// Logs. + Logs, /// Spans. Spans, /// Sessions. 
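As a side note on the two new data categories mentioned in the PR description: each log envelope item counts once toward `LogItem` and its payload size in bytes toward `LogByte`. A minimal sketch of that accounting, mirroring the `Item::quantities` change further down in this patch (the helper name and the snake_case category strings are assumptions for illustration, not part of the patch):

```python
def log_quantities(payload: bytes) -> list[tuple[str, int]]:
    """Hypothetical helper mirroring Item::quantities for log items."""
    # One count per log item, and at least one byte so empty payloads still register.
    return [("log_byte", max(len(payload), 1)), ("log_item", 1)]
```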
@@ -159,6 +161,7 @@ impl AppFeature { Self::Transactions => "transactions", Self::Errors => "errors", Self::Spans => "spans", + Self::Logs => "our_logs", Self::Sessions => "sessions", Self::ClientReports => "client_reports", Self::CheckIns => "check_ins",
diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 93c8b152a28..fa41ef8b1dd 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -617,6 +617,8 @@ pub struct Limits { /// The maximum payload size for a profile pub max_profile_size: ByteSize, + /// The maximum payload size for a log. + pub max_log_size: ByteSize, /// The maximum payload size for a span. pub max_span_size: ByteSize, /// The maximum payload size for a statsd metric. pub max_statsd_size: ByteSize, @@ -683,6 +685,7 @@ impl Default for Limits { max_api_file_upload_size: ByteSize::mebibytes(40), max_api_chunk_upload_size: ByteSize::mebibytes(100), max_profile_size: ByteSize::mebibytes(50), + max_log_size: ByteSize::mebibytes(1), max_span_size: ByteSize::mebibytes(1), max_statsd_size: ByteSize::mebibytes(1), max_metric_buckets_size: ByteSize::mebibytes(1), @@ -2213,6 +2216,11 @@ impl Config { self.values.limits.max_check_in_size.as_bytes() } + /// Returns the maximum payload size of a log in bytes. + pub fn max_log_size(&self) -> usize { + self.values.limits.max_log_size.as_bytes() + } + /// Returns the maximum payload size of a span in bytes. pub fn max_span_size(&self) -> usize { self.values.limits.max_span_size.as_bytes()
diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 2609729e3d2..8144aa9fd6f 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -102,7 +102,11 @@ pub enum Feature { /// Serialized as `organizations:ingest-spans-in-eap` #[serde(rename = "organizations:ingest-spans-in-eap")] IngestSpansInEap, - + /// Enable log ingestion for our log product (this is not internal logging). + /// + /// Serialized as `organizations:ourlogs-ingestion`. + #[serde(rename = "organizations:ourlogs-ingestion")] + OurLogsIngestion, /// This feature has graduated and is hard-coded for external Relays.
#[doc(hidden)] #[serde(rename = "projects:profiling-ingest-unsampled-profiles")]
diff --git a/relay-event-schema/src/processor/attrs.rs b/relay-event-schema/src/processor/attrs.rs index b8772a09dcf..563b1e65ace 100644 --- a/relay-event-schema/src/processor/attrs.rs +++ b/relay-event-schema/src/processor/attrs.rs @@ -46,6 +46,7 @@ pub enum ValueType { Message, Thread, Breadcrumb, + OurLog, Span, ClientSdkInfo, @@ -84,6 +85,7 @@ relay_common::derive_fromstr_and_display!(ValueType, UnknownValueTypeError, { ValueType::Message => "message", ValueType::Thread => "thread", ValueType::Breadcrumb => "breadcrumb", + ValueType::OurLog => "ourlog", ValueType::Span => "span", ValueType::ClientSdkInfo => "sdk", ValueType::Minidump => "minidump",
diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index ac1a78728d7..9c5491872d4 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -108,10 +108,12 @@ pub trait Processor: Sized { process_method!(process_breadcrumb, crate::protocol::Breadcrumb); process_method!(process_template_info, crate::protocol::TemplateInfo); process_method!(process_header_name, crate::protocol::HeaderName); + process_method!(process_ourlog, crate::protocol::OurLog); process_method!(process_span, crate::protocol::Span); process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); process_method!(process_contexts, crate::protocol::Contexts); + process_method!(process_attribute_value, crate::protocol::AttributeValue); fn process_other( &mut self,
diff --git a/relay-event-schema/src/protocol/mod.rs b/relay-event-schema/src/protocol/mod.rs index 7827447ce58..fc1c9307787 100644 --- a/relay-event-schema/src/protocol/mod.rs +++ b/relay-event-schema/src/protocol/mod.rs @@ -18,6 +18,7 @@ mod mechanism; mod metrics; mod metrics_summary; mod nel; +mod ourlog; mod relay_info; mod replay; mod request; @@ -54,6 +55,7 @@ pub use self::mechanism::*; pub use self::metrics::*; pub use self::metrics_summary::*; pub use self::nel::*; +pub use self::ourlog::*; pub use self::relay_info::*; pub use self::replay::*; pub use self::request::*;
diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs new file mode 100644 index 00000000000..e4eea9882ff --- /dev/null +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -0,0 +1,243 @@ +use relay_protocol::{ + Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, +}; + +use serde::ser::SerializeMap; + +use crate::processor::ProcessValue; +use crate::protocol::{SpanId, TraceId}; + +#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +#[metastructure(process_func = "process_ourlog", value_type = "OurLog")] +pub struct OurLog { + /// Time when the event occurred. + #[metastructure(required = true, trim = false)] + pub timestamp_nanos: Annotated<u64>, + + /// Time when the event was observed. + #[metastructure(required = true, trim = false)] + pub observed_timestamp_nanos: Annotated<u64>, + + /// The ID of the trace the log belongs to. + #[metastructure(required = false, trim = false)] + pub trace_id: Annotated<TraceId>, + /// The Span id. + /// + #[metastructure(required = false, trim = false)] + pub span_id: Annotated<SpanId>, + + /// Trace flag bitfield. + #[metastructure(required = false)] + pub trace_flags: Annotated<f64>, + + /// This is the original string representation of the severity as it is known at the source + #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] + pub severity_text: Annotated<String>, + + /// Numerical representation of the severity level + #[metastructure(required = false)] + pub severity_number: Annotated<i64>, + + /// Log body. + #[metastructure(required = true, pii = "true", trim = false)] + pub body: Annotated<String>, + + /// Arbitrary attributes on a log. + #[metastructure(pii = "true", trim = false)] + pub attributes: Annotated<Object<AttributeValue>>, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties, retain = true, pii = "maybe", trim = false)] + pub other: Object<Value>, +} + +#[derive(Debug, Clone, PartialEq, ProcessValue)] +pub enum AttributeValue { + #[metastructure(field = "string_value", pii = "true")] + StringValue(String), + #[metastructure(field = "int_value", pii = "true")] + IntValue(i64), + #[metastructure(field = "double_value", pii = "true")] + DoubleValue(f64), + #[metastructure(field = "bool_value", pii = "true")] + BoolValue(bool), + /// Any other unknown attribute value. + /// + /// This exists to ensure other attribute values such as array and object can be added in the future. + Unknown(String), +} + +impl IntoValue for AttributeValue { + fn into_value(self) -> Value { + let mut map = Object::new(); + match self { + AttributeValue::StringValue(v) => { + map.insert("string_value".to_string(), Annotated::new(Value::String(v))); + } + AttributeValue::IntValue(v) => { + map.insert("int_value".to_string(), Annotated::new(Value::I64(v))); + } + AttributeValue::DoubleValue(v) => { + map.insert("double_value".to_string(), Annotated::new(Value::F64(v))); + } + AttributeValue::BoolValue(v) => { + map.insert("bool_value".to_string(), Annotated::new(Value::Bool(v))); + } + AttributeValue::Unknown(v) => { + map.insert("unknown".to_string(), Annotated::new(Value::String(v))); + } + } + Value::Object(map) + } + + fn serialize_payload<S>(&self, s: S, _behavior: SkipSerialization) -> Result<S::Ok, S::Error> + where + Self: Sized, + S: serde::Serializer, + { + let mut map = s.serialize_map(None)?; + match self { + AttributeValue::StringValue(v) => { + map.serialize_entry("string_value", v)?; + } + AttributeValue::IntValue(v) => { + map.serialize_entry("int_value", v)?; + } + AttributeValue::DoubleValue(v) => { + map.serialize_entry("double_value", v)?; + } + AttributeValue::BoolValue(v) => { + map.serialize_entry("bool_value", v)?; + } + AttributeValue::Unknown(v) => { + map.serialize_entry("unknown", v)?; + } + } + map.end() + } +} + +impl AttributeValue { + pub fn string_value(&self) -> Option<&String> { + match self { + AttributeValue::StringValue(s) => Some(s), + _ => None, + } + } + pub fn int_value(&self) -> Option<i64> { + match self { + AttributeValue::IntValue(i) => Some(*i), + _ => None, + } + } + pub fn double_value(&self) -> Option<f64> { + match self { + AttributeValue::DoubleValue(d) => Some(*d), + _ => None, + } + } + pub fn bool_value(&self) -> Option<bool> { + match self { + AttributeValue::BoolValue(b) => Some(*b), + _ => None, + } + } +} + +impl Empty for AttributeValue { + #[inline] + fn is_empty(&self) -> bool { + matches!(self, Self::Unknown(_)) + } +} + +impl FromValue for AttributeValue { + fn from_value(value: Annotated<Value>) -> Annotated<Self> { + match value { + Annotated(Some(Value::String(value)), meta) => { + Annotated(Some(AttributeValue::StringValue(value)), meta) + } +
Annotated(Some(Value::I64(value)), meta) => { + Annotated(Some(AttributeValue::IntValue(value)), meta) + } + Annotated(Some(Value::F64(value)), meta) => { + Annotated(Some(AttributeValue::DoubleValue(value)), meta) + } + Annotated(Some(Value::Bool(value)), meta) => { + Annotated(Some(AttributeValue::BoolValue(value)), meta) + } + Annotated(Some(value), mut meta) => { + meta.add_error(Error::expected( + "a valid attribute value (string, int, double, bool)", + )); + meta.set_original_value(Some(value)); + Annotated(None, meta) + } + Annotated(None, meta) => Annotated(None, meta), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ourlog_serialization() { + let json = r#"{ + "timestamp_nanos": 1544712660300000000, + "observed_timestamp_nanos": 1544712660300000000, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, + "body": "Example log record", + "attributes": { + "boolean.attribute": { + "bool_value": true + }, + "double.attribute": { + "double_value": 637.704 + }, + "int.attribute": { + "int_value": 10 + }, + "string.attribute": { + "string_value": "some string" + } + } +}"#; + + let mut attributes = Object::new(); + attributes.insert( + "string.attribute".into(), + Annotated::new(AttributeValue::StringValue("some string".into())), + ); + attributes.insert( + "boolean.attribute".into(), + Annotated::new(AttributeValue::BoolValue(true)), + ); + attributes.insert( + "int.attribute".into(), + Annotated::new(AttributeValue::IntValue(10)), + ); + attributes.insert( + "double.attribute".into(), + Annotated::new(AttributeValue::DoubleValue(637.704)), + ); + + let log = Annotated::new(OurLog { + timestamp_nanos: Annotated::new(1544712660300000000), + observed_timestamp_nanos: Annotated::new(1544712660300000000), + severity_number: Annotated::new(10), + severity_text: Annotated::new("Information".to_string()), + trace_id: Annotated::new(TraceId("5b8efff798038103d269b633813fc60c".into())), + span_id: Annotated::new(SpanId("eee19b7ec3c1b174".into())), + body: Annotated::new("Example log record".to_string()), + attributes: Annotated::new(attributes), + ..Default::default() + }); + + assert_eq!(json, log.to_json_pretty().unwrap()); + } +} diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 7b9e8411241..faaa2b69a00 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -45,6 +45,8 @@ pub enum KafkaTopic { ReplayRecordings, /// Monitor check-ins. Monitors, + /// Logs (our log product). + OurLogs, /// Standalone spans without a transaction. Spans, /// Feedback events topic. @@ -56,7 +58,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 13] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, @@ -68,6 +70,7 @@ impl KafkaTopic { ReplayEvents, ReplayRecordings, Monitors, + OurLogs, Spans, Feedback, ]; @@ -128,6 +131,7 @@ define_topic_assignments! 
{ profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"), replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."), replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."), + ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."), monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml new file mode 100644 index 00000000000..c132967b924 --- /dev/null +++ b/relay-ourlogs/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "relay-ourlogs" +authors = ["Sentry "] +description = "Log normalization and processing" +homepage = "https://getsentry.github.io/relay/" +repository = "https://github.com/getsentry/relay" +version = "24.12.1" +edition = "2021" +license-file = "../LICENSE" +publish = false + +[lints] +workspace = true + +[dependencies] +chrono = { workspace = true } +hex = { workspace = true } +once_cell = { workspace = true } +opentelemetry-proto = { workspace = true, features = [ + "gen-tonic", + "with-serde", + "logs", +] } +relay-event-schema = { workspace = true } +relay-protocol = { workspace = true } +serde_json = { workspace = true } + +[dev-dependencies] +insta = { workspace = true } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs new file mode 100644 index 00000000000..38130a4a4c6 --- /dev/null +++ b/relay-ourlogs/src/lib.rs @@ -0,0 +1,13 @@ +//! Structs and functions needed to ingest OpenTelemetry logs. + +#![warn(missing_docs)] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png", + html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" +)] + +pub use crate::ourlog::otel_to_sentry_log; + +pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; + +mod ourlog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs new file mode 100644 index 00000000000..9662a822a11 --- /dev/null +++ b/relay-ourlogs/src/ourlog.rs @@ -0,0 +1,204 @@ +use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; + +use crate::OtelLog; +use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; +use relay_protocol::{Annotated, Object}; + +/// Transform an OtelLog to a Sentry log. +pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { + let OtelLog { + severity_number, + severity_text, + body, + attributes, + trace_id, + span_id, + .. 
+ } = otel_log; + + let span_id = hex::encode(span_id); + let trace_id = hex::encode(trace_id); + + let body = body + .and_then(|v| v.value) + .and_then(|v| match v { + OtelValue::StringValue(s) => Some(s), + _ => None, + }) + .unwrap_or_else(String::new); + + let mut attribute_data = Object::new(); + + for attribute in attributes.into_iter() { + if let Some(value) = attribute.value.and_then(|v| v.value) { + let key = attribute.key; + match value { + OtelValue::ArrayValue(_) => {} + OtelValue::BoolValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::BoolValue(v))); + } + OtelValue::BytesValue(v) => { + if let Ok(v) = String::from_utf8(v) { + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); + } + } + OtelValue::DoubleValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::DoubleValue(v))); + } + OtelValue::IntValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::IntValue(v))); + } + OtelValue::KvlistValue(_) => {} + OtelValue::StringValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); + } + } + } + } + + OurLog { + timestamp_nanos: Annotated::new(otel_log.time_unix_nano), + observed_timestamp_nanos: Annotated::new(otel_log.observed_time_unix_nano), + trace_id: TraceId(trace_id).into(), + span_id: Annotated::new(SpanId(span_id)), + trace_flags: Annotated::new(0.0), + severity_text: severity_text.into(), + severity_number: Annotated::new(severity_number as i64), + attributes: attribute_data.into(), + body: Annotated::new(body), + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use relay_protocol::{get_path, get_value}; + + #[test] + fn parse_log() { + // https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Example log record" + }, + "attributes": [ + { + "key": "string.attribute", + "value": { + "stringValue": "some string" + } + }, + { + "key": "boolean.attribute", + "value": { + "boolValue": true + } + }, + { + "key": "int.attribute", + "value": { + "intValue": "10" + } + }, + { + "key": "double.attribute", + "value": { + "doubleValue": 637.704 + } + }, + { + "key": "array.attribute", + "value": { + "arrayValue": { + "values": [ + { + "stringValue": "many" + }, + { + "stringValue": "values" + } + ] + } + } + }, + { + "key": "map.attribute", + "value": { + "kvlistValue": { + "values": [ + { + "key": "some.map.key", + "value": { + "stringValue": "some value" + } + } + ] + } + } + } + ] + }"#; + + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log: OurLog = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Example log record".into())) + ); + } + + #[test] + fn parse_log_with_db_attributes() { + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Database query executed" + }, + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "database" + 
} + }, + { + "key": "db.type", + "value": { + "stringValue": "sql" + } + }, + { + "key": "db.statement", + "value": { + "stringValue": "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" + } + } + ] + }"#; + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Database query executed".into())) + ); + assert_eq!( + get_value!(annotated_log.attributes["db.statement"]!).string_value(), + Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) + ); + } +} diff --git a/relay-pii/src/selector.rs b/relay-pii/src/selector.rs index c0481107bf2..880a71044c4 100644 --- a/relay-pii/src/selector.rs +++ b/relay-pii/src/selector.rs @@ -135,6 +135,7 @@ impl SelectorPathItem { | ValueType::Message | ValueType::Thread | ValueType::Breadcrumb + | ValueType::OurLog | ValueType::Span | ValueType::Minidump | ValueType::HeapMemory diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index ea6aa647688..89bbd03b9e5 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -13,16 +13,16 @@ publish = false [features] default = [] processing = [ - "dep:minidump", - "dep:symbolic-common", - "dep:symbolic-unreal", - "relay-cardinality/redis", - "relay-config/processing", - "relay-kafka/producer", - "relay-metrics/redis", - "relay-quotas/redis", - "relay-redis/impl", - "relay-sampling/redis", + "dep:minidump", + "dep:symbolic-common", + "dep:symbolic-unreal", + "relay-cardinality/redis", + "relay-config/processing", + "relay-kafka/producer", + "relay-metrics/redis", + "relay-quotas/redis", + "relay-redis/impl", + "relay-sampling/redis", ] [lints] @@ -75,6 +75,7 @@ relay-event-schema = { workspace = true } relay-filter = { workspace = true } relay-kafka = { workspace = true, optional = true } relay-log = { workspace = true, features = ["sentry"] } +relay-ourlogs = { workspace = true } relay-metrics = { workspace = true } relay-monitors = { workspace = true } relay-pii = { workspace = true } @@ -88,10 +89,10 @@ relay-spans = { workspace = true } relay-statsd = { workspace = true } relay-system = { workspace = true } reqwest = { workspace = true, features = [ - "gzip", - "hickory-dns", - "stream", - "native-tls-vendored", + "gzip", + "hickory-dns", + "stream", + "native-tls-vendored", ] } rmp-serde = { workspace = true } serde = { workspace = true } @@ -100,14 +101,14 @@ serde_json = { workspace = true } smallvec = { workspace = true, features = ["drain_filter"] } socket2 = { workspace = true } sqlx = { workspace = true, features = [ - "macros", - "migrate", - "sqlite", - "runtime-tokio", + "macros", + "migrate", + "sqlite", + "runtime-tokio", ], default-features = false } symbolic-common = { workspace = true, optional = true, default-features = false } symbolic-unreal = { workspace = true, optional = true, default-features = false, features = [ - "serde", + "serde", ] } sysinfo = { workspace = true } thiserror = { workspace = true } @@ -115,18 +116,18 @@ tokio = { workspace = true, features = ["sync", "time"] } tokio-util = { workspace = true, default-features = false } tower = { workspace = true, default-features = false, features = ["limit"] } tower-http = { workspace = true, default-features = false, features = [ - "catch-panic", - "compression-br", - "compression-deflate", - "compression-gzip", - "compression-zstd", - "cors", - "decompression-br", - 
"decompression-deflate", - "decompression-gzip", - "decompression-zstd", - "set-header", - "trace", + "catch-panic", + "compression-br", + "compression-deflate", + "compression-gzip", + "compression-zstd", + "cors", + "decompression-br", + "decompression-deflate", + "decompression-gzip", + "decompression-zstd", + "set-header", + "trace", ] } url = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["v5"] } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index e29444199c5..6a3d6101eea 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -117,6 +117,10 @@ pub enum ItemType { ReplayVideo, /// Monitor check-in encoded as JSON. CheckIn, + /// A log from the [OTEL Log format](https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition) + OtelLog, + /// A log for the log product, not internal logs. + Log, /// A standalone span. Span, /// A standalone OpenTelemetry span serialized as JSON. @@ -174,6 +178,8 @@ impl ItemType { Self::ReplayRecording => "replay_recording", Self::ReplayVideo => "replay_video", Self::CheckIn => "check_in", + Self::Log => "log", + Self::OtelLog => "otel_log", Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", @@ -227,6 +233,8 @@ impl std::str::FromStr for ItemType { "replay_recording" => Self::ReplayRecording, "replay_video" => Self::ReplayVideo, "check_in" => Self::CheckIn, + "log" => Self::Log, + "otel_log" => Self::OtelLog, "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, @@ -696,6 +704,10 @@ impl Item { CountFor::Outcomes => smallvec![], }, ItemType::Statsd | ItemType::MetricBuckets => smallvec![], + ItemType::Log | ItemType::OtelLog => smallvec![ + (DataCategory::LogByte, self.len().max(1)), + (DataCategory::LogItem, 1) + ], ItemType::FormData => smallvec![], ItemType::UserReport => smallvec![], ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], @@ -952,6 +964,8 @@ impl Item { | ItemType::Profile | ItemType::CheckIn | ItemType::Span + | ItemType::Log + | ItemType::OtelLog | ItemType::OtelSpan | ItemType::OtelTracesData | ItemType::ProfileChunk => false, @@ -986,6 +1000,7 @@ impl Item { ItemType::Profile => true, ItemType::CheckIn => false, ItemType::Span => false, + ItemType::Log | ItemType::OtelLog => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, ItemType::ProfileChunk => false, diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index d11f1f9a97c..6b869fe8197 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -457,6 +457,9 @@ pub enum DiscardReason { /// (Relay) Profiling related discard reasons Profiling(&'static str), + /// (Relay) A log that is not valid after normalization. + InvalidLog, + /// (Relay) A span is not valid after normalization. 
InvalidSpan, @@ -506,6 +509,7 @@ impl DiscardReason { DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording", DiscardReason::InvalidReplayVideoEvent => "invalid_replay_video", DiscardReason::Profiling(reason) => reason, + DiscardReason::InvalidLog => "invalid_log", DiscardReason::InvalidSpan => "invalid_span", DiscardReason::FeatureDisabled(_) => "feature_disabled", } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3417f5ed08a..9418e9065c6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -86,6 +86,7 @@ mod attachment; mod dynamic_sampling; mod event; mod metrics; +mod ourlog; mod profile; mod profile_chunk; mod replay; @@ -193,6 +194,7 @@ processing_group!(StandaloneGroup, Standalone); processing_group!(ClientReportGroup, ClientReport); processing_group!(ReplayGroup, Replay); processing_group!(CheckInGroup, CheckIn); +processing_group!(LogGroup, Log); processing_group!(SpanGroup, Span); impl Sampling for SpanGroup { @@ -240,6 +242,8 @@ pub enum ProcessingGroup { Replay, /// Crons. CheckIn, + /// Logs. + Log, /// Spans. Span, /// Metrics. @@ -303,6 +307,7 @@ impl ProcessingGroup { &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData ) }); + if !span_items.is_empty() { grouped_envelopes.push(( ProcessingGroup::Span, @@ -310,6 +315,17 @@ impl ProcessingGroup { )) } + // Extract logs. + let logs_items = + envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog)); + + if !logs_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Log, + Envelope::from_parts(headers.clone(), logs_items), + )) + } + // Extract all metric items. // // Note: Should only be relevant in proxy mode. In other modes we send metrics through @@ -411,6 +427,7 @@ impl ProcessingGroup { ProcessingGroup::ClientReport => "client_report", ProcessingGroup::Replay => "replay", ProcessingGroup::CheckIn => "check_in", + ProcessingGroup::Log => "log", ProcessingGroup::Span => "span", ProcessingGroup::Metrics => "metrics", ProcessingGroup::ProfileChunk => "profile_chunk", @@ -430,6 +447,7 @@ impl From for AppFeature { ProcessingGroup::ClientReport => AppFeature::ClientReports, ProcessingGroup::Replay => AppFeature::Replays, ProcessingGroup::CheckIn => AppFeature::CheckIns, + ProcessingGroup::Log => AppFeature::Logs, ProcessingGroup::Span => AppFeature::Spans, ProcessingGroup::Metrics => AppFeature::UnattributedMetrics, ProcessingGroup::ProfileChunk => AppFeature::Profiles, @@ -2000,6 +2018,35 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } + /// Process logs + /// + fn process_logs( + &self, + managed_envelope: &mut TypedEnvelope, + project_info: Arc, + #[allow(unused_variables)] rate_limits: Arc, + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + + ourlog::filter( + managed_envelope, + self.inner.config.clone(), + project_info.clone(), + ); + if_processing!(self.inner.config, { + self.enforce_quotas( + managed_envelope, + Annotated::empty(), + &mut extracted_metrics, + project_info.clone(), + rate_limits, + )?; + ourlog::process(managed_envelope, project_info.clone()); + }); + Ok(Some(extracted_metrics)) + } + /// Processes standalone spans. /// /// This function does *not* run for spans extracted from transactions. 
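To illustrate what `process_logs` ultimately emits: a rough sketch of the payload transformation, abridged from the fixtures in `relay-ourlogs/src/ourlog.rs` and `tests/integration/test_ourlogs.py` later in this patch. An `otel_log` item is parsed from the OTel log data model and re-serialized as a `log` item using the flattened `OurLog` schema; field names follow those fixtures, and fields not shown (e.g. `observed_timestamp_nanos`, `trace_flags`) are omitted here for brevity.

```python
# Incoming `otel_log` envelope item (OTel log data model), abridged.
otel_log = {
    "timeUnixNano": "1544712660300000000",
    "severityNumber": 10,
    "severityText": "Information",
    "traceId": "5B8EFFF798038103D269B633813FC60C",
    "spanId": "EEE19B7EC3C1B174",
    "body": {"stringValue": "Example log record"},
    "attributes": [
        {"key": "string.attribute", "value": {"stringValue": "some string"}},
    ],
}

# Outgoing `log` item after ourlog::process (OurLog schema), abridged.
sentry_log = {
    "timestamp_nanos": 1544712660300000000,
    "trace_id": "5b8efff798038103d269b633813fc60c",
    "span_id": "eee19b7ec3c1b174",
    "severity_text": "Information",
    "severity_number": 10,
    "body": "Example log record",
    "attributes": {"string.attribute": {"string_value": "some string"}},
}
```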
@@ -2155,6 +2202,7 @@ impl EnvelopeProcessorService { ProcessingGroup::CheckIn => { run!(process_checkins, project_id, project_info, rate_limits) } + ProcessingGroup::Log => run!(process_logs, project_info, rate_limits), ProcessingGroup::Span => run!( process_standalone_spans, self.inner.config.clone(),
diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index d6032e3e98c..0769e5c8b8f 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -468,6 +468,8 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::ReplayRecording => false, ItemType::ReplayVideo => false, ItemType::CheckIn => false, + ItemType::Log => false, + ItemType::OtelLog => false, ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false,
diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs new file mode 100644 index 00000000000..d4a6ad8bbfa --- /dev/null +++ b/relay-server/src/services/processor/ourlog.rs @@ -0,0 +1,105 @@ +//! Log processing code. +use std::sync::Arc; + +use crate::services::processor::LogGroup; +use relay_config::Config; +use relay_dynamic_config::Feature; + +use crate::services::processor::should_filter; +use crate::services::projects::project::ProjectInfo; +use crate::utils::{ItemAction, TypedEnvelope}; + +#[cfg(feature = "processing")] +use { + crate::envelope::ContentType, + crate::envelope::{Item, ItemType}, + crate::services::outcome::{DiscardReason, Outcome}, + crate::services::processor::ProcessingError, + relay_dynamic_config::ProjectConfig, + relay_event_schema::processor::{process_value, ProcessingState}, + relay_event_schema::protocol::OurLog, + relay_ourlogs::OtelLog, + relay_pii::PiiProcessor, + relay_protocol::Annotated, +}; + +/// Removes logs from the envelope if the feature is not enabled. +pub fn filter( + managed_envelope: &mut TypedEnvelope<LogGroup>, + config: Arc<Config>, + project_info: Arc<ProjectInfo>, +) { + let logging_disabled = should_filter(&config, &project_info, Feature::OurLogsIngestion); + managed_envelope.retain_items(|_| { + if logging_disabled { + ItemAction::DropSilently + } else { + ItemAction::Keep + } + }); +} + +/// Processes logs.
+#[cfg(feature = "processing")] +pub fn process(managed_envelope: &mut TypedEnvelope<LogGroup>, project_info: Arc<ProjectInfo>) { + managed_envelope.retain_items(|item| { + let mut annotated_log = match item.ty() { + ItemType::OtelLog => match serde_json::from_slice::<OtelLog>(&item.payload()) { + Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), + Err(err) => { + relay_log::debug!("failed to parse OTel Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + ItemType::Log => match Annotated::<OurLog>::from_json_bytes(&item.payload()) { + Ok(our_log) => our_log, + Err(err) => { + relay_log::debug!("failed to parse Sentry Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + + _ => return ItemAction::Keep, + }; + + if let Err(e) = scrub(&mut annotated_log, &project_info.config) { + relay_log::error!("failed to scrub pii from log: {}", e); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + + let mut new_item = Item::new(ItemType::Log); + let payload = match annotated_log.to_json() { + Ok(payload) => payload, + Err(err) => { + relay_log::debug!("failed to serialize log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + }; + new_item.set_payload(ContentType::Json, payload); + + *item = new_item; + + ItemAction::Keep + }); +} + +#[cfg(feature = "processing")] +fn scrub( + annotated_log: &mut Annotated<OurLog>, + project_config: &ProjectConfig, +) -> Result<(), ProcessingError> { + if let Some(ref config) = project_config.pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + let pii_config = project_config + .datascrubbing_settings + .pii_config() + .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; + if let Some(config) = pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + + Ok(()) +}
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 68bdee36106..6f1ea14d680 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -290,6 +290,7 @@ impl StoreService { ItemType::Span => { self.produce_span(scoping, received_at, event_id, retention, item)? } + ItemType::Log => self.produce_log(scoping, received_at, retention, item)?, ItemType::ProfileChunk => self.produce_profile_chunk( scoping.organization_id, scoping.project_id, @@ -940,6 +941,52 @@ impl StoreService { scoping, timestamp: received_at, }); + Ok(()) + } + + fn produce_log( + &self, + scoping: Scoping, + received_at: DateTime<Utc>, + retention_days: u16, + item: &Item, + ) -> Result<(), StoreError> { + relay_log::trace!("Producing log"); + let payload = item.payload(); + let payload_len = payload.len(); + + let message = KafkaMessage::Log { + headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), + message: LogKafkaMessage { + payload, + organization_id: scoping.organization_id.value(), + project_id: scoping.project_id.value(), + retention_days, + received: safe_timestamp(received_at), + }, + }; + + self.produce(KafkaTopic::OurLogs, message)?; + + // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes.
+ self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogItem, + event_id: None, + outcome: Outcome::Accepted, + quantity: 1, + remote_addr: None, + scoping, + timestamp: received_at, + }); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogByte, + event_id: None, + outcome: Outcome::Accepted, + quantity: payload_len as u32, + remote_addr: None, + scoping, + timestamp: received_at, + }); Ok(()) } @@ -1306,6 +1353,17 @@ struct SpanKafkaMessage<'a> { platform: Cow<'a, str>, // We only use this for logging for now } +#[derive(Debug, Deserialize, Serialize)] +struct LogKafkaMessage { + /// Raw log payload. + payload: Bytes, + organization_id: u64, + project_id: u64, + /// Number of days until these data should be deleted. + retention_days: u16, + received: u64, +} + fn none_or_empty_object(value: &Option<&RawValue>) -> bool { match value { None => true, @@ -1347,6 +1405,12 @@ enum KafkaMessage<'a> { #[serde(flatten)] message: SpanKafkaMessage<'a>, }, + Log { + #[serde(skip)] + headers: BTreeMap, + #[serde(flatten)] + message: LogKafkaMessage, + }, ProfileChunk(ProfileChunkKafkaMessage), } @@ -1369,6 +1433,7 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(_) => "replay_event", KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked", KafkaMessage::CheckIn(_) => "check_in", + KafkaMessage::Log { .. } => "log", KafkaMessage::Span { .. } => "span", KafkaMessage::ProfileChunk(_) => "profile_chunk", } @@ -1392,6 +1457,7 @@ impl Message for KafkaMessage<'_> { // Random partitioning Self::Profile(_) | Self::Span { .. } + | Self::Log { .. } | Self::ReplayRecordingNotChunked(_) | Self::ProfileChunk(_) => Uuid::nil(), @@ -1420,6 +1486,12 @@ impl Message for KafkaMessage<'_> { } None } + KafkaMessage::Log { headers, .. } => { + if !headers.is_empty() { + return Some(headers); + } + None + } KafkaMessage::Span { headers, .. } => { if !headers.is_empty() { return Some(headers); diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index ad9c6234643..0ca927b3d87 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -414,6 +414,21 @@ impl ManagedEnvelope { ); } + if self.context.summary.log_item_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogItem, + self.context.summary.log_item_quantity, + ); + } + if self.context.summary.log_byte_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogByte, + self.context.summary.log_byte_quantity, + ); + } + // Track outcomes for attached secondary transactions, e.g. extracted from metrics. // // Primary transaction count is already tracked through the event category diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index c2d1ac98547..54347e7ccf5 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -128,6 +128,8 @@ fn infer_event_category(item: &Item) -> Option { ItemType::ReplayVideo => None, ItemType::ClientReport => None, ItemType::CheckIn => None, + ItemType::Log => None, + ItemType::OtelLog => None, ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, @@ -164,6 +166,12 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub monitor_quantity: usize, + /// The number of log for the log product sent. 
+ pub log_item_quantity: usize, + + /// The number of log bytes for the log product sent, in bytes + pub log_byte_quantity: usize, + /// Secondary number of transactions. /// /// This is 0 for envelopes which contain a transaction, @@ -221,6 +229,7 @@ impl EnvelopeSummary { } summary.payload_size += item.len(); + for (category, quantity) in item.quantities(CountFor::RateLimits) { summary.add_quantity(category, quantity); } @@ -239,6 +248,8 @@ impl EnvelopeSummary { DataCategory::ReplayVideo => &mut self.replay_quantity, DataCategory::Monitor => &mut self.monitor_quantity, DataCategory::Span => &mut self.span_quantity, + DataCategory::LogItem => &mut self.log_item_quantity, + DataCategory::LogByte => &mut self.log_byte_quantity, DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, // TODO: This catch-all return looks dangerous _ => return, @@ -344,6 +355,10 @@ pub struct Enforcement { pub replays: CategoryLimit, /// The combined check-in item rate limit. pub check_ins: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub log_items: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub log_bytes: CategoryLimit, /// The combined spans rate limit. pub spans: CategoryLimit, /// The rate limit for the indexed span category. @@ -385,6 +400,8 @@ impl Enforcement { profiles_indexed, replays, check_ins, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -400,6 +417,8 @@ impl Enforcement { profiles_indexed, replays, check_ins, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -488,6 +507,9 @@ impl Enforcement { ItemType::ReplayVideo => !self.replays.is_active(), ItemType::ReplayRecording => !self.replays.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), + ItemType::OtelLog | ItemType::Log => { + !(self.log_items.is_active() || self.log_bytes.is_active()) + } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { !self.spans_indexed.is_active() } @@ -700,6 +722,28 @@ where rate_limits.merge(session_limits); } + // Handle logs. + if summary.log_item_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogItem); + let log_limits = self.check.apply(item_scoping, summary.log_item_quantity)?; + enforcement.log_items = CategoryLimit::new( + DataCategory::LogItem, + summary.log_item_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + if summary.log_byte_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogByte); + let log_limits = self.check.apply(item_scoping, summary.log_byte_quantity)?; + enforcement.log_bytes = CategoryLimit::new( + DataCategory::LogByte, + summary.log_byte_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + // Handle profiles. 
if enforcement.is_event_active() { enforcement.profiles = enforcement @@ -1612,4 +1656,34 @@ mod tests { assert_eq!(summary.profile_quantity, 2); assert_eq!(summary.secondary_transaction_quantity, 7); } + + #[test] + fn test_enforce_limit_logs_count() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogItem); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogItem, 2); + mock.assert_call(DataCategory::LogByte, 20); + + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]); + } + + #[test] + fn test_enforce_limit_logs_bytes() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogByte); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogItem, 2); + mock.assert_call(DataCategory::LogByte, 20); + + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]); + } } diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 6c03420c2df..96ec13e3de3 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -15,6 +15,7 @@ use crate::utils::{ItemAction, ManagedEnvelope}; /// - `max_attachments_size` /// - `max_check_in_size` /// - `max_event_size` +/// - `max_log_size` /// - `max_metric_buckets_size` /// - `max_profile_size` /// - `max_replay_compressed_size` @@ -61,6 +62,8 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> Resul ItemType::UserReport => NO_LIMIT, ItemType::Statsd => config.max_statsd_size(), ItemType::MetricBuckets => config.max_metric_buckets_size(), + ItemType::Log => config.max_log_size(), + ItemType::OtelLog => config.max_log_size(), ItemType::Span | ItemType::OtelSpan => config.max_span_size(), ItemType::OtelTracesData => config.max_event_size(), // a spans container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), diff --git a/requirements-dev.txt b/requirements-dev.txt index a3f517ab67a..942dfd3ee4d 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,13 +10,13 @@ devservices==1.0.8 flake8==7.0.0 confluent-kafka==2.1.1 flask==3.0.3 -msgpack==1.0.7 +msgpack==1.1.0 opentelemetry-proto==1.22.0 pytest-localserver==0.8.1 pytest-sentry==0.3.0 pytest-xdist==3.5.0 pytest==7.4.3 -PyYAML==6.0.1 +PyYAML==6.0.2 redis==4.5.4 requests==2.32.2 sentry_sdk==2.10.0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 97cad120413..a89f638780e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,6 +34,7 @@ replay_events_consumer, monitors_consumer, spans_consumer, + ourlogs_consumer, profiles_consumer, feedback_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 98efbefee38..2a1b19b849f 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -60,6 +60,7 @@ def inner(options=None): "metrics_generic": metrics_topic, "replay_events": get_topic_name("replay_events"), "replay_recordings": get_topic_name("replay_recordings"), + "ourlogs": get_topic_name("ourlogs"), "monitors": get_topic_name("monitors"), "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), @@ -348,6 
+349,11 @@ def spans_consumer(consumer_fixture): yield from consumer_fixture(SpansConsumer, "spans") +@pytest.fixture +def ourlogs_consumer(consumer_fixture): + yield from consumer_fixture(OurLogsConsumer, "ourlogs") + + @pytest.fixture def profiles_consumer(consumer_fixture): yield from consumer_fixture(ProfileConsumer, "profiles") @@ -508,6 +514,25 @@ def get_spans(self, timeout=None, n=None): return spans +class OurLogsConsumer(ConsumerBase): + def get_ourlog(self): + message = self.poll() + assert message is not None + assert message.error() is None + + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + return json.loads(message_dict["payload"].decode("utf8")), message_dict + + def get_ourlogs(self): + ourlogs = [] + for message in self.poll_many(): + assert message is not None + assert message.error() is None + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + ourlogs.append(json.loads(message_dict["payload"].decode("utf8"))) + return ourlogs + + class ProfileConsumer(ConsumerBase): def get_profile(self): message = self.poll() diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py new file mode 100644 index 00000000000..6652f9bb486 --- /dev/null +++ b/tests/integration/test_ourlogs.py @@ -0,0 +1,118 @@ +import json +from datetime import datetime, timedelta, timezone + +from sentry_sdk.envelope import Envelope, Item, PayloadRef + + +TEST_CONFIG = { + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + } +} + + +def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: + envelope = Envelope() + envelope.add_item( + Item( + type="otel_log", + payload=PayloadRef( + bytes=json.dumps( + { + "timeUnixNano": str(int(start.timestamp() * 1e9)), + "observedTimeUnixNano": str(int(end.timestamp() * 1e9)), + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": {"stringValue": "Example log record"}, + "attributes": [ + { + "key": "string.attribute", + "value": {"stringValue": "some string"}, + }, + {"key": "boolean.attribute", "value": {"boolValue": True}}, + {"key": "int.attribute", "value": {"intValue": "10"}}, + { + "key": "double.attribute", + "value": {"doubleValue": 637.704}, + }, + ], + } + ).encode() + ), + ) + ) + return envelope + + +def test_ourlog_extraction( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + ] + + relay = relay_with_processing(options=TEST_CONFIG) + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + # Send OTel log and sentry log via envelope + envelope = envelope_with_ourlogs(start, end) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 1 + expected_0 = { + "timestamp_nanos": int(start.timestamp() * 1e9), + "observed_timestamp_nanos": int(end.timestamp() * 1e9), + "trace_id": "5b8efff798038103d269b633813fc60c", + "body": "Example log record", + "trace_flags": 0.0, + "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, + "attributes": { + "string.attribute": {"string_value": "some string"}, + "boolean.attribute": {"bool_value": True}, + "int.attribute": {"int_value": 10}, 
+ "double.attribute": {"double_value": 637.704}, + }, + } + assert ourlogs[0] == expected_0 + + ourlogs_consumer.assert_empty() + + +def test_ourlog_extraction_is_disabled_without_feature( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + relay = relay_with_processing(options=TEST_CONFIG) + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [] + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + envelope = envelope_with_ourlogs(start, end) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 0 + + ourlogs_consumer.assert_empty()