diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 211442af80960..736371bfb2c37 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -94,7 +94,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] } +vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 3ff5152a293a7..71786155d1d8f 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, fmt, num::NonZeroUsize}; use bitmask_enum::bitmask; @@ -111,7 +112,7 @@ pub struct SourceOutput { // NOTE: schema definitions are only implemented/supported for log-type events. There is no // inherent blocker to support other types as well, but it'll require additional work to add // the relevant schemas, and store them separately in this type. - pub schema_definition: Option<schema::Definition>, + pub schema_definition: Option<Arc<schema::Definition>>, } impl SourceOutput { @@ -129,7 +130,7 @@ impl SourceOutput { Self { port: None, ty, - schema_definition: Some(schema_definition), + schema_definition: Some(Arc::new(schema_definition)), } } @@ -168,17 +169,15 @@ impl SourceOutput { /// Schema enabled is set in the user's configuration. #[must_use] pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> { + use std::ops::Deref; + self.schema_definition.as_ref().map(|definition| { if schema_enabled { - definition.clone() + definition.deref().clone() } else { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); new_definition } }) @@ -203,7 +202,7 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. - log_schema_definitions: HashMap<OutputId, schema::Definition>, + pub log_schema_definitions: HashMap<OutputId, schema::Definition>, } impl TransformOutput { @@ -245,11 +244,7 @@ impl TransformOutput { .map(|(output, definition)| { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); (output.clone(), new_definition) }) .collect() @@ -606,7 +601,10 @@ mod test { // There should be the default legacy definition without schemas enabled.
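// With schemas disabled, `schema_definition(false)` now keeps semantic meanings for every log namespace (the `LogNamespace::Vector` guard above was removed), so the expected definition below carries the "zork" meaning.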
assert_eq!( - Some(schema::Definition::default_legacy_namespace()), + Some( + schema::Definition::default_legacy_namespace() + .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork") + ), output.schema_definition(false) ); } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index f13bee6a5e009..d86884be7582c 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -7,7 +7,10 @@ use vector_common::{config::ComponentKey, EventDataEq}; use vrl::value::{Kind, Secrets, Value}; use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; -use crate::{config::LogNamespace, schema, ByteSizeOf}; +use crate::{ + config::{LogNamespace, OutputId}, + schema, ByteSizeOf, +}; const DATADOG_API_KEY: &str = "datadog_api_key"; const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token"; @@ -30,8 +33,15 @@ pub struct EventMetadata { /// The id of the source source_id: Option<Arc<ComponentKey>>, + /// The id of the component this event originated from. This is used to + /// determine which schema definition to attach to an event in transforms. + /// This should always have a value set for events in transforms. It will always be `None` + /// in a source, and there is currently no use-case for reading the value in a sink. + upstream_id: Option<Arc<OutputId>>, + /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). + /// This definition is currently only valid for logs, and shouldn't be used for other event types. /// /// TODO(Jean): must not skip serialization to track schemas across restarts. #[serde(default = "default_schema_definition", skip)] @@ -71,17 +81,29 @@ impl EventMetadata { &mut self.secrets } - /// Returns a reference to the metadata source. + /// Returns a reference to the metadata source id. #[must_use] pub fn source_id(&self) -> Option<&Arc<ComponentKey>> { self.source_id.as_ref() } + /// Returns a reference to the metadata parent id. This is the `OutputId` + /// of the previous component the event was sent through (if any). + #[must_use] + pub fn upstream_id(&self) -> Option<&OutputId> { + self.upstream_id.as_deref() + } + /// Sets the `source_id` in the metadata to the provided value. pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) { self.source_id = Some(source_id); } + /// Sets the `upstream_id` in the metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) { + self.upstream_id = Some(upstream_id); + } + /// Return the datadog API key, if it exists pub fn datadog_api_key(&self) -> Option<Arc<str>> { self.secrets.get(DATADOG_API_KEY).cloned() @@ -111,6 +133,7 @@ impl Default for EventMetadata { finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, + upstream_id: None, } } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index ae2e51e8a23a8..9547f58dc5ed3 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use crate::ByteSizeOf; +use crate::{config::OutputId, ByteSizeOf}; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; pub use finalization::{ @@ -309,12 +309,24 @@ impl Event { self.metadata_mut().set_source_id(source_id); } + /// Sets the `upstream_id` in the event metadata to the provided value.
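+ /// (Convenience wrapper that delegates to `EventMetadata::set_upstream_id`.)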
+ pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) { + self.metadata_mut().set_upstream_id(upstream_id); + } + /// Sets the `source_id` in the event metadata to the provided value. #[must_use] pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self { self.metadata_mut().set_source_id(source_id); self } + + /// Sets the `upstream_id` in the event metadata to the provided value. + #[must_use] + pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self { + self.metadata_mut().set_upstream_id(upstream_id); + self + } } impl EventDataEq for Event { diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index a60cd85c8200a..af81c51aa69a1 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, error, pin::Pin}; use futures::{Stream, StreamExt}; @@ -7,13 +8,16 @@ use vector_common::internal_event::{ use vector_common::json_size::JsonSize; use vector_common::EventDataEq; +use crate::config::{ComponentKey, OutputId}; +use crate::event::EventMutRef; +use crate::schema::Definition; use crate::{ config, event::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, - ByteSizeOf, + schema, ByteSizeOf, }; #[cfg(any(feature = "lua"))] @@ -178,6 +182,8 @@ impl SyncTransform for Box<dyn SyncTransform + Send + Sync> { struct TransformOutput { fanout: Fanout, events_sent: Registered<EventsSent>, + log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>, + output_id: Arc<OutputId>, } pub struct TransformOutputs { @@ -189,6 +195,7 @@ impl TransformOutputs { pub fn new( outputs_in: Vec<config::TransformOutput>, + component_key: &ComponentKey, ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; @@ -197,6 +204,13 @@ impl TransformOutputs { for output in outputs_in { let (fanout, control) = Fanout::new(); + + let log_schema_definitions = output + .log_schema_definitions + .into_iter() + .map(|(id, definition)| (id, Arc::new(definition))) + .collect(); + match output.port { None => { primary_output = Some(TransformOutput { @@ -204,6 +218,11 @@ impl TransformOutputs { events_sent: register(EventsSent::from(internal_event::Output(Some( DEFAULT_OUTPUT.into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: None, + }), }); controls.insert(None, control); } @@ -215,6 +234,11 @@ events_sent: register(EventsSent::from(internal_event::Output(Some( name.clone().into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: Some(name.clone()), + }), }, ); controls.insert(Some(name.clone()), control); @@ -246,31 +270,61 @@ impl TransformOutputs { buf: &mut TransformOutputsBuf, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { if let Some(primary) = self.primary_output.as_mut() { - let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); - let byte_size = buf.primary_buffer.as_ref().map_or( - JsonSize::new(0), - EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, - ); - buf.primary_buffer - .as_mut() - .expect("mismatched outputs") - .send(&mut primary.fanout) - .await?; - primary.events_sent.emit(CountByteSize(count, byte_size)); + let buf = buf.primary_buffer.as_mut().expect("mismatched outputs"); + Self::send_single_buffer(buf, primary).await?; } - for (key, buf) in &mut buf.named_buffers { - let count = buf.len(); - let byte_size = buf.estimated_json_encoded_size_of(); let output =
self.named_outputs.get_mut(key).expect("unknown output"); - buf.send(&mut output.fanout).await?; - output.events_sent.emit(CountByteSize(count, byte_size)); + Self::send_single_buffer(buf, output).await?; } + Ok(()) + } + async fn send_single_buffer( + buf: &mut OutputBuffer, + output: &mut TransformOutput, + ) -> Result<(), Box<dyn error::Error + Send + Sync>> { + for event in buf.events_mut() { + update_runtime_schema_definition( + event, + &output.output_id, + &output.log_schema_definitions, + ); + } + let count = buf.len(); + let byte_size = buf.estimated_json_encoded_size_of(); + buf.send(&mut output.fanout).await?; + output.events_sent.emit(CountByteSize(count, byte_size)); Ok(()) } } +#[allow(clippy::implicit_hasher)] +/// `event`: The event that will be updated +/// `output_id`: The `output_id` that the current event is being sent to (will be used as the new `upstream_id`) +/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions, used to look up the new runtime definition of the event +pub fn update_runtime_schema_definition( + mut event: EventMutRef, + output_id: &Arc<OutputId>, + log_schema_definitions: &HashMap<OutputId, Arc<schema::Definition>>, +) { + if let EventMutRef::Log(log) = &mut event { + if let Some(parent_component_id) = log.metadata().upstream_id() { + if let Some(definition) = log_schema_definitions.get(parent_component_id) { + log.metadata_mut().set_schema_definition(definition); + } + } else { + // there is no parent defined. That means this event originated from a component that + // isn't able to track the source, such as `reduce` or `lua`. In these cases, all of the + // schema definitions _must_ be the same, so the first one is picked + if let Some(definition) = log_schema_definitions.values().next() { + log.metadata_mut().set_schema_definition(definition); + } + } + } + event.metadata_mut().set_upstream_id(Arc::clone(output_id)); +} #[derive(Debug, Clone)] pub struct TransformOutputsBuf { primary_buffer: Option<OutputBuffer>, @@ -299,34 +353,17 @@ impl TransformOutputsBuf { } } - pub fn push(&mut self, event: Event) { - self.primary_buffer - .as_mut() - .expect("no default output") - .push(event); - } - - pub fn push_named(&mut self, name: &str, event: Event) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .push(event); - } - - pub fn append(&mut self, slice: &mut Vec<Event>) { - self.primary_buffer - .as_mut() - .expect("no default output") - .append(slice); - } - - pub fn append_named(&mut self, name: &str, slice: &mut Vec<Event>) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .append(slice); + /// Adds a new event to the transform output buffer + pub fn push(&mut self, name: Option<&str>, event: Event) { + match name { + Some(name) => self.named_buffers.get_mut(name), + None => self.primary_buffer.as_mut(), + } + .expect("unknown output") + .push(event); } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ { self.primary_buffer .as_mut() @@ -334,6 +371,7 @@ impl TransformOutputsBuf { .drain() } + #[cfg(any(feature = "test", test))] pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ { self.named_buffers .get_mut(name) @@ -341,33 +379,15 @@ impl TransformOutputsBuf { .drain() } - pub fn extend(&mut self, events: impl Iterator<Item = Event>) { - self.primary_buffer - .as_mut() - .expect("no default output") - .extend(events); - } - + #[cfg(any(feature = "test", test))] pub fn take_primary(&mut self) -> OutputBuffer { std::mem::take(self.primary_buffer.as_mut().expect("no default output")) } + #[cfg(any(feature = "test", test))] pub fn take_all_named(&mut
self) -> HashMap<String, OutputBuffer> { std::mem::take(&mut self.named_buffers) } - - pub fn len(&self) -> usize { - self.primary_buffer.as_ref().map_or(0, OutputBuffer::len) - + self - .named_buffers - .values() - .map(OutputBuffer::len) - .sum::<usize>() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } } impl ByteSizeOf for TransformOutputsBuf { @@ -439,6 +459,7 @@ impl OutputBuffer { }) } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ { self.0.drain(..).flat_map(EventArray::into_events) } @@ -458,12 +479,12 @@ impl OutputBuffer { self.0.iter().flat_map(EventArray::iter_events) } - pub fn into_events(self) -> impl Iterator<Item = Event> { - self.0.into_iter().flat_map(EventArray::into_events) + fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef> { + self.0.iter_mut().flat_map(EventArray::iter_events_mut) } - pub fn take_events(&mut self) -> Vec<EventArray> { - std::mem::take(&mut self.0) + pub fn into_events(self) -> impl Iterator<Item = Event> { + self.0.into_iter().flat_map(EventArray::into_events) } } diff --git a/src/config/transform.rs b/src/config/transform.rs index c2be848d53361..1b9f442ef0786 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -195,6 +195,9 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + &self, enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], + + // This only exists for transforms that create logs from non-logs, to know which namespace + // to use, such as `metric_to_log` global_log_namespace: LogNamespace, ) -> Vec<TransformOutput>; diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index a4f4eaae3b751..fea4a3980b64d 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -1,4 +1,5 @@ #![allow(missing_docs)] +use std::sync::Arc; use std::{collections::HashMap, fmt}; use chrono::Utc; @@ -19,6 +20,8 @@ use vrl::value::Value; mod errors; +use crate::config::{ComponentKey, OutputId}; +use crate::schema::Definition; pub use errors::{ClosedError, StreamSendError}; use lookup::PathPrefix; @@ -48,17 +51,37 @@ impl Builder { } } - pub fn add_source_output(&mut self, output: SourceOutput) -> LimitedReceiver<EventArray> { + pub fn add_source_output( + &mut self, + output: SourceOutput, + component_key: ComponentKey, + ) -> LimitedReceiver<EventArray> { let lag_time = self.lag_time.clone(); + let log_definition = output.schema_definition.clone(); + let output_id = OutputId { + component: component_key, + port: output.port.clone(), + }; match output.port { None => { - let (inner, rx) = - Inner::new_with_buffer(self.buf_size, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + DEFAULT_OUTPUT.to_owned(), + lag_time, + log_definition, + output_id, + ); self.inner = Some(inner); rx } Some(name) => { - let (inner, rx) = Inner::new_with_buffer(self.buf_size, name.clone(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + name.clone(), + lag_time, + log_definition, + output_id, + ); self.named_inners.insert(name, inner); rx } @@ -91,9 +114,15 @@ impl SourceSender { } } - pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { + #[cfg(test)] + pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); + let output_id = OutputId { + component: "test".to_string().into(), + port: None, + }; + let (inner, rx) = + Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None,
output_id); ( Self { inner: Some(inner), @@ -105,14 +134,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -131,7 +160,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -161,7 +190,11 @@ impl SourceSender { ) -> impl Stream<Item = EventArray> + Unpin { // The lag_time parameter here will need to be filled in if this function is ever used for // non-test situations. - let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None); + let output_id = OutputId { + component: "test".to_string().into(), + port: Some(name.clone()), + }; + let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, None, output_id); let recv = recv.into_stream().map(move |mut events| { events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -225,6 +258,11 @@ struct Inner { output: String, lag_time: Option<Histogram>, events_sent: Registered<EventsSent>, + /// The schema definition that will be attached to Log events sent through here + log_definition: Option<Arc<Definition>>, + /// The OutputId related to this source sender. This is set as the `upstream_id` in + /// `EventMetadata` for all events sent through here.
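+ /// It is wrapped in an `Arc` so it can be cheaply cloned onto every event (see `Arc::clone` in `send`).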
+ output_id: Arc<OutputId>, } impl fmt::Debug for Inner { @@ -242,6 +280,8 @@ impl Inner { n: usize, output: String, lag_time: Option<Histogram>, + log_definition: Option<Arc<Definition>>, + output_id: OutputId, ) -> (Self, LimitedReceiver<EventArray>) { let (tx, rx) = channel::limited(n); ( @@ -252,16 +292,29 @@ impl Inner { events_sent: register!(EventsSent::from(internal_event::Output(Some( output.into() )))), + log_definition, + output_id: Arc::new(output_id), }, rx, ) } - async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); events .iter_events() .for_each(|event| self.emit_lag_time(event, reference)); + + events.iter_events_mut().for_each(|mut event| { + // attach runtime schema definitions from the source + if let Some(log_definition) = &self.log_definition { + event.metadata_mut().set_schema_definition(log_definition); + } + event + .metadata_mut() + .set_upstream_id(Arc::clone(&self.output_id)); + }); + let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); self.inner.send(events).await.map_err(|_| ClosedError)?; @@ -290,23 +343,10 @@ impl Inner { E: Into<Event> + ByteSizeOf, I: IntoIterator<Item = E>, { - let reference = Utc::now().timestamp_millis(); let events = events.into_iter().map(Into::into); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { - events - .iter_events() - .for_each(|event| self.emit_lag_time(event, reference)); - let cbs = CountByteSize(events.len(), events.estimated_json_encoded_size_of()); - match self.inner.send(events).await { - Ok(()) => { - self.events_sent.emit(cbs); - } - Err(error) => { - return Err(error.into()); - } - } + self.send(events).await?; } - Ok(()) } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index ca904de314a62..8ba6511c53557 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1121,7 +1121,7 @@ mod integration_test { delay: Duration, status: EventStatus, ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) { - let (pipe, recv) = SourceSender::new_with_buffer(100); + let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100); let recv = BufferReceiver::new(recv.into()).into_stream(); let recv = recv.then(move |mut events| async move { events.iter_logs_mut().for_each(|log| { diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 798759fa1138d..fc538efad199d 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -10,10 +10,12 @@ use opentelemetry_proto::proto::{ }; use similar_asserts::assert_eq; use std::collections::BTreeMap; +use std::sync::Arc; use tonic::Request; use vector_core::config::LogNamespace; use vrl::value; +use crate::config::OutputId; use crate::{ config::{SourceConfig, SourceContext}, event::{into_event_stream, Event, EventStatus, LogEvent, Value}, @@ -269,7 +271,11 @@ async fn receive_grpc_logs_legacy_namespace() { ("observed_timestamp", Utc.timestamp_nanos(2).into()), ("source_type", "opentelemetry".into()), ]); - let expect_event = Event::from(LogEvent::from(expect_vec)); + let mut expect_event = Event::from(LogEvent::from(expect_vec)); + expect_event.set_upstream_id(Arc::new(OutputId { + component: "test".into(), + port: Some("logs".into()), + })); assert_eq!(actual_event, expect_event); }) .await; diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 93366629f3420..58ef30c3fcf99 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -727,7 +727,7 @@ mod test { //
shutdown. let addr = next_addr(); - let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000); + let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 467dae41fec30..a1ae446c56d36 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -453,7 +453,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_with_buffer(4096); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) .await .unwrap(); @@ -547,7 +547,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_with_buffer(4096); + let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) .await .unwrap(); diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 62b0d96d76f10..3fb594b677e7a 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc<AtomicUsize>) -> BackpressureSourceConfig { } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc<AtomicUsize>) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -75,7 +75,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig { } pub fn basic_sink(channel_size: usize) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -84,7 +84,7 @@ pub fn basic_sink_with_data( channel_size: usize, data: &str, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -92,7 +92,7 @@ pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) =
SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index b6385704b2b70..483d4b51aaede 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -20,6 +20,7 @@ use vector_common::internal_event::{ self, CountByteSize, EventsSent, InternalEventHandle as _, Registered, }; use vector_core::config::LogNamespace; +use vector_core::transform::update_runtime_schema_definition; use vector_core::{ buffers::{ topology::{ @@ -242,7 +243,7 @@ impl<'a> Builder<'a> { let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); + let mut rx = builder.add_source_output(output.clone(), key.clone()); let (mut fanout, control) = Fanout::new(); let source = Arc::new(key.clone()); @@ -732,6 +733,7 @@ fn build_transform( node.input_details.data_type(), node.typetag, &node.key, + &node.outputs, ), } } @@ -741,7 +743,7 @@ fn build_sync_transform( node: TransformNode, input_rx: BufferReceiver<EventArray>, ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) { - let (outputs, controls) = TransformOutputs::new(node.outputs); + let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key); let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { @@ -923,6 +925,7 @@ fn build_task_transform( input_type: DataType, typetag: &str, key: &ComponentKey, + outputs: &[TransformOutput], ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) { let (mut fanout, control) = Fanout::new(); @@ -938,8 +941,30 @@ fn build_task_transform( )) }); let events_sent = register!(EventsSent::from(internal_event::Output(None))); + let output_id = Arc::new(OutputId { + component: key.clone(), + port: None, + }); + + // Task transforms can only write to the default output, so only a single schema def map is needed + let schema_definition_map = outputs + .iter() + .find(|x| x.port.is_none()) + .expect("output for default port required for task transforms") + .log_schema_definitions + .clone() + .into_iter() + .map(|(key, value)| (key, Arc::new(value))) + .collect(); + let stream = t .transform(Box::pin(filtered)) + .map(move |mut events| { + for event in events.iter_events_mut() { + update_runtime_schema_definition(event, &output_id, &schema_definition_map); + } + events + }) .inspect(move |events: &EventArray| { events_sent.emit(CountByteSize( events.len(), diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs index a716d29593998..8f4602aa1bba3 100644 --- a/src/topology/test/compliance.rs +++ b/src/topology/test/compliance.rs @@ -2,8 +2,10 @@ use std::sync::Arc; use tokio::sync::oneshot::{channel, Receiver}; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; use vector_core::event::{Event, EventArray, EventContainer, LogEvent}; +use crate::config::schema::Definition; use crate::{ config::{unit_test::UnitTestSourceConfig, ConfigBuilder}, test_util::{ @@ -57,6 +59,10 @@ async fn test_function_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -78,6 +84,10 @@
async fn test_sync_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -98,6 +108,10 @@ async fn test_task_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index aa5720382e96c..b8b9c3a0fd5d0 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use crate::schema::Definition; use crate::{ config::{Config, ConfigDiff, SinkOuter}, event::{into_event_stream, Event, EventArray, EventContainer, LogEvent}, @@ -27,6 +28,7 @@ use tokio::{ }; use vector_buffers::{BufferConfig, BufferType, WhenFull}; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; mod backpressure; mod compliance; @@ -149,6 +151,10 @@ async fn topology_source_and_sink() { let res = out1.flat_map(into_event_stream).collect::<Vec<_>>().await; event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res); } @@ -184,6 +190,16 @@ async fn topology_multiple_sources() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); event2.set_source_id(Arc::new(ComponentKey::from("in2"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out_event1, Some(event1.into())); assert_eq!(out_event2, Some(event2.into())); } @@ -218,6 +234,12 @@ async fn topology_multiple_sinks() { // We should see that both sinks got the exact same event: event.set_source_id(Arc::new(ComponentKey::from("in1"))); + + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let expected = vec![event]; assert_eq!(expected, res1); assert_eq!(expected, res2); @@ -293,6 +315,11 @@ async fn topology_remove_one_source() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let res = h_out1.await.unwrap(); assert_eq!(vec![event1], res); } @@ -332,6 +359,11 @@ async fn topology_remove_one_sink() { event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event], res1); assert_eq!(Vec::<Event>::new(), res2); } @@ -442,6 +474,11 @@ async fn topology_swap_source() { assert_eq!(Vec::<Event>::new(), res1);
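// Note: `new_test_sender_with_buffer` hardcodes the component key "test", so events routed through a test source are expected to carry `OutputId::from("test")` as their `upstream_id` below.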
event2.set_source_id(Arc::new(ComponentKey::from("in2"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event2], res2); } @@ -554,6 +591,10 @@ async fn topology_swap_sink() { assert_eq!(Vec::<Event>::new(), res1); event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event1], res2); } @@ -663,6 +704,15 @@ async fn topology_rebuild_connected() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); event2.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event1, event2], res); } @@ -715,6 +765,10 @@ async fn topology_rebuild_connected_transform() { assert_eq!(Vec::<Event>::new(), res1); event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res2); } diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a591305764df1..ca5a7ae8679cb 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -156,8 +156,10 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vrl::value::Kind; use super::*; + use crate::schema::Definition; use crate::{ event::{metric, Event, Metric}, test_util::components::assert_transform_compliance, @@ -174,8 +176,13 @@ mod tests { kind: metric::MetricKind, value: metric::MetricValue, ) -> Event { - Event::Metric(Metric::new(name, kind, value)) + let mut event = Event::Metric(Metric::new(name, kind, value)) .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event } #[test] diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 4a6497628d78a..513a91ce9115e 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -289,7 +289,9 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vector_core::config::OutputId; + use crate::config::schema::Definition; use crate::{ event::{Event, LogEvent, Value}, test_util::components::assert_transform_compliance, @@ -363,6 +365,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event differs in matched field so should be output even though it @@ -371,6 +378,11 @@ let new_event = out.recv().await.unwrap();
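+ // As above: the dedupe transform stamps its own `OutputId` ("transform") as the `upstream_id` and attaches the input's schema definition to each event it emits.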
event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); // Third event has the same value for "matched" as first event, so it should be dropped. @@ -413,6 +425,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event has a different matched field name with the same value, @@ -421,6 +438,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -466,6 +488,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event is the same just with different field order, so it @@ -511,6 +538,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event gets output because it's not a dupe. 
This causes the first @@ -519,6 +552,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(new_event, event2); // Third event is a dupe but gets output anyway because the first @@ -568,6 +607,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -576,6 +620,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -621,6 +670,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -629,6 +683,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -667,6 +726,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through as null is different than @@ -675,6 +739,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 95e8877bee255..e14f0c7347ab7 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -104,6 +104,7 @@ mod test { use vector_core::event::{Metric, MetricKind, MetricValue}; use super::*; + use crate::config::schema::Definition; use crate::{ conditions::ConditionConfig, event::{Event, LogEvent}, @@ -129,6 +130,10 @@ mod test { 
tx.send(log.clone()).await.unwrap(); log.set_source_id(Arc::new(ComponentKey::from("in"))); + log.set_upstream_id(Arc::new(OutputId::from("transform"))); + log.metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out.recv().await.unwrap(), log); let metric = Event::from(Metric::new( diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index ad44b0a9e6d55..cb99cb186de8b 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, num::ParseFloatError}; use chrono::Utc; @@ -5,6 +6,7 @@ use indexmap::IndexMap; use vector_config::configurable_component; use vector_core::config::LogNamespace; +use crate::config::schema::Definition; use crate::{ config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, @@ -256,7 +258,10 @@ fn to_metric(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs ) -> Vec<TransformOutput> { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = - Definition::default_for_namespace(&BTreeSet::from([log_namespace])) - .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("namespace"), - Kind::bytes().or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("tags"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), - None, - ) - .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("counter"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("gauge"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("set"), - Kind::object(Collection::empty().with_known( - "values", - Kind::array(Collection::empty().with_unknown(Kind::bytes())), - )) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("distribution"), - Kind::object( - Collection::empty() - .with_known( - "samples", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("value", Kind::float()) - .with_known("rate", Kind::integer()), - )), - ), - ) - .with_known("statistic", Kind::bytes()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_histogram"), - Kind::object( - Collection::empty() - .with_known( - "buckets", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("upper_limit", Kind::float()) - .with_known("count", Kind::integer()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_summary"), - Kind::object( - Collection::empty() - .with_known( - "quantiles", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("quantile", Kind::float()) - .with_known("value", Kind::float()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("sketch"), - Kind::any().or_undefined(), - None, - ); - - match log_namespace { - LogNamespace::Vector => { - // from serializing the Metric (Legacy moves it to another field) - schema_definition =
schema_definition.with_event_field( - &owned_value_path!("timestamp"), - Kind::bytes().or_undefined(), - None, - ); - - // This is added as a "marker" field to determine which namespace is being used at runtime. - // This is normally handled automatically by sources, but this is a special case. - schema_definition = schema_definition.with_metadata_field( - &owned_value_path!("vector"), - Kind::object(Collection::empty()), - None, - ); - } - LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - schema_definition = - schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); - } - - schema_definition = schema_definition.with_event_field( - &parse_value_path(log_schema().host_key()).expect("valid host key"), - Kind::bytes().or_undefined(), - None, - ); - } - } + let schema_definition = schema_definition(log_namespace); vec![TransformOutput::new( DataType::Log, @@ -249,6 +120,137 @@ impl TransformConfig for MetricToLogConfig { } } +fn schema_definition(log_namespace: LogNamespace) -> Definition { + let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace])) + .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("namespace"), + Kind::bytes().or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("tags"), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + None, + ) + .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("counter"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("gauge"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("set"), + Kind::object(Collection::empty().with_known( + "values", + Kind::array(Collection::empty().with_unknown(Kind::bytes())), + )) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("distribution"), + Kind::object( + Collection::empty() + .with_known( + "samples", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("value", Kind::float()) + .with_known("rate", Kind::integer()), + )), + ), + ) + .with_known("statistic", Kind::bytes()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_histogram"), + Kind::object( + Collection::empty() + .with_known( + "buckets", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("upper_limit", Kind::float()) + .with_known("count", Kind::integer()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_summary"), + Kind::object( + Collection::empty() + .with_known( + "quantiles", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("quantile", Kind::float()) + .with_known("value", Kind::float()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("sketch"), + Kind::any().or_undefined(), + None, + ); + + match log_namespace { + LogNamespace::Vector => { + // from serializing the Metric (Legacy moves it to another field) + schema_definition = 
schema_definition.with_event_field( + &owned_value_path!("timestamp"), + Kind::bytes().or_undefined(), + None, + ); + + // This is added as a "marker" field to determine which namespace is being used at runtime. + // This is normally handled automatically by sources, but this is a special case. + schema_definition = schema_definition.with_metadata_field( + &owned_value_path!("vector"), + Kind::object(Collection::empty()), + None, + ); + } + LogNamespace::Legacy => { + if let Some(timestamp_key) = log_schema().timestamp_key() { + schema_definition = + schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); + } + + schema_definition = schema_definition.with_event_field( + &parse_value_path(log_schema().host_key()).expect("valid host key"), + Kind::bytes().or_undefined(), + None, + ); + } + } + schema_definition +} + #[derive(Clone, Debug)] pub struct MetricToLog { host_tag: String, @@ -412,6 +414,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = counter.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(counter).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -440,6 +444,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(gauge).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -468,6 +474,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(set).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -498,6 +506,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(distro).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -547,6 +557,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(histo).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -594,6 +606,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(summary).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 
455a4b142e4d6..90c9294b0cb63 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -26,6 +26,7 @@ use crate::{ mod merge_strategy; +use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; @@ -133,94 +134,101 @@ impl TransformConfig for ReduceConfig { input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec<TransformOutput> { - let mut output_definitions = HashMap::new(); - - for (output, input) in input_definitions { - let mut schema_definition = input.clone(); - - for (key, merge_strategy) in self.merge_strategies.iter() { - let key = if let Ok(key) = parse_target_path(key) { - key - } else { - continue; - }; - - let input_kind = match key.prefix { - PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), - PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), - }; - - let new_kind = match merge_strategy { - MergeStrategy::Discard | MergeStrategy::Retain => { - /* does not change the type */ - input_kind.clone() + // Events may be combined, so there isn't a true single "source" for events. + // All of the definitions must be merged. + let merged_definition: Definition = input_definitions + .iter() + .map(|(_output, definition)| definition.clone()) + .reduce(Definition::merge) + .unwrap_or_else(Definition::any); + + let mut schema_definition = merged_definition; + + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), + }; + + let new_kind = match merge_strategy { + MergeStrategy::Discard | MergeStrategy::Retain => { + /* does not change the type */ + input_kind.clone() + } + MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { + // only keeps integer / float values + match (input_kind.contains_integer(), input_kind.contains_float()) { + (true, true) => Kind::float().or_integer(), + (true, false) => Kind::integer(), + (false, true) => Kind::float(), + (false, false) => Kind::undefined(), } - MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { - // only keeps integer / float values - match (input_kind.contains_integer(), input_kind.contains_float()) { - (true, true) => Kind::float().or_integer(), - (true, false) => Kind::integer(), - (false, true) => Kind::float(), - (false, false) => Kind::undefined(), - } + } + MergeStrategy::Array => { + let unknown_kind = input_kind.clone(); + Kind::array(Collection::empty().with_unknown(unknown_kind)) + } + MergeStrategy::Concat => { + let mut new_kind = Kind::never(); + + if input_kind.contains_bytes() { + new_kind.add_bytes(); } - MergeStrategy::Array => { - let unknown_kind = input_kind.clone(); - Kind::array(Collection::empty().with_unknown(unknown_kind)) + if let Some(array) = input_kind.as_array() { + // array elements can be either any type that the field can be, or any + // element of the array + let array_elements = array.reduced_kind().union(input_kind.without_array()); + new_kind.add_array(Collection::empty().with_unknown(array_elements)); } - MergeStrategy::Concat => { - let mut new_kind = Kind::never(); - - if input_kind.contains_bytes() { - new_kind.add_bytes(); - } - if let Some(array) = input_kind.as_array() { - // array elements can be either any type that the field can be, or any -
@@ -474,12 +482,15 @@ impl TaskTransform<Event> for Reduce {
 
 #[cfg(test)]
 mod test {
+    use enrichment::TableRegistry;
     use serde_json::json;
+    use std::sync::Arc;
     use tokio::sync::mpsc;
     use tokio_stream::wrappers::ReceiverStream;
     use vrl::value::Kind;
 
     use super::*;
+    use crate::config::schema::Definition;
     use crate::event::{LogEvent, Value};
     use crate::test_util::components::assert_transform_compliance;
     use crate::transforms::test::create_topology;
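Back in the merge-strategy arms, the Sum/Max/Min case is the easiest one to exercise in isolation. A sketch using only `Kind` constructors and predicates that already appear in this diff (the `numeric_merge_kind` wrapper is my name, and this assumes the workspace's `vrl` crate):

// The Sum/Max/Min narrowing from the `outputs` hunk, in isolation.
use vrl::value::Kind;

fn numeric_merge_kind(input_kind: &Kind) -> Kind {
    // only integer / float survive a numeric merge strategy
    match (input_kind.contains_integer(), input_kind.contains_float()) {
        (true, true) => Kind::float().or_integer(),
        (true, false) => Kind::integer(),
        (false, true) => Kind::float(),
        (false, false) => Kind::undefined(),
    }
}

fn main() {
    // a field typed `bytes | integer` stays integer under sum/max/min
    assert_eq!(
        numeric_merge_kind(&Kind::bytes().or_integer()),
        Kind::integer()
    );
    // a field that can never be numeric reduces to undefined
    assert_eq!(numeric_merge_kind(&Kind::bytes()), Kind::undefined());
}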
@@ -528,18 +539,33 @@ group_by = [ "request_id" ]
             .schema_definitions(true)
             .clone();
 
+        let new_schema_definition = reduce_config.outputs(
+            TableRegistry::default(),
+            &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+            LogNamespace::Legacy,
+        )[0]
+        .clone()
+        .log_schema_definitions
+        .get(&OutputId::from("in"))
+        .unwrap()
+        .clone();
+
         let (tx, rx) = mpsc::channel(1);
         let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
         let mut e_1 = LogEvent::from("test message 1");
         e_1.insert("counter", 1);
         e_1.insert("request_id", "1");
-        let metadata_1 = e_1.metadata().clone();
+        let mut metadata_1 = e_1.metadata().clone();
+        metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+        metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
 
         let mut e_2 = LogEvent::from("test message 2");
         e_2.insert("counter", 2);
         e_2.insert("request_id", "2");
-        let metadata_2 = e_2.metadata().clone();
+        let mut metadata_2 = e_2.metadata().clone();
+        metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+        metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
 
         let mut e_3 = LogEvent::from("test message 3");
         e_3.insert("counter", 3);
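One more piece of the `outputs` rewrite worth isolating: every merge strategy is optional, so a field that may be missing on input stays possibly-undefined on output. Again this uses only `Kind` calls from the hunk itself; the `preserve_optionality` wrapper is my name:

// The optionality rule from the `outputs` hunk: strategies only produce a
// value when the input field actually exists.
use vrl::value::Kind;

fn preserve_optionality(input_kind: &Kind, new_kind: Kind) -> Kind {
    if input_kind.contains_undefined() {
        new_kind.or_undefined()
    } else {
        new_kind
    }
}

fn main() {
    // an input field that may be absent keeps `undefined` in its output kind
    let optional_input = Kind::integer().or_undefined();
    assert!(preserve_optionality(&optional_input, Kind::integer()).contains_undefined());
    // a field that is always present stays required
    assert!(!preserve_optionality(&Kind::integer(), Kind::integer()).contains_undefined());
}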
@@ -603,6 +629,18 @@ merge_strategies.baz = "max"
 
         assert_transform_compliance(async move {
             let (tx, rx) = mpsc::channel(1);
+
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
@@ -610,7 +648,9 @@ merge_strategies.baz = "max"
             e_1.insert("bar", "first bar");
             e_1.insert("baz", 2);
             e_1.insert("request_id", "1");
-            let metadata = e_1.metadata().clone();
+            let mut metadata = e_1.metadata().clone();
+            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
@@ -660,17 +700,32 @@ group_by = [ "request_id" ]
 
         assert_transform_compliance(async move {
             let (tx, rx) = mpsc::channel(1);
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
             e_1.insert("counter", 1);
             e_1.insert("request_id", "1");
-            let metadata_1 = e_1.metadata().clone();
+            let mut metadata_1 = e_1.metadata().clone();
+            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
             e_2.insert("counter", 2);
-            let metadata_2 = e_2.metadata().clone();
+            let mut metadata_2 = e_2.metadata().clone();
+            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
             tx.send(e_2.into()).await.unwrap();
 
             let mut e_3 = LogEvent::from("test message 3");
@@ -852,20 +907,37 @@ merge_strategies.bar = "concat"
 
         assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
+
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
             e_1.insert("foo", json!([1, 3]));
             e_1.insert("bar", json!([1, 3]));
             e_1.insert("request_id", "1");
-            let metadata_1 = e_1.metadata().clone();
+            let mut metadata_1 = e_1.metadata().clone();
+            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
+
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
             e_2.insert("foo", json!([2, 4]));
             e_2.insert("bar", json!([2, 4]));
             e_2.insert("request_id", "2");
-            let metadata_2 = e_2.metadata().clone();
+            let mut metadata_2 = e_2.metadata().clone();
+            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
             tx.send(e_2.into()).await.unwrap();
 
             let mut e_3 = LogEvent::from("test message 3");
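Each reduce test above derives its expected definition with the same ten-line expression. A hypothetical helper for this test module (the `expected_definition` name is mine and is not in the diff; every call in the body is taken verbatim from the tests):

// Hypothetical test-module helper collapsing the repeated derivation above.
fn expected_definition(reduce_config: &ReduceConfig) -> Definition {
    // the single TransformOutput's definition for the "in" input
    reduce_config.outputs(
        TableRegistry::default(),
        &[(OutputId::from("in"), Definition::default_legacy_namespace())],
        LogNamespace::Legacy,
    )[0]
    .clone()
    .log_schema_definitions
    .get(&OutputId::from("in"))
    .unwrap()
    .clone()
}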
diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs
index a6b01dbc8844d..0cbd2af1c119e 100644
--- a/src/transforms/remap.rs
+++ b/src/transforms/remap.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 use std::{
     collections::BTreeMap,
     fs::File,
@@ -376,8 +375,6 @@ where
     drop_on_error: bool,
     drop_on_abort: bool,
     reroute_dropped: bool,
-    default_schema_definition: Arc<schema::Definition>,
-    dropped_schema_definition: Arc<schema::Definition>,
     runner: Runner,
     metric_tag_values: MetricTagValues,
 }
@@ -444,28 +441,6 @@ where
         program: Program,
         runner: Runner,
     ) -> crate::Result<Self> {
-        let default_schema_definition = context
-            .schema_definitions
-            .get(&None)
-            .expect("default schema required")
-            // TODO we can now have multiple possible definitions.
-            // This is going to need to be updated to store these possible definitions and then
-            // choose the correct one based on the input the event has come from.
-            .iter()
-            .map(|(_output, definition)| definition.clone())
-            .next()
-            .unwrap_or_else(Definition::any);
-
-        let dropped_schema_definition = context
-            .schema_definitions
-            .get(&Some(DROPPED.to_owned()))
-            .or_else(|| context.schema_definitions.get(&None))
-            .expect("dropped schema required")
-            .iter()
-            .map(|(_output, definition)| definition.clone())
-            .next()
-            .unwrap_or_else(Definition::any);
-
         Ok(Remap {
             component_key: context.key.clone(),
             program,
@@ -475,8 +450,6 @@ where
             drop_on_error: config.drop_on_error,
             drop_on_abort: config.drop_on_abort,
             reroute_dropped: config.reroute_dropped,
-            default_schema_definition: Arc::new(default_schema_definition),
-            dropped_schema_definition: Arc::new(dropped_schema_definition),
             runner,
             metric_tag_values: config.metric_tag_values,
         })
@@ -587,13 +560,11 @@ where
 
         match result {
             Ok(_) => match target.into_events() {
-                TargetEvents::One(event) => {
-                    push_default(event, output, &self.default_schema_definition)
+                TargetEvents::One(event) => push_default(event, output),
+                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
+                TargetEvents::Traces(events) => {
+                    events.for_each(|event| push_default(event, output))
                 }
-                TargetEvents::Logs(events) => events
-                    .for_each(|event| push_default(event, output, &self.default_schema_definition)),
-                TargetEvents::Traces(events) => events
-                    .for_each(|event| push_default(event, output, &self.default_schema_definition)),
             },
             Err(reason) => {
                 let (reason, error, drop) = match reason {
@@ -617,12 +588,12 @@ where
 
                 if !drop {
                     let event = original_event.expect("event will be set");
-                    push_default(event, output, &self.default_schema_definition);
+                    push_default(event, output);
                 } else if self.reroute_dropped {
                     let mut event = original_event.expect("event will be set");
                     self.annotate_dropped(&mut event, reason, error);
-                    push_dropped(event, output, &self.dropped_schema_definition);
+                    push_dropped(event, output);
                 }
             }
         }
@@ -630,29 +601,13 @@ where
 }
 
 #[inline]
-fn push_default(
-    mut event: Event,
-    output: &mut TransformOutputsBuf,
-    schema_definition: &Arc<schema::Definition>,
-) {
-    event
-        .metadata_mut()
-        .set_schema_definition(schema_definition);
-
-    output.push(event)
+fn push_default(event: Event, output: &mut TransformOutputsBuf) {
+    output.push(None, event)
 }
 
 #[inline]
-fn push_dropped(
-    mut event: Event,
-    output: &mut TransformOutputsBuf,
-    schema_definition: &Arc<schema::Definition>,
-) {
-    event
-        .metadata_mut()
-        .set_schema_definition(schema_definition);
-
-    output.push_named(DROPPED, event)
+fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
+    output.push(Some(DROPPED), event);
 }
 
 /// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
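As the `push_default`/`push_dropped` hunk shows, remap no longer stamps definitions onto events; it only routes them through a single `push` keyed by an optional output name. A toy sketch of that API shape (the `Buf` type is a stand-in, not Vector's `TransformOutputsBuf`):

// Toy sketch: one Option-keyed `push` replaces the push/push_named pair.
use std::collections::HashMap;

const DROPPED: &str = "dropped";

#[derive(Default)]
struct Buf {
    primary: Vec<String>,
    named: HashMap<&'static str, Vec<String>>,
}

impl Buf {
    fn push(&mut self, name: Option<&'static str>, event: String) {
        match name {
            None => self.primary.push(event),            // default output
            Some(name) => self.named.entry(name).or_default().push(event),
        }
    }
}

fn main() {
    let mut buf = Buf::default();
    buf.push(None, "ok event".into());
    buf.push(Some(DROPPED), "bad event".into()); // rerouted to `dropped`
    assert_eq!(buf.primary.len(), 1);
    assert_eq!(buf.named[DROPPED].len(), 1);
}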
@@ -721,6 +676,7 @@ pub enum BuildError {
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
+    use std::sync::Arc;
 
     use indoc::{formatdoc, indoc};
     use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags};
@@ -841,10 +797,6 @@ mod tests {
         let result1 = transform_one(&mut tform, event1).unwrap();
         assert_eq!(get_field_string(&result1, "message"), "event1");
         assert_eq!(get_field_string(&result1, "foo"), "bar");
-        assert_eq!(
-            result1.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
         assert!(tform.runner().runtime.is_empty());
 
         let event2 = {
@@ -854,10 +806,6 @@ mod tests {
         let result2 = transform_one(&mut tform, event2).unwrap();
         assert_eq!(get_field_string(&result2, "message"), "event2");
         assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
-        assert_eq!(
-            result2.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
         assert!(tform.runner().runtime.is_empty());
     }
 
@@ -889,11 +837,6 @@ mod tests {
         assert_eq!(get_field_string(&result, "foo"), "bar");
         assert_eq!(get_field_string(&result, "bar"), "baz");
         assert_eq!(get_field_string(&result, "copy"), "buz");
-
-        assert_eq!(
-            result.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
     }
 
     #[test]
@@ -927,17 +870,8 @@ mod tests {
 
         let r = result.next().unwrap();
         assert_eq!(get_field_string(&r, "message"), "foo");
-        assert_eq!(
-            r.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
         let r = result.next().unwrap();
         assert_eq!(get_field_string(&r, "message"), "bar");
-
-        assert_eq!(
-            r.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
     }
 
     #[test]
@@ -1103,7 +1037,9 @@ mod tests {
                 "zork",
                 MetricKind::Incremental,
                 MetricValue::Counter { value: 1.0 },
-                metadata.with_schema_definition(&Arc::new(test_default_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                metadata
             )
             .with_namespace(Some("zerk"))
             .with_tags(Some(metric_tags! {
@@ -1313,8 +1249,11 @@ mod tests {
                 "counter",
                 MetricKind::Absolute,
                 MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_default_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
             )
             .with_tags(Some(metric_tags! {
                 "hello" => "world",
@@ -1331,8 +1270,11 @@ mod tests {
                 "counter",
                 MetricKind::Absolute,
                 MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_dropped_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
             )
             .with_tags(Some(metric_tags! {
                 "hello" => "goodbye",
@@ -1352,8 +1294,11 @@ mod tests {
                 "counter",
                 MetricKind::Absolute,
                 MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_dropped_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
             )
             .with_tags(Some(metric_tags! {
                 "not_hello" => "oops",
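The next file, src/transforms/route.rs, switches to the same Option-keyed `push`. Its routing loop, reduced to a toy (types and conditions are stand-ins; the real transform checks configured VRL conditions):

// Toy sketch of the route fan-out below: test every named condition, and if
// none match, send the event to the unmatched-route output.
const UNMATCHED_ROUTE: &str = "_unmatched";

fn route(event: &str, conditions: &[(&'static str, fn(&str) -> bool)]) -> Vec<&'static str> {
    let mut outputs = Vec::new();
    for (output_name, condition) in conditions {
        if condition(event) {
            outputs.push(*output_name);
        }
    }
    // equivalent to the check_failed == conditions.len() test in the diff
    if outputs.is_empty() {
        outputs.push(UNMATCHED_ROUTE);
    }
    outputs
}

fn main() {
    let conditions: [(&'static str, fn(&str) -> bool); 2] = [
        ("errors", |e| e.contains("error")),
        ("warns", |e| e.contains("warn")),
    ];
    assert_eq!(route("an error event", &conditions), vec!["errors"]);
    assert_eq!(route("plain event", &conditions), vec![UNMATCHED_ROUTE]);
}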
{ "not_hello" => "oops", diff --git a/src/transforms/route.rs b/src/transforms/route.rs index adcac43ff504c..e410277914a8f 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -42,13 +42,13 @@ impl SyncTransform for Route { for (output_name, condition) in &self.conditions { let (result, event) = condition.check(event.clone()); if result { - output.push_named(output_name, event); + output.push(Some(output_name), event); } else { check_failed += 1; } } if check_failed == self.conditions.len() { - output.push_named(UNMATCHED_ROUTE, event); + output.push(Some(UNMATCHED_ROUTE), event); } } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 8488658e8ea55..5753d0176dd3b 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; use vector_core::metric_tags; use super::*; +use crate::config::schema::Definition; +use crate::config::LogNamespace; use crate::event::metric::TagValue; use crate::event::{metric, Event, Metric, MetricTags}; use crate::test_util::components::assert_transform_compliance; @@ -13,6 +16,7 @@ use crate::transforms::tag_cardinality_limit::config::{ use crate::transforms::test::create_topology; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use vrl::compiler::prelude::Kind; #[test] fn generate_config() { @@ -88,6 +92,16 @@ async fn drop_event(config: TagCardinalityLimitConfig) { event1.set_source_id(Arc::new(ComponentKey::from("in"))); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // Third value rejected since value_limit is 2. 
@@ -135,6 +149,20 @@ async fn drop_tag(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     assert_eq!(new_event1, Some(event1));
     assert_eq!(new_event2, Some(event2));
     // The third event should have been modified to remove "tag1"
@@ -207,6 +235,21 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    // definitions aren't valid for metrics yet, it's just set to the default (anything).
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     drop(tx);
     topology.stop().await;
@@ -257,6 +300,21 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    // definitions aren't valid for metrics yet, it's just set to the default (anything).
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     assert_eq!(new_event1, Some(event1));
     assert_eq!(new_event2, Some(event2));
     assert_eq!(new_event3, Some(event3));
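Taken together, my reading of these test changes is that an event's `upstream_id` exists so that a downstream component holding per-input definitions (like the `log_schema_definitions` map populated in reduce's `outputs`) can pick the definition matching the component the event came from. A toy illustration of that lookup, with stand-in types; this is an inference from the diff, not code that appears in it:

// Toy sketch (stand-in types) of a per-upstream definition lookup.
use std::collections::HashMap;
use std::sync::Arc;

type OutputId = String; // stand-in for vector_core::config::OutputId
type Definition = String; // stand-in for schema::Definition

fn definition_for<'a>(
    definitions: &'a HashMap<OutputId, Arc<Definition>>,
    upstream_id: Option<&OutputId>,
) -> Option<&'a Arc<Definition>> {
    // no upstream id (e.g. in a source) means no definition to look up
    upstream_id.and_then(|id| definitions.get(id))
}

fn main() {
    let mut defs = HashMap::new();
    defs.insert("in".to_string(), Arc::new("definition for `in`".to_string()));
    let upstream = Some("in".to_string());
    assert!(definition_for(&defs, upstream.as_ref()).is_some());
    assert!(definition_for(&defs, None).is_none());
}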