From a8bb9f45867ab2435640258ad07babc1d0b8f747 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 27 Jul 2023 16:52:23 -0400 Subject: [PATCH] feat: LogSchema metadata key refacoring (#18099) * feat: LogSchema metadata key refacoring * fix failing test --- lib/codecs/src/decoding/format/syslog.rs | 11 +++--- lib/vector-core/src/config/log_schema.rs | 45 +++++++++++++----------- lib/vector-core/src/config/mod.rs | 7 ++-- lib/vector-core/src/event/trace.rs | 6 ++-- src/sources/http_client/client.rs | 6 ++-- src/transforms/remap.rs | 2 +- 6 files changed, 40 insertions(+), 37 deletions(-) diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 83870e30cfaa0..a7ab83aebbbaa 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -483,9 +483,8 @@ fn insert_fields_from_syslog( #[cfg(test)] mod tests { - use vector_core::config::{init_log_schema, log_schema, LogSchema}; - use super::*; + use vector_core::config::{init_log_schema, log_schema, LogSchema}; #[test] fn deserialize_syslog_legacy_namespace() { @@ -522,8 +521,12 @@ mod tests { fn init() { let mut schema = LogSchema::default(); - schema.set_message_key(Some(owned_value_path!("legacy_message"))); - schema.set_message_key(Some(owned_value_path!("legacy_timestamp"))); + schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!( + "legacy_message" + )))); + schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!( + "legacy_timestamp" + )))); init_log_schema(schema, false); } } diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 19ad9922cceca..a21c87e4076db 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -1,8 +1,7 @@ -use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath}; +use lookup::lookup_v2::OptionalTargetPath; use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; use vector_config::configurable_component; -use vrl::path::PathPrefix; static LOG_SCHEMA: OnceCell = OnceCell::new(); static LOG_SCHEMA_DEFAULT: Lazy = Lazy::new(LogSchema::default); @@ -74,7 +73,7 @@ pub struct LogSchema { /// Generally, this field will be set by Vector to hold event-specific metadata, such as /// annotations by the `remap` transform when an error or abort is encountered. #[serde(default = "LogSchema::default_metadata_key")] - metadata_key: OptionalValuePath, + metadata_key: OptionalTargetPath, } impl Default for LogSchema { @@ -106,8 +105,8 @@ impl LogSchema { OptionalTargetPath::event(SOURCE_TYPE) } - fn default_metadata_key() -> OptionalValuePath { - OptionalValuePath::new(METADATA) + fn default_metadata_key() -> OptionalTargetPath { + OptionalTargetPath::event(METADATA) } pub fn message_key(&self) -> Option<&OwnedValuePath> { @@ -140,7 +139,7 @@ impl LogSchema { } pub fn metadata_key(&self) -> Option<&OwnedValuePath> { - self.metadata_key.path.as_ref() + self.metadata_key.as_ref().map(|key| &key.path) } pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> { @@ -159,24 +158,28 @@ impl LogSchema { self.source_type_key.as_ref() } - pub fn set_message_key(&mut self, path: Option) { - self.message_key = OptionalTargetPath::from(PathPrefix::Event, path); + pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> { + self.metadata_key.as_ref() } - pub fn set_timestamp_key(&mut self, path: Option) { - self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path); + pub fn set_message_key(&mut self, path: Option) { + self.message_key = OptionalTargetPath { path }; } - pub fn set_host_key(&mut self, path: Option) { - self.host_key = OptionalTargetPath::from(PathPrefix::Event, path); + pub fn set_timestamp_key(&mut self, path: Option) { + self.timestamp_key = OptionalTargetPath { path }; } - pub fn set_source_type_key(&mut self, path: Option) { - self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path); + pub fn set_host_key(&mut self, path: Option) { + self.host_key = OptionalTargetPath { path }; } - pub fn set_metadata_key(&mut self, path: Option) { - self.metadata_key = OptionalValuePath { path }; + pub fn set_source_type_key(&mut self, path: Option) { + self.source_type_key = OptionalTargetPath { path }; + } + + pub fn set_metadata_key(&mut self, path: Option) { + self.metadata_key = OptionalTargetPath { path }; } /// Merge two `LogSchema` instances together. @@ -195,35 +198,35 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.host_key' found".to_owned()); } else { - self.set_host_key(other.host_key().cloned()); + self.set_host_key(other.host_key_target_path().cloned()); } if self.message_key() != LOG_SCHEMA_DEFAULT.message_key() && self.message_key() != other.message_key() { errors.push("conflicting values for 'log_schema.message_key' found".to_owned()); } else { - self.set_message_key(other.message_key().cloned()); + self.set_message_key(other.message_key_target_path().cloned()); } if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key() && self.timestamp_key() != other.timestamp_key() { errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned()); } else { - self.set_timestamp_key(other.timestamp_key().cloned()); + self.set_timestamp_key(other.timestamp_key_target_path().cloned()); } if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key() && self.source_type_key() != other.source_type_key() { errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned()); } else { - self.set_source_type_key(other.source_type_key().cloned()); + self.set_source_type_key(other.source_type_key_target_path().cloned()); } if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key() && self.metadata_key() != other.metadata_key() { errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned()); } else { - self.set_metadata_key(other.metadata_key().cloned()); + self.set_metadata_key(other.metadata_key_target_path().cloned()); } } diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index ed3dde2bae112..c8a1fa48d149b 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -551,15 +551,14 @@ mod test { use chrono::Utc; use lookup::{event_path, owned_value_path, OwnedTargetPath}; use vector_common::btreemap; - use vrl::path::OwnedValuePath; use vrl::value::Kind; #[test] fn test_insert_standard_vector_source_metadata() { - let nested_path = "a.b.c.d".to_string(); - let mut schema = LogSchema::default(); - schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap())); + schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!( + "a", "b", "c", "d" + )))); init_log_schema(schema, false); let namespace = LogNamespace::Legacy; diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 48744bc73f0af..d5fc7856c9066 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -7,7 +7,6 @@ use vector_common::{ internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; -use vrl::path::{PathPrefix, ValuePath}; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, @@ -95,12 +94,11 @@ impl TraceEvent { pub fn maybe_insert<'a, F: FnOnce() -> Value>( &mut self, - prefix: PathPrefix, - path: Option>, + path: Option>, value_callback: F, ) -> Option { if let Some(path) = path { - return self.0.insert((prefix, path), value_callback()); + return self.0.insert(path, value_callback()); } None } diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index a665a1c09c3e9..4fa3773faeaba 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -350,9 +350,9 @@ impl http_client::HttpClientContext for HttpClientContext { } } Event::Trace(ref mut trace) => { - if let Some(source_type_key) = log_schema().source_type_key_target_path() { - trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME)); - } + trace.maybe_insert(log_schema().source_type_key_target_path(), || { + Bytes::from(HttpClientConfig::NAME).into() + }); } } } diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index d7060c40562cf..50b01d4d6783b 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -487,7 +487,7 @@ where } } Event::Trace(ref mut trace) => { - trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || { + trace.maybe_insert(log_schema().metadata_key_target_path(), || { self.dropped_data(reason, error).into() }); }