From b051f26ec3026437bddf6a7baca2849bf7c0e55e Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 13 Jun 2023 15:46:17 -0400 Subject: [PATCH 01/18] save --- lib/vector-core/src/config/mod.rs | 2 +- lib/vector-core/src/event/metadata.rs | 21 ++++- lib/vector-core/src/transform/mod.rs | 123 ++++++++++++-------------- src/source_sender/mod.rs | 77 +++++++++++----- src/sources/kafka.rs | 2 +- src/sources/socket/mod.rs | 2 +- src/sources/statsd/mod.rs | 4 +- src/test_util/mock/mod.rs | 12 +-- src/topology/builder.rs | 2 +- src/transforms/remap.rs | 91 +++++++++---------- src/transforms/route.rs | 4 +- 11 files changed, 187 insertions(+), 153 deletions(-) diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 97cbc091d8f5e..fb5e20767fe40 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -201,7 +201,7 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. - log_schema_definitions: HashMap, + pub log_schema_definitions: HashMap, } impl TransformOutput { diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 403b43bfc52b9..1f9ed94ede42e 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -32,6 +32,12 @@ pub struct EventMetadata { /// The id of the source source_id: Option>, + /// The id of the component this event originated from. This is used to + /// determine which schema definition to attach to an event in transforms. + /// This should always have a value set for events in transforms. It will always be `None` + /// in a source, and there is currently no use-case for reading the value in a sink. 
+ parent_id: Option>, + /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). /// @@ -73,17 +79,29 @@ impl EventMetadata { &mut self.secrets } - /// Returns a reference to the metadata source. + /// Returns a reference to the metadata source id. #[must_use] pub fn source_id(&self) -> Option<&OutputId> { self.source_id.as_deref() } + /// Returns a reference to the metadata parent id. This is the `OutputId` + /// of the previous component the event was sent through (if any). + #[must_use] + pub fn parent_id(&self) -> Option<&OutputId> { + self.parent_id.as_deref() + } + /// Sets the `source_id` in the metadata to the provided value. pub fn set_source_id(&mut self, source_id: Arc) { self.source_id = Some(source_id); } + /// Sets the `parent_id` in the metadata to the provided value. + pub fn set_parent_id(&mut self, parent_id: Arc) { + self.parent_id = Some(parent_id); + } + /// Return the datadog API key, if it exists pub fn datadog_api_key(&self) -> Option> { self.secrets.get(DATADOG_API_KEY).cloned() @@ -113,6 +131,7 @@ impl Default for EventMetadata { finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, + parent_id: None, } } } diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index a60cd85c8200a..666fbfc516f2f 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, error, pin::Pin}; use futures::{Stream, StreamExt}; @@ -7,13 +8,15 @@ use vector_common::internal_event::{ use vector_common::json_size::JsonSize; use vector_common::EventDataEq; +use crate::config::OutputId; +use crate::event::EventMutRef; use crate::{ config, event::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, - ByteSizeOf, + 
schema, ByteSizeOf, }; #[cfg(any(feature = "lua"))] @@ -178,6 +181,7 @@ impl SyncTransform for Box { struct TransformOutput { fanout: Fanout, events_sent: Registered, + log_schema_definitions: HashMap>, } pub struct TransformOutputs { @@ -197,6 +201,13 @@ impl TransformOutputs { for output in outputs_in { let (fanout, control) = Fanout::new(); + + let log_schema_definitions = output + .log_schema_definitions + .into_iter() + .map(|(id, definition)| (id, Arc::new(definition))) + .collect(); + match output.port { None => { primary_output = Some(TransformOutput { @@ -204,6 +215,7 @@ impl TransformOutputs { events_sent: register(EventsSent::from(internal_event::Output(Some( DEFAULT_OUTPUT.into(), )))), + log_schema_definitions, }); controls.insert(None, control); } @@ -215,6 +227,7 @@ impl TransformOutputs { events_sent: register(EventsSent::from(internal_event::Output(Some( name.clone().into(), )))), + log_schema_definitions, }, ); controls.insert(Some(name.clone()), control); @@ -246,27 +259,35 @@ impl TransformOutputs { buf: &mut TransformOutputsBuf, ) -> Result<(), Box> { if let Some(primary) = self.primary_output.as_mut() { - let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); - let byte_size = buf.primary_buffer.as_ref().map_or( - JsonSize::new(0), - EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, - ); - buf.primary_buffer - .as_mut() - .expect("mismatched outputs") - .send(&mut primary.fanout) - .await?; - primary.events_sent.emit(CountByteSize(count, byte_size)); + let buf = buf.primary_buffer.as_mut().expect("mismatched outputs"); + Self::send_single_buffer(buf, primary).await?; } - for (key, buf) in &mut buf.named_buffers { - let count = buf.len(); - let byte_size = buf.estimated_json_encoded_size_of(); let output = self.named_outputs.get_mut(key).expect("unknown output"); - buf.send(&mut output.fanout).await?; - output.events_sent.emit(CountByteSize(count, byte_size)); + Self::send_single_buffer(buf, output).await?; } + Ok(()) + 
} + async fn send_single_buffer( + buf: &mut OutputBuffer, + output: &mut TransformOutput, + ) -> Result<(), Box> { + // update schema definitions for log events + for event in buf.events_mut() { + if let EventMutRef::Log(log) = event { + if let Some(parent_component_id) = log.metadata_mut().parent_id() { + if let Some(definition) = output.log_schema_definitions.get(parent_component_id) + { + log.metadata_mut().set_schema_definition(definition); + } + } + } + } + let count = buf.len(); + let byte_size = buf.estimated_json_encoded_size_of(); + buf.send(&mut output.fanout).await?; + output.events_sent.emit(CountByteSize(count, byte_size)); Ok(()) } } @@ -299,34 +320,17 @@ impl TransformOutputsBuf { } } - pub fn push(&mut self, event: Event) { - self.primary_buffer - .as_mut() - .expect("no default output") - .push(event); - } - - pub fn push_named(&mut self, name: &str, event: Event) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .push(event); - } - - pub fn append(&mut self, slice: &mut Vec) { - self.primary_buffer - .as_mut() - .expect("no default output") - .append(slice); - } - - pub fn append_named(&mut self, name: &str, slice: &mut Vec) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .append(slice); + /// Adds a new event to the transform output buffer + pub fn push(&mut self, name: Option<&str>, event: Event) { + match name { + Some(name) => self.named_buffers.get_mut(name), + None => self.primary_buffer.as_mut(), + } + .expect("unknown output") + .push(event); } + #[cfg(test)] pub fn drain(&mut self) -> impl Iterator + '_ { self.primary_buffer .as_mut() @@ -334,6 +338,7 @@ impl TransformOutputsBuf { .drain() } + #[cfg(test)] pub fn drain_named(&mut self, name: &str) -> impl Iterator + '_ { self.named_buffers .get_mut(name) @@ -341,33 +346,15 @@ impl TransformOutputsBuf { .drain() } - pub fn extend(&mut self, events: impl Iterator) { - self.primary_buffer - .as_mut() - .expect("no default output") - .extend(events); 
- } - + #[cfg(test)] pub fn take_primary(&mut self) -> OutputBuffer { std::mem::take(self.primary_buffer.as_mut().expect("no default output")) } + #[cfg(test)] pub fn take_all_named(&mut self) -> HashMap { std::mem::take(&mut self.named_buffers) } - - pub fn len(&self) -> usize { - self.primary_buffer.as_ref().map_or(0, OutputBuffer::len) - + self - .named_buffers - .values() - .map(OutputBuffer::len) - .sum::() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } } impl ByteSizeOf for TransformOutputsBuf { @@ -419,6 +406,7 @@ impl OutputBuffer { } } + #[cfg(test)] pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -439,6 +427,7 @@ impl OutputBuffer { }) } + #[cfg(test)] pub fn drain(&mut self) -> impl Iterator + '_ { self.0.drain(..).flat_map(EventArray::into_events) } @@ -458,12 +447,12 @@ impl OutputBuffer { self.0.iter().flat_map(EventArray::iter_events) } - pub fn into_events(self) -> impl Iterator { - self.0.into_iter().flat_map(EventArray::into_events) + fn events_mut(&mut self) -> impl Iterator { + self.0.iter_mut().flat_map(EventArray::iter_events_mut) } - pub fn take_events(&mut self) -> Vec { - std::mem::take(&mut self.0) + pub fn into_events(self) -> impl Iterator { + self.0.into_iter().flat_map(EventArray::into_events) } } diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index a4f4eaae3b751..4c416743d63d9 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -1,4 +1,5 @@ #![allow(missing_docs)] +use std::sync::Arc; use std::{collections::HashMap, fmt}; use chrono::Utc; @@ -19,6 +20,8 @@ use vrl::value::Value; mod errors; +use crate::config::{ComponentKey, OutputId}; +use crate::schema::Definition; pub use errors::{ClosedError, StreamSendError}; use lookup::PathPrefix; @@ -48,17 +51,37 @@ impl Builder { } } - pub fn add_source_output(&mut self, output: SourceOutput) -> LimitedReceiver { + pub fn add_source_output( + &mut self, + output: SourceOutput, + component_key: ComponentKey, + ) -> LimitedReceiver { 
let lag_time = self.lag_time.clone(); + let log_definition = output.schema_definition.clone().map(Arc::new); + let output_id = OutputId { + component: component_key, + port: output.port.clone(), + }; match output.port { None => { - let (inner, rx) = - Inner::new_with_buffer(self.buf_size, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + DEFAULT_OUTPUT.to_owned(), + lag_time, + log_definition, + output_id, + ); self.inner = Some(inner); rx } Some(name) => { - let (inner, rx) = Inner::new_with_buffer(self.buf_size, name.clone(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + name.clone(), + lag_time, + log_definition, + output_id, + ); self.named_inners.insert(name, inner); rx } @@ -91,9 +114,10 @@ impl SourceSender { } } - pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver) { + #[cfg(test)] + pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); - let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None); ( Self { inner: Some(inner), @@ -105,14 +129,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. 
@@ -131,7 +155,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -225,6 +249,11 @@ struct Inner { output: String, lag_time: Option, events_sent: Registered, + /// The schema definition that will be attached to Log events sent through here + log_definition: Option>, + /// The OutputId related to this source sender. This is set as the `parent_id` in + /// `EventMetadata` for all event sent through here. + output_id: Arc, } impl fmt::Debug for Inner { @@ -242,6 +271,8 @@ impl Inner { n: usize, output: String, lag_time: Option, + log_definition: Option>, + output_id: OutputId, ) -> (Self, LimitedReceiver) { let (tx, rx) = channel::limited(n); ( @@ -252,16 +283,27 @@ impl Inner { events_sent: register!(EventsSent::from(internal_event::Output(Some( output.into() )))), + log_definition, + output_id: Arc::new(output_id), }, rx, ) } - async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); events .iter_events() .for_each(|event| self.emit_lag_time(event, reference)); + + events.iter_events_mut().for_each(|mut event| { + // attach runtime schema definitions from the source + if let Some(log_definition) = &self.log_definition { + event.metadata_mut().set_schema_definition(log_definition); + } + event.metadata_mut().set_parent_id(self.output_id.clone()); + }); + let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); self.inner.send(events).await.map_err(|_| ClosedError)?; @@ -290,23 +332,10 @@ impl Inner { E: Into + ByteSizeOf, I: IntoIterator, { - let reference 
= Utc::now().timestamp_millis(); let events = events.into_iter().map(Into::into); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { - events - .iter_events() - .for_each(|event| self.emit_lag_time(event, reference)); - let cbs = CountByteSize(events.len(), events.estimated_json_encoded_size_of()); - match self.inner.send(events).await { - Ok(()) => { - self.events_sent.emit(cbs); - } - Err(error) => { - return Err(error.into()); - } - } + self.send(events).await?; } - Ok(()) } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 39f16074f5342..2676b29fb956f 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1121,7 +1121,7 @@ mod integration_test { delay: Duration, status: EventStatus, ) -> (SourceSender, impl Stream + Unpin) { - let (pipe, recv) = SourceSender::new_with_buffer(100); + let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100); let recv = BufferReceiver::new(recv.into()).into_stream(); let recv = recv.then(move |mut events| async move { events.iter_logs_mut().for_each(|log| { diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 93366629f3420..58ef30c3fcf99 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -727,7 +727,7 @@ mod test { // shutdown. let addr = next_addr(); - let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000); + let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 4f18e1d14be1b..2bf5e393370fc 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -453,7 +453,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. 
let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_with_buffer(4096); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -547,7 +547,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_with_buffer(4096); + let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 62b0d96d76f10..3fb594b677e7a 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc) -> BackpressureSourceConf } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -75,7 +75,7 @@ pub const fn 
backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig } pub fn basic_sink(channel_size: usize) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -84,7 +84,7 @@ pub fn basic_sink_with_data( channel_size: usize, data: &str, ) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -92,7 +92,7 @@ pub fn basic_sink_with_data( pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index a02dddd13f3d3..3fa43f198fd3d 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -243,7 +243,7 @@ impl<'a> Builder<'a> { let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); + let mut rx = builder.add_source_output(output.clone(), key.clone()); let (mut fanout, control) = Fanout::new(); let source = Arc::new(OutputId { diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index 49c84faad97e2..841384f47cef3 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::Arc; +// use std::sync::Arc; use std::{ collections::BTreeMap, fs::File, @@ -376,8 +376,8 @@ where drop_on_error: bool, drop_on_abort: bool, reroute_dropped: bool, - default_schema_definition: Arc, 
- dropped_schema_definition: Arc, + // default_schema_definition: Arc, + // dropped_schema_definition: Arc, runner: Runner, metric_tag_values: MetricTagValues, } @@ -444,27 +444,27 @@ where program: Program, runner: Runner, ) -> crate::Result { - let default_schema_definition = context - .schema_definitions - .get(&None) - .expect("default schema required") - // TODO we can now have multiple possible definitions. - // This is going to need to be updated to store these possible definitions and then - // choose the correct one based on the input the event has come from. - .iter() - .map(|(_output, definition)| definition.clone()) - .next() - .unwrap_or_else(Definition::any); - - let dropped_schema_definition = context - .schema_definitions - .get(&Some(DROPPED.to_owned())) - .or_else(|| context.schema_definitions.get(&None)) - .expect("dropped schema required") - .iter() - .map(|(_output, definition)| definition.clone()) - .next() - .unwrap_or_else(Definition::any); + // let default_schema_definition = context + // .schema_definitions + // .get(&None) + // .expect("default schema required") + // // TODO we can now have multiple possible definitions. + // // This is going to need to be updated to store these possible definitions and then + // // choose the correct one based on the input the event has come from. 
+ // .iter() + // .map(|(_output, definition)| definition.clone()) + // .next() + // .unwrap_or_else(Definition::any); + // + // let dropped_schema_definition = context + // .schema_definitions + // .get(&Some(DROPPED.to_owned())) + // .or_else(|| context.schema_definitions.get(&None)) + // .expect("dropped schema required") + // .iter() + // .map(|(_output, definition)| definition.clone()) + // .next() + // .unwrap_or_else(Definition::any); Ok(Remap { component_key: context.key.clone(), @@ -475,8 +475,8 @@ where drop_on_error: config.drop_on_error, drop_on_abort: config.drop_on_abort, reroute_dropped: config.reroute_dropped, - default_schema_definition: Arc::new(default_schema_definition), - dropped_schema_definition: Arc::new(dropped_schema_definition), + // default_schema_definition: Arc::new(default_schema_definition), + // dropped_schema_definition: Arc::new(dropped_schema_definition), runner, metric_tag_values: config.metric_tag_values, }) @@ -587,13 +587,11 @@ where match result { Ok(_) => match target.into_events() { - TargetEvents::One(event) => { - push_default(event, output, &self.default_schema_definition) + TargetEvents::One(event) => push_default(event, output), + TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)), + TargetEvents::Traces(events) => { + events.for_each(|event| push_default(event, output)) } - TargetEvents::Logs(events) => events - .for_each(|event| push_default(event, output, &self.default_schema_definition)), - TargetEvents::Traces(events) => events - .for_each(|event| push_default(event, output, &self.default_schema_definition)), }, Err(reason) => { let (reason, error, drop) = match reason { @@ -617,12 +615,12 @@ where if !drop { let event = original_event.expect("event will be set"); - push_default(event, output, &self.default_schema_definition); + push_default(event, output); } else if self.reroute_dropped { let mut event = original_event.expect("event will be set"); self.annotate_dropped(&mut event, 
reason, error); - push_dropped(event, output, &self.dropped_schema_definition); + push_dropped(event, output); } } } @@ -631,28 +629,27 @@ where #[inline] fn push_default( - mut event: Event, + event: Event, output: &mut TransformOutputsBuf, - schema_definition: &Arc, + // schema_definition: &Arc, ) { - event - .metadata_mut() - .set_schema_definition(schema_definition); + // event + // .metadata_mut() + // .set_schema_definition(schema_definition); - output.push(event) + output.push(None, event) } #[inline] fn push_dropped( - mut event: Event, + event: Event, output: &mut TransformOutputsBuf, - schema_definition: &Arc, + // schema_definition: &Arc, ) { - event - .metadata_mut() - .set_schema_definition(schema_definition); - - output.push_named(DROPPED, event) + // event + // .metadata_mut() + // .set_schema_definition(schema_definition); + output.push(Some(DROPPED), event); } /// If the VRL returns a value that is not an array (see [`merge_array_definitions`]), diff --git a/src/transforms/route.rs b/src/transforms/route.rs index adcac43ff504c..e410277914a8f 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -42,13 +42,13 @@ impl SyncTransform for Route { for (output_name, condition) in &self.conditions { let (result, event) = condition.check(event.clone()); if result { - output.push_named(output_name, event); + output.push(Some(output_name), event); } else { check_failed += 1; } } if check_failed == self.conditions.len() { - output.push_named(UNMATCHED_ROUTE, event); + output.push(Some(UNMATCHED_ROUTE), event); } } } From b07004a142de4c83c40516481b6112a5c1fd14e2 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 13 Jun 2023 17:09:17 -0400 Subject: [PATCH 02/18] clippy --- lib/vector-core/src/transform/mod.rs | 11 +++++------ src/source_sender/mod.rs | 17 ++++++++++++++--- src/transforms/remap.rs | 1 + 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 
666fbfc516f2f..5c7db14a53369 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -330,7 +330,7 @@ impl TransformOutputsBuf { .push(event); } - #[cfg(test)] + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator + '_ { self.primary_buffer .as_mut() @@ -338,7 +338,7 @@ impl TransformOutputsBuf { .drain() } - #[cfg(test)] + #[cfg(any(feature = "test", test))] pub fn drain_named(&mut self, name: &str) -> impl Iterator + '_ { self.named_buffers .get_mut(name) @@ -346,12 +346,12 @@ impl TransformOutputsBuf { .drain() } - #[cfg(test)] + #[cfg(any(feature = "test", test))] pub fn take_primary(&mut self) -> OutputBuffer { std::mem::take(self.primary_buffer.as_mut().expect("no default output")) } - #[cfg(test)] + #[cfg(any(feature = "test", test))] pub fn take_all_named(&mut self) -> HashMap { std::mem::take(&mut self.named_buffers) } @@ -406,7 +406,6 @@ impl OutputBuffer { } } - #[cfg(test)] pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -427,7 +426,7 @@ impl OutputBuffer { }) } - #[cfg(test)] + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator + '_ { self.0.drain(..).flat_map(EventArray::into_events) } diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 4c416743d63d9..1d061f0396bf1 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -117,7 +117,12 @@ impl SourceSender { #[cfg(test)] pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); - let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None); + let output_id = OutputId { + component: "test".to_string().into(), + port: None, + }; + let (inner, rx) = + Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id); ( Self { inner: Some(inner), @@ -185,7 +190,11 @@ impl SourceSender { ) -> impl Stream + Unpin { // The lag_time parameter here will need to be 
filled in if this function is ever used for // non-test situations. - let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None); + let output_id = OutputId { + component: "test".to_string().into(), + port: Some(name.clone()), + }; + let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, None, output_id); let recv = recv.into_stream().map(move |mut events| { events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -301,7 +310,9 @@ impl Inner { if let Some(log_definition) = &self.log_definition { event.metadata_mut().set_schema_definition(log_definition); } - event.metadata_mut().set_parent_id(self.output_id.clone()); + event + .metadata_mut() + .set_parent_id(Arc::clone(&self.output_id)); }); let byte_size = events.estimated_json_encoded_size_of(); diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index 841384f47cef3..4aade9d969632 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -718,6 +718,7 @@ pub enum BuildError { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; + use std::sync::Arc; use indoc::{formatdoc, indoc}; use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags}; From 65db52dbf22a11146ee84f304ca45d4df41b3d9b Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 14 Jun 2023 09:01:05 -0400 Subject: [PATCH 03/18] cleanup --- src/transforms/remap.rs | 44 ++--------------------------------------- 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index 4aade9d969632..059ce779d3337 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -// use std::sync::Arc; use std::{ collections::BTreeMap, fs::File, @@ -376,8 +375,6 @@ where drop_on_error: bool, drop_on_abort: bool, reroute_dropped: bool, - // default_schema_definition: Arc, - // dropped_schema_definition: Arc, runner: Runner, metric_tag_values: MetricTagValues, } @@ 
-444,28 +441,6 @@ where program: Program, runner: Runner, ) -> crate::Result { - // let default_schema_definition = context - // .schema_definitions - // .get(&None) - // .expect("default schema required") - // // TODO we can now have multiple possible definitions. - // // This is going to need to be updated to store these possible definitions and then - // // choose the correct one based on the input the event has come from. - // .iter() - // .map(|(_output, definition)| definition.clone()) - // .next() - // .unwrap_or_else(Definition::any); - // - // let dropped_schema_definition = context - // .schema_definitions - // .get(&Some(DROPPED.to_owned())) - // .or_else(|| context.schema_definitions.get(&None)) - // .expect("dropped schema required") - // .iter() - // .map(|(_output, definition)| definition.clone()) - // .next() - // .unwrap_or_else(Definition::any); - Ok(Remap { component_key: context.key.clone(), program, @@ -628,27 +603,12 @@ where } #[inline] -fn push_default( - event: Event, - output: &mut TransformOutputsBuf, - // schema_definition: &Arc, -) { - // event - // .metadata_mut() - // .set_schema_definition(schema_definition); - +fn push_default(event: Event, output: &mut TransformOutputsBuf) { output.push(None, event) } #[inline] -fn push_dropped( - event: Event, - output: &mut TransformOutputsBuf, - // schema_definition: &Arc, -) { - // event - // .metadata_mut() - // .set_schema_definition(schema_definition); +fn push_dropped(event: Event, output: &mut TransformOutputsBuf) { output.push(Some(DROPPED), event); } From 257a1686dc970b8b73b56be2f19ea7efc660d02e Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 14 Jun 2023 10:54:34 -0400 Subject: [PATCH 04/18] fix log to metric test --- lib/vector-core/src/event/metadata.rs | 1 + src/transforms/log_to_metric.rs | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 1f9ed94ede42e..65238abe06726 
100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -40,6 +40,7 @@ pub struct EventMetadata { /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). + /// This definition is only currently valid for logs, and shouldn't be used for other event types. /// /// TODO(Jean): must not skip serialization to track schemas across restarts. #[serde(default = "default_schema_definition", skip)] diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index aaf7a68e42225..0dcc38f79773a 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, num::ParseFloatError}; use chrono::Utc; @@ -5,6 +6,7 @@ use indexmap::IndexMap; use vector_config::configurable_component; use vector_core::config::LogNamespace; +use crate::config::schema::Definition; use crate::{ config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, @@ -256,7 +258,10 @@ fn to_metric(config: &MetricConfig, event: &Event) -> Result Date: Wed, 14 Jun 2023 11:30:29 -0400 Subject: [PATCH 05/18] fix all log_to_metric tests --- src/transforms/log_to_metric.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 0dcc38f79773a..7230eb783ff60 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -513,6 +513,9 @@ mod tests { let event = create_event("status", "42"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). 
+ metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -545,6 +548,9 @@ mod tests { event.as_mut_log().insert("method", "post"); event.as_mut_log().insert("code", "200"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -634,6 +640,10 @@ mod tests { let event = create_event("backtrace", "message"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -678,6 +688,9 @@ mod tests { let event = create_event("amount", "33.99"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -708,7 +721,11 @@ mod tests { let event = create_event("amount", "33.99"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). 
+ metadata.set_schema_definition(&Arc::new(Definition::any())); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); + let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -826,6 +843,9 @@ mod tests { event.as_mut_log().insert("status", "42"); event.as_mut_log().insert("backtrace", "message"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -880,6 +900,9 @@ mod tests { event.as_mut_log().insert("worker", "abc"); event.as_mut_log().insert("service", "xyz"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -923,6 +946,9 @@ mod tests { let event = create_event("user_ip", "1.2.3.4"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -952,6 +978,9 @@ mod tests { let event = create_event("response_time", "2.5"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). 
+ metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -982,6 +1011,9 @@ mod tests { let event = create_event("response_time", "2.5"); let mut metadata = event.metadata().clone(); + // definitions aren't valid for metrics yet, it's just set to the default (anything). + metadata.set_schema_definition(&Arc::new(Definition::any())); + metadata.set_parent_id(Arc::new(OutputId::from("in"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); From f0d496f765f41d496343805eed42792610c1d909 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 14 Jun 2023 16:23:04 -0400 Subject: [PATCH 06/18] fix several tests --- lib/vector-core/src/event/mod.rs | 5 ++ lib/vector-core/src/transform/mod.rs | 45 ++++++++++++++---- src/topology/builder.rs | 25 +++++++++- src/transforms/dedupe.rs | 68 ++++++++++++++++++++++++++++ src/transforms/log_to_metric.rs | 22 ++++----- 5 files changed, 143 insertions(+), 22 deletions(-) diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 04522793e3436..44ac76ad3169b 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -293,6 +293,11 @@ impl Event { self.metadata_mut().set_source_id(source_id); } + /// Sets the `parent_id` in the event metadata to the provided value. + pub fn set_parent_id(&mut self, parent_id: Arc) { + self.metadata_mut().set_parent_id(parent_id); + } + /// Sets the `source_id` in the event metadata to the provided value. 
#[must_use] pub fn with_source_id(mut self, source_id: Arc) -> Self { diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 5c7db14a53369..fb99c53ab1032 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -8,8 +8,9 @@ use vector_common::internal_event::{ use vector_common::json_size::JsonSize; use vector_common::EventDataEq; -use crate::config::OutputId; +use crate::config::{ComponentKey, OutputId}; use crate::event::EventMutRef; +use crate::schema::Definition; use crate::{ config, event::{ @@ -182,6 +183,7 @@ struct TransformOutput { fanout: Fanout, events_sent: Registered, log_schema_definitions: HashMap>, + output_id: Arc, } pub struct TransformOutputs { @@ -193,6 +195,7 @@ pub struct TransformOutputs { impl TransformOutputs { pub fn new( outputs_in: Vec, + component_key: ComponentKey, ) -> (Self, HashMap, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; @@ -216,6 +219,10 @@ impl TransformOutputs { DEFAULT_OUTPUT.into(), )))), log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: None, + }), }); controls.insert(None, control); } @@ -228,6 +235,10 @@ impl TransformOutputs { name.clone().into(), )))), log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: Some(name.clone()), + }), }, ); controls.insert(Some(name.clone()), control); @@ -273,16 +284,12 @@ impl TransformOutputs { buf: &mut OutputBuffer, output: &mut TransformOutput, ) -> Result<(), Box> { - // update schema definitions for log events for event in buf.events_mut() { - if let EventMutRef::Log(log) = event { - if let Some(parent_component_id) = log.metadata_mut().parent_id() { - if let Some(definition) = output.log_schema_definitions.get(parent_component_id) - { - log.metadata_mut().set_schema_definition(definition); - } - } - } + update_runtime_schema_definition( + event, + 
 &output.output_id, + &output.log_schema_definitions, + ); } let count = buf.len(); let byte_size = buf.estimated_json_encoded_size_of(); @@ -292,6 +299,24 @@ impl TransformOutputs { } } +pub fn update_runtime_schema_definition( + // The event that will be updated + mut event: EventMutRef, + // The output_id that the current event is being sent to (will be used as the new parent_id) + output_id: &Arc, + // A mapping of parent OutputIds to definitions, that will be used to look up the new runtime definition of the event + log_schema_definitions: &HashMap>, +) { + if let EventMutRef::Log(log) = &mut event { + if let Some(parent_component_id) = log.metadata_mut().parent_id() { + if let Some(definition) = log_schema_definitions.get(parent_component_id) { + log.metadata_mut().set_schema_definition(definition); + } + } + } + event.metadata_mut().set_parent_id(Arc::clone(&output_id)) +} + #[derive(Debug, Clone)] pub struct TransformOutputsBuf { primary_buffer: Option, diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 3fa43f198fd3d..e3bf06aca3216 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -21,6 +21,7 @@ use vector_common::internal_event::{ }; use vector_config::NamedComponent; use vector_core::config::LogNamespace; +use vector_core::transform::update_runtime_schema_definition; use vector_core::{ buffers::{ topology::{ @@ -736,6 +737,7 @@ fn build_transform( node.input_details.data_type(), node.typetag, &node.key, + &node.outputs, ), } } @@ -745,7 +747,7 @@ fn build_sync_transform( node: TransformNode, input_rx: BufferReceiver, ) -> (Task, HashMap) { - let (outputs, controls) = TransformOutputs::new(node.outputs); + let (outputs, controls) = TransformOutputs::new(node.outputs, node.key.clone()); let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { @@ -927,6 +929,7 @@ fn build_task_transform( input_type: DataType, typetag: &str, key: &ComponentKey, + outputs: 
&Vec, ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); @@ -942,8 +945,28 @@ fn build_task_transform( )) }); let events_sent = register!(EventsSent::from(internal_event::Output(None))); + let output_id = Arc::new(OutputId { + component: key.clone(), + port: None, + }); + let schema_definition_map = outputs + .iter() + .find(|x| x.port.is_none()) + .expect("output for default port required for task transforms") + .log_schema_definitions + .clone() + .into_iter() + .map(|(key, value)| (key, Arc::new(value))) + .collect(); + let stream = t .transform(Box::pin(filtered)) + .map(move |mut events| { + for event in events.iter_events_mut() { + update_runtime_schema_definition(event, &output_id, &schema_definition_map); + } + events + }) .inspect(move |events: &EventArray| { events_sent.emit(CountByteSize( events.len(), diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 8375e125763f1..a44e8bc007d21 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -290,6 +290,7 @@ mod tests { use tokio_stream::wrappers::ReceiverStream; use vector_core::config::OutputId; + use crate::config::schema::Definition; use crate::{ event::{Event, LogEvent, Value}, test_util::components::assert_transform_compliance, @@ -363,6 +364,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event differs in matched field so should be output even though it @@ -371,6 +377,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + 
.metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); // Third event has the same value for "matched" as first event, so it should be dropped. @@ -413,6 +424,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event has a different matched field name with the same value, @@ -421,6 +437,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -466,6 +487,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event is the same just with different field order, so it @@ -511,6 +537,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event gets output because it's not a dupe. 
This causes the first @@ -519,6 +551,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(new_event, event2); // Third event is a dupe but gets output anyway because the first @@ -568,6 +606,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -576,6 +619,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -621,6 +669,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -629,6 +682,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + 
event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -667,6 +725,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through as null is different than @@ -675,6 +738,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 7230eb783ff60..a1a2e3b2996b9 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -515,7 +515,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -550,7 +550,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). 
metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -642,7 +642,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -690,7 +690,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -724,7 +724,7 @@ mod tests { // definitions aren't valid for metrics yet, it's just set to the default (anything). 
metadata.set_schema_definition(&Arc::new(Definition::any())); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); let metric = do_transform(config, event).await.unwrap(); @@ -758,7 +758,7 @@ mod tests { metadata.set_schema_definition(&Arc::new(Definition::any())); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); let metric = do_transform(config, event).await.unwrap(); @@ -845,7 +845,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -902,7 +902,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -948,7 +948,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). 
metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -980,7 +980,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -1013,7 +1013,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); From 9fdc49ba5304601c76c32ca3f026bbc50d89c594 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 14 Jun 2023 17:19:37 -0400 Subject: [PATCH 07/18] fix more tests --- src/transforms/metric_to_log.rs | 274 +++++++++--------- src/transforms/tag_cardinality_limit/tests.rs | 18 ++ 2 files changed, 162 insertions(+), 130 deletions(-) diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index b1c7c7394fca2..62f1d713d9535 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -104,136 +104,7 @@ impl TransformConfig for MetricToLogConfig { global_log_namespace: LogNamespace, ) -> Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = - 
Definition::default_for_namespace(&BTreeSet::from([log_namespace])) - .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("namespace"), - Kind::bytes().or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("tags"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), - None, - ) - .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("counter"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("gauge"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("set"), - Kind::object(Collection::empty().with_known( - "values", - Kind::array(Collection::empty().with_unknown(Kind::bytes())), - )) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("distribution"), - Kind::object( - Collection::empty() - .with_known( - "samples", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("value", Kind::float()) - .with_known("rate", Kind::integer()), - )), - ), - ) - .with_known("statistic", Kind::bytes()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_histogram"), - Kind::object( - Collection::empty() - .with_known( - "buckets", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("upper_limit", Kind::float()) - .with_known("count", Kind::integer()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_summary"), - Kind::object( - Collection::empty() - .with_known( - "quantiles", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() 
- .with_known("quantile", Kind::float()) - .with_known("value", Kind::float()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("sketch"), - Kind::any().or_undefined(), - None, - ); - - match log_namespace { - LogNamespace::Vector => { - // from serializing the Metric (Legacy moves it to another field) - schema_definition = schema_definition.with_event_field( - &owned_value_path!("timestamp"), - Kind::bytes().or_undefined(), - None, - ); - - // This is added as a "marker" field to determine which namespace is being used at runtime. - // This is normally handled automatically by sources, but this is a special case. - schema_definition = schema_definition.with_metadata_field( - &owned_value_path!("vector"), - Kind::object(Collection::empty()), - None, - ); - } - LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - schema_definition = - schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); - } - - schema_definition = schema_definition.with_event_field( - &parse_value_path(log_schema().host_key()).expect("valid host key"), - Kind::bytes().or_undefined(), - None, - ); - } - } + let schema_definition = schema_definition(log_namespace); vec![TransformOutput::new( DataType::Log, @@ -249,6 +120,137 @@ impl TransformConfig for MetricToLogConfig { } } +fn schema_definition(log_namespace: LogNamespace) -> Definition { + let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace])) + .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("namespace"), + Kind::bytes().or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("tags"), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + None, + ) + .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) + 
.with_event_field( + &owned_value_path!("counter"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("gauge"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("set"), + Kind::object(Collection::empty().with_known( + "values", + Kind::array(Collection::empty().with_unknown(Kind::bytes())), + )) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("distribution"), + Kind::object( + Collection::empty() + .with_known( + "samples", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("value", Kind::float()) + .with_known("rate", Kind::integer()), + )), + ), + ) + .with_known("statistic", Kind::bytes()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_histogram"), + Kind::object( + Collection::empty() + .with_known( + "buckets", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("upper_limit", Kind::float()) + .with_known("count", Kind::integer()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_summary"), + Kind::object( + Collection::empty() + .with_known( + "quantiles", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("quantile", Kind::float()) + .with_known("value", Kind::float()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("sketch"), + Kind::any().or_undefined(), + None, + ); + + match log_namespace { + LogNamespace::Vector => { + // from serializing the Metric (Legacy moves it to another field) + schema_definition = 
schema_definition.with_event_field( + &owned_value_path!("timestamp"), + Kind::bytes().or_undefined(), + None, + ); + + // This is added as a "marker" field to determine which namespace is being used at runtime. + // This is normally handled automatically by sources, but this is a special case. + schema_definition = schema_definition.with_metadata_field( + &owned_value_path!("vector"), + Kind::object(Collection::empty()), + None, + ); + } + LogNamespace::Legacy => { + if let Some(timestamp_key) = log_schema().timestamp_key() { + schema_definition = + schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); + } + + schema_definition = schema_definition.with_event_field( + &parse_value_path(log_schema().host_key()).expect("valid host key"), + Kind::bytes().or_undefined(), + None, + ); + } + } + schema_definition +} + #[derive(Clone, Debug)] pub struct MetricToLog { host_tag: String, @@ -411,6 +413,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = counter.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(counter).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -439,6 +443,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(gauge).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -467,6 +473,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + 
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(set).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -497,6 +505,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(distro).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -546,6 +556,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(histo).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -593,6 +605,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(summary).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 845883f4ae045..b5b37b1cf6cd1 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -4,6 +4,8 @@ use vector_core::config::OutputId; use vector_core::metric_tags; use super::*; +use crate::config::schema::Definition; +use crate::config::LogNamespace; use crate::event::metric::TagValue; use crate::event::{metric, Event, Metric, MetricTags}; use 
crate::test_util::components::assert_transform_compliance; @@ -13,6 +15,7 @@ use crate::transforms::tag_cardinality_limit::config::{ use crate::transforms::test::create_topology; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use vrl::compiler::prelude::Kind; #[test] fn generate_config() { @@ -135,6 +138,21 @@ async fn drop_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event3.set_parent_id(Arc::new(OutputId::from("transform"))); + + // definitions aren't valid for metrics yet, it's just set to the default (anything). + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Legacy]), + )); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::any())); + event3 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::any())); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // The third event should have been modified to remove "tag1" From 817992cfd10cbdb75202ee5e1a1d3092a06ebcf4 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 15 Jun 2023 15:50:43 -0400 Subject: [PATCH 08/18] fix tests --- lib/vector-core/src/event/metadata.rs | 4 + lib/vector-core/src/event/mod.rs | 7 + src/config/transform.rs | 3 + src/transforms/aggregate.rs | 10 +- src/transforms/filter.rs | 5 + src/transforms/lua/v2/mod.rs | 5 +- src/transforms/reduce/mod.rs | 243 ++++++++++++------ src/transforms/remap.rs | 47 ++-- src/transforms/tag_cardinality_limit/tests.rs | 55 +++- 9 files changed, 255 insertions(+), 124 deletions(-) diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 65238abe06726..519e5f195f7fd 100644 --- a/lib/vector-core/src/event/metadata.rs +++ 
b/lib/vector-core/src/event/metadata.rs @@ -196,6 +196,10 @@ impl EventMetadata { pub fn merge(&mut self, other: Self) { self.finalizers.merge(other.finalizers); self.secrets.merge(other.secrets); + + if self.parent_id.is_none() { + self.parent_id = other.parent_id; + } } /// Update the finalizer(s) status. diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 44ac76ad3169b..f1188f382880d 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -304,6 +304,13 @@ impl Event { self.metadata_mut().set_source_id(source_id); self } + + /// Sets the `source_id` in the event metadata to the provided value. + #[must_use] + pub fn with_parent_id(mut self, parent_id: Arc) -> Self { + self.metadata_mut().set_parent_id(parent_id); + self + } } impl EventDataEq for Event { diff --git a/src/config/transform.rs b/src/config/transform.rs index c2be848d53361..1b9f442ef0786 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -195,6 +195,9 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + &self, enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], + + // This only exists for transforms that create logs from non-logs, to know which namespace + // to use, such as `metric_to_log` global_log_namespace: LogNamespace, ) -> Vec; diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index de97b69eafe41..fe39b5b09b81e 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -155,8 +155,10 @@ mod tests { use futures::stream; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; + use vrl::value::Kind; use super::*; + use crate::schema::Definition; use crate::{ event::{metric, Event, Metric}, test_util::components::assert_transform_compliance, @@ -173,7 +175,13 @@ mod tests { kind: metric::MetricKind, value: metric::MetricValue, ) -> Event { - Event::Metric(Metric::new(name, kind, 
value)).with_source_id(Arc::new(OutputId::from("in"))) + let mut event = Event::Metric(Metric::new(name, kind, value)) + .with_source_id(Arc::new(OutputId::from("in"))) + .with_parent_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event } #[test] diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 85bee0a071ba1..8191b6794e548 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -103,6 +103,7 @@ mod test { use vector_core::event::{Metric, MetricKind, MetricValue}; use super::*; + use crate::config::schema::Definition; use crate::{ conditions::ConditionConfig, event::{Event, LogEvent}, @@ -128,6 +129,10 @@ mod test { tx.send(log.clone()).await.unwrap(); log.set_source_id(Arc::new(OutputId::from("in"))); + log.set_parent_id(Arc::new(OutputId::from("transform"))); + log.metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out.recv().await.unwrap(), log); let metric = Event::from(Metric::new( diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index ca0790546b390..9e8172ba942cf 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -961,9 +961,12 @@ mod tests { MetricValue::Counter { value: 1.0 }, ); - let expected = metric + let mut expected = metric .clone() .with_value(MetricValue::Counter { value: 2.0 }); + expected + .metadata_mut() + .set_parent_id(Arc::new(OutputId::from("transform"))); tx.send(metric.into()).await.unwrap(); diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 455a4b142e4d6..3f53046e0b472 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -26,6 +26,7 @@ use crate::{ mod merge_strategy; +use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; @@ 
-133,94 +134,102 @@ impl TransformConfig for ReduceConfig { input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - let mut output_definitions = HashMap::new(); - - for (output, input) in input_definitions { - let mut schema_definition = input.clone(); - - for (key, merge_strategy) in self.merge_strategies.iter() { - let key = if let Ok(key) = parse_target_path(key) { - key - } else { - continue; - }; - - let input_kind = match key.prefix { - PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), - PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), - }; - - let new_kind = match merge_strategy { - MergeStrategy::Discard | MergeStrategy::Retain => { - /* does not change the type */ - input_kind.clone() + // Events may be combined, so there isn't a true single "source" for events. + // All of the definitions must be merged. + let merged_definition: Definition = input_definitions + .iter() + .map(|(_output, definition)| definition.clone()) + .reduce(Definition::merge) + .unwrap_or_else(Definition::any); + + // for (output, input) in input_definitions { + let mut schema_definition = merged_definition; + + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), + }; + + let new_kind = match merge_strategy { + MergeStrategy::Discard | MergeStrategy::Retain => { + /* does not change the type */ + input_kind.clone() + } + MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { + // only keeps integer / float values + match (input_kind.contains_integer(), input_kind.contains_float()) { + (true, true) => Kind::float().or_integer(), + (true, false) => Kind::integer(), + (false, true) => Kind::float(), + (false, 
false) => Kind::undefined(), } - MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { - // only keeps integer / float values - match (input_kind.contains_integer(), input_kind.contains_float()) { - (true, true) => Kind::float().or_integer(), - (true, false) => Kind::integer(), - (false, true) => Kind::float(), - (false, false) => Kind::undefined(), - } + } + MergeStrategy::Array => { + let unknown_kind = input_kind.clone(); + Kind::array(Collection::empty().with_unknown(unknown_kind)) + } + MergeStrategy::Concat => { + let mut new_kind = Kind::never(); + + if input_kind.contains_bytes() { + new_kind.add_bytes(); } - MergeStrategy::Array => { - let unknown_kind = input_kind.clone(); - Kind::array(Collection::empty().with_unknown(unknown_kind)) + if let Some(array) = input_kind.as_array() { + // array elements can be either any type that the field can be, or any + // element of the array + let array_elements = array.reduced_kind().union(input_kind.without_array()); + new_kind.add_array(Collection::empty().with_unknown(array_elements)); } - MergeStrategy::Concat => { - let mut new_kind = Kind::never(); - - if input_kind.contains_bytes() { - new_kind.add_bytes(); - } - if let Some(array) = input_kind.as_array() { - // array elements can be either any type that the field can be, or any - // element of the array - let array_elements = - array.reduced_kind().union(input_kind.without_array()); - new_kind.add_array(Collection::empty().with_unknown(array_elements)); - } - new_kind + new_kind + } + MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => { + // can only produce bytes (or undefined) + if input_kind.contains_bytes() { + Kind::bytes() + } else { + Kind::undefined() } - MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => { - // can only produce bytes (or undefined) - if input_kind.contains_bytes() { - Kind::bytes() - } else { - Kind::undefined() - } + } + MergeStrategy::ShortestArray | MergeStrategy::LongestArray => { + if let Some(array) = 
input_kind.as_array() { + Kind::array(array.clone()) + } else { + Kind::undefined() } - MergeStrategy::ShortestArray | MergeStrategy::LongestArray => { - if let Some(array) = input_kind.as_array() { - Kind::array(array.clone()) - } else { - Kind::undefined() - } + } + MergeStrategy::FlatUnique => { + let mut array_elements = input_kind.without_array().without_object(); + if let Some(array) = input_kind.as_array() { + array_elements = array_elements.union(array.reduced_kind()); } - MergeStrategy::FlatUnique => { - let mut array_elements = input_kind.without_array().without_object(); - if let Some(array) = input_kind.as_array() { - array_elements = array_elements.union(array.reduced_kind()); - } - if let Some(object) = input_kind.as_object() { - array_elements = array_elements.union(object.reduced_kind()); - } - Kind::array(Collection::empty().with_unknown(array_elements)) + if let Some(object) = input_kind.as_object() { + array_elements = array_elements.union(object.reduced_kind()); } - }; + Kind::array(Collection::empty().with_unknown(array_elements)) + } + }; - // all of the merge strategies are optional. They won't produce a value unless a value actually exists - let new_kind = if input_kind.contains_undefined() { - new_kind.or_undefined() - } else { - new_kind - }; + // all of the merge strategies are optional. 
They won't produce a value unless a value actually exists + let new_kind = if input_kind.contains_undefined() { + new_kind.or_undefined() + } else { + new_kind + }; - schema_definition = schema_definition.with_field(&key, new_kind, None); - } + schema_definition = schema_definition.with_field(&key, new_kind, None); + } - output_definitions.insert(output.clone(), schema_definition); + // the same schema definition is used for all inputs + let mut output_definitions = HashMap::new(); + for (output, _input) in input_definitions { + output_definitions.insert(output.clone(), schema_definition.clone()); } vec![TransformOutput::new(DataType::Log, output_definitions)] @@ -474,12 +483,15 @@ impl TaskTransform for Reduce { #[cfg(test)] mod test { + use enrichment::TableRegistry; use serde_json::json; + use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vrl::value::Kind; use super::*; + use crate::config::schema::Definition; use crate::event::{LogEvent, Value}; use crate::test_util::components::assert_transform_compliance; use crate::transforms::test::create_topology; @@ -528,18 +540,33 @@ group_by = [ "request_id" ] .schema_definitions(true) .clone(); + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (tx, rx) = mpsc::channel(1); let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("counter", 1); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut metadata_1 = e_1.metadata().clone(); + metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_2 = LogEvent::from("test message 2"); 
e_2.insert("counter", 2); e_2.insert("request_id", "2"); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_3 = LogEvent::from("test message 3"); e_3.insert("counter", 3); @@ -603,6 +630,18 @@ merge_strategies.baz = "max" assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); @@ -610,7 +649,9 @@ merge_strategies.baz = "max" e_1.insert("bar", "first bar"); e_1.insert("baz", 2); e_1.insert("request_id", "1"); - let metadata = e_1.metadata().clone(); + let mut metadata = e_1.metadata().clone(); + metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); @@ -660,17 +701,32 @@ group_by = [ "request_id" ] assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("counter", 1); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut 
metadata_1 = e_1.metadata().clone(); + metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("counter", 2); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); let mut e_3 = LogEvent::from("test message 3"); @@ -852,20 +908,37 @@ merge_strategies.bar = "concat" assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("foo", json!([1, 3])); e_1.insert("bar", json!([1, 3])); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut metadata_1 = e_1.metadata().clone(); + metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); + tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("foo", json!([2, 4])); e_2.insert("bar", json!([2, 4])); e_2.insert("request_id", "2"); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); let mut e_3 = LogEvent::from("test message 3"); diff --git 
a/src/transforms/remap.rs b/src/transforms/remap.rs index 059ce779d3337..4c18c1e2377a0 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -799,10 +799,6 @@ mod tests { let result1 = transform_one(&mut tform, event1).unwrap(); assert_eq!(get_field_string(&result1, "message"), "event1"); assert_eq!(get_field_string(&result1, "foo"), "bar"); - assert_eq!( - result1.metadata().schema_definition(), - &test_default_schema_definition() - ); assert!(tform.runner().runtime.is_empty()); let event2 = { @@ -812,10 +808,6 @@ mod tests { let result2 = transform_one(&mut tform, event2).unwrap(); assert_eq!(get_field_string(&result2, "message"), "event2"); assert_eq!(result2.as_log().get("foo"), Some(&Value::Null)); - assert_eq!( - result2.metadata().schema_definition(), - &test_default_schema_definition() - ); assert!(tform.runner().runtime.is_empty()); } @@ -847,11 +839,6 @@ mod tests { assert_eq!(get_field_string(&result, "foo"), "bar"); assert_eq!(get_field_string(&result, "bar"), "baz"); assert_eq!(get_field_string(&result, "copy"), "buz"); - - assert_eq!( - result.metadata().schema_definition(), - &test_default_schema_definition() - ); } #[test] @@ -885,17 +872,8 @@ mod tests { let r = result.next().unwrap(); assert_eq!(get_field_string(&r, "message"), "foo"); - assert_eq!( - r.metadata().schema_definition(), - &test_default_schema_definition() - ); let r = result.next().unwrap(); assert_eq!(get_field_string(&r, "message"), "bar"); - - assert_eq!( - r.metadata().schema_definition(), - &test_default_schema_definition() - ); } #[test] @@ -1061,7 +1039,9 @@ mod tests { "zork", MetricKind::Incremental, MetricValue::Counter { value: 1.0 }, - metadata.with_schema_definition(&Arc::new(test_default_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + metadata ) .with_namespace(Some("zerk")) .with_tags(Some(metric_tags! 
{ @@ -1271,8 +1251,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_default_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! { "hello" => "world", @@ -1289,8 +1272,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_dropped_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! { "hello" => "goodbye", @@ -1310,8 +1296,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_dropped_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! 
{ "not_hello" => "oops", diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index b5b37b1cf6cd1..4094055aa675f 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -91,6 +91,16 @@ async fn drop_event(config: TagCardinalityLimitConfig) { event1.set_source_id(Arc::new(OutputId::from("in"))); event2.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // Third value rejected since value_limit is 2. @@ -142,16 +152,15 @@ async fn drop_tag(config: TagCardinalityLimitConfig) { event2.set_parent_id(Arc::new(OutputId::from("transform"))); event3.set_parent_id(Arc::new(OutputId::from("transform"))); - // definitions aren't valid for metrics yet, it's just set to the default (anything). 
event1.metadata_mut().set_schema_definition(&Arc::new( - Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Legacy]), + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), )); - event2 - .metadata_mut() - .set_schema_definition(&Arc::new(Definition::any())); - event3 - .metadata_mut() - .set_schema_definition(&Arc::new(Definition::any())); assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); @@ -225,6 +234,21 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event3.set_parent_id(Arc::new(OutputId::from("transform"))); + + // definitions aren't valid for metrics yet, it's just set to the default (anything). 
+ event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + drop(tx); topology.stop().await; @@ -275,6 +299,21 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event3.set_parent_id(Arc::new(OutputId::from("transform"))); + + // definitions aren't valid for metrics yet, it's just set to the default (anything). + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); assert_eq!(new_event3, Some(event3)); From c48e6c719c58cb769f1b031799cc3e0cd60de190 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 16 Jun 2023 09:07:33 -0400 Subject: [PATCH 09/18] fix more tests --- src/sources/opentelemetry/tests.rs | 8 ++++- src/topology/test/compliance.rs | 13 ++++++++ src/topology/test/mod.rs | 52 ++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 
798759fa1138d..2ee2e65f1e7b7 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -10,10 +10,12 @@ use opentelemetry_proto::proto::{ }; use similar_asserts::assert_eq; use std::collections::BTreeMap; +use std::sync::Arc; use tonic::Request; use vector_core::config::LogNamespace; use vrl::value; +use crate::config::OutputId; use crate::{ config::{SourceConfig, SourceContext}, event::{into_event_stream, Event, EventStatus, LogEvent, Value}, @@ -269,7 +271,11 @@ async fn receive_grpc_logs_legacy_namespace() { ("observed_timestamp", Utc.timestamp_nanos(2).into()), ("source_type", "opentelemetry".into()), ]); - let expect_event = Event::from(LogEvent::from(expect_vec)); + let mut expect_event = Event::from(LogEvent::from(expect_vec)); + expect_event.set_parent_id(Arc::new(OutputId { + component: "test".into(), + port: Some("logs".into()), + })); assert_eq!(actual_event, expect_event); }) .await; diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs index c7c9d3c349818..093805b186080 100644 --- a/src/topology/test/compliance.rs +++ b/src/topology/test/compliance.rs @@ -6,6 +6,7 @@ use vector_core::{ event::{Event, EventArray, EventContainer, LogEvent}, }; +use crate::config::schema::Definition; use crate::{ config::{unit_test::UnitTestSourceConfig, ConfigBuilder}, test_util::{ @@ -59,6 +60,10 @@ async fn test_function_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -80,6 +85,10 @@ async fn test_sync_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + 
original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -100,6 +109,10 @@ async fn test_task_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index babce13f20d90..1185fb76ecbbf 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use crate::schema::Definition; use crate::{ config::{Config, ConfigDiff, SinkOuter}, event::{into_event_stream, Event, EventArray, EventContainer, LogEvent}, @@ -149,6 +150,10 @@ async fn topology_source_and_sink() { let res = out1.flat_map(into_event_stream).collect::>().await; event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_parent_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res); } @@ -184,6 +189,16 @@ async fn topology_multiple_sources() { event1.set_source_id(Arc::new(OutputId::from("in1"))); event2.set_source_id(Arc::new(OutputId::from("in2"))); + event1.set_parent_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + + event2.set_parent_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out_event1, Some(event1.into())); assert_eq!(out_event2, Some(event2.into())); } @@ -218,6 +233,12 @@ async fn topology_multiple_sinks() { // We should see that both sinks got the 
exact same event: event.set_source_id(Arc::new(OutputId::from("in1"))); + + event.set_parent_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let expected = vec![event]; assert_eq!(expected, res1); assert_eq!(expected, res2); @@ -293,6 +314,11 @@ async fn topology_remove_one_source() { event1.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_parent_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let res = h_out1.await.unwrap(); assert_eq!(vec![event1], res); } @@ -332,6 +358,11 @@ async fn topology_remove_one_sink() { event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_parent_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event], res1); assert_eq!(Vec::::new(), res2); } @@ -442,6 +473,10 @@ async fn topology_swap_source() { assert_eq!(Vec::::new(), res1); event2.set_source_id(Arc::new(OutputId::from("in2"))); + event2.set_parent_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event2], res2); } @@ -554,6 +589,10 @@ async fn topology_swap_sink() { assert_eq!(Vec::::new(), res1); event1.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_parent_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event1], res2); } @@ -663,6 +702,15 @@ async fn topology_rebuild_connected() { event1.set_source_id(Arc::new(OutputId::from("in1"))); event2.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_parent_id(Arc::new(OutputId::from("test"))); + event2.set_parent_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + 
.set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event1, event2], res); } @@ -715,6 +763,10 @@ async fn topology_rebuild_connected_transform() { assert_eq!(Vec::::new(), res1); event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_parent_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res2); } From 4e20a8974995674366834a2e746aa8282035fddd Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 16 Jun 2023 09:14:31 -0400 Subject: [PATCH 10/18] clippy --- lib/vector-core/src/transform/mod.rs | 5 +++-- src/topology/builder.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index fb99c53ab1032..175fbab05000d 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -195,7 +195,7 @@ pub struct TransformOutputs { impl TransformOutputs { pub fn new( outputs_in: Vec, - component_key: ComponentKey, + component_key: &ComponentKey, ) -> (Self, HashMap, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; @@ -299,6 +299,7 @@ impl TransformOutputs { } } +#[allow(clippy::implicit_hasher)] pub fn update_runtime_schema_definition( // The event that will be updated mut event: EventMutRef, @@ -314,7 +315,7 @@ pub fn update_runtime_schema_definition( } } } - event.metadata_mut().set_parent_id(Arc::clone(&output_id)) + event.metadata_mut().set_parent_id(Arc::clone(output_id)); } #[derive(Debug, Clone)] diff --git a/src/topology/builder.rs b/src/topology/builder.rs index e3bf06aca3216..19b6be83b8614 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -747,7 +747,7 @@ fn build_sync_transform( node: TransformNode, input_rx: 
BufferReceiver, ) -> (Task, HashMap) { - let (outputs, controls) = TransformOutputs::new(node.outputs, node.key.clone()); + let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key); let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { @@ -929,7 +929,7 @@ fn build_task_transform( input_type: DataType, typetag: &str, key: &ComponentKey, - outputs: &Vec, + outputs: &[TransformOutput], ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); From ac268f6e367ea3f285288b64b83e9d8f4a7823d6 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 16 Jun 2023 09:32:24 -0400 Subject: [PATCH 11/18] cleanup --- lib/vector-core/src/event/metadata.rs | 4 ---- lib/vector-core/src/transform/mod.rs | 7 +++++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 519e5f195f7fd..65238abe06726 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -196,10 +196,6 @@ impl EventMetadata { pub fn merge(&mut self, other: Self) { self.finalizers.merge(other.finalizers); self.secrets.merge(other.secrets); - - if self.parent_id.is_none() { - self.parent_id = other.parent_id; - } } /// Update the finalizer(s) status. diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 175fbab05000d..d60fbdc630e16 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -313,6 +313,13 @@ pub fn update_runtime_schema_definition( if let Some(definition) = log_schema_definitions.get(parent_component_id) { log.metadata_mut().set_schema_definition(definition); } + } else { + // there is no parent defined. That means this ven originated from a component that + // isn't able to track the source, such as `reduce` or `lua`. 
In these cases, all of the + // schema definitions _must_ be the same, so the first one is picked + if let Some(definition) = log_schema_definitions.values().next() { + log.metadata_mut().set_schema_definition(definition); + } } } event.metadata_mut().set_parent_id(Arc::clone(output_id)); From 554ac9af741becc85dcaa691b2bc79d3e7f741ca Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 16 Jun 2023 09:39:55 -0400 Subject: [PATCH 12/18] fix spelling --- lib/vector-core/src/transform/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index d60fbdc630e16..4d29a8e7d6c38 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -314,7 +314,7 @@ pub fn update_runtime_schema_definition( log.metadata_mut().set_schema_definition(definition); } } else { - // there is no parent defined. That means this ven originated from a component that + // there is no parent defined. That means this event originated from a component that // isn't able to track the source, such as `reduce` or `lua`. 
In these cases, all of the // schema definitions _must_ be the same, so the first one is picked if let Some(definition) = log_schema_definitions.values().next() { From 01694449116fadc8164870b0ceff92fd34c470bf Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 22 Jun 2023 10:22:58 -0400 Subject: [PATCH 13/18] rename parent_id to upstream_id --- lib/vector-core/src/event/metadata.rs | 14 +++++----- lib/vector-core/src/event/mod.rs | 12 ++++----- lib/vector-core/src/transform/mod.rs | 4 +-- src/source_sender/mod.rs | 4 +-- src/sources/opentelemetry/tests.rs | 2 +- src/topology/test/compliance.rs | 6 ++--- src/topology/test/mod.rs | 22 ++++++++-------- src/transforms/aggregate.rs | 2 +- src/transforms/dedupe.rs | 26 +++++++++---------- src/transforms/filter.rs | 2 +- src/transforms/log_to_metric.rs | 22 ++++++++-------- src/transforms/lua/v2/mod.rs | 2 +- src/transforms/metric_to_log.rs | 12 ++++----- src/transforms/reduce/mod.rs | 14 +++++----- src/transforms/tag_cardinality_limit/tests.rs | 22 ++++++++-------- 15 files changed, 83 insertions(+), 83 deletions(-) diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 65238abe06726..61aeddced97a4 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -36,7 +36,7 @@ pub struct EventMetadata { /// determine which schema definition to attach to an event in transforms. /// This should always have a value set for events in transforms. It will always be `None` /// in a source, and there is currently no use-case for reading the value in a sink. - parent_id: Option>, + upstream_id: Option>, /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). @@ -89,8 +89,8 @@ impl EventMetadata { /// Returns a reference to the metadata parent id. This is the `OutputId` /// of the previous component the event was sent through (if any). 
#[must_use] - pub fn parent_id(&self) -> Option<&OutputId> { - self.parent_id.as_deref() + pub fn upstream_id(&self) -> Option<&OutputId> { + self.upstream_id.as_deref() } /// Sets the `source_id` in the metadata to the provided value. @@ -98,9 +98,9 @@ impl EventMetadata { self.source_id = Some(source_id); } - /// Sets the `parent_id` in the metadata to the provided value. - pub fn set_parent_id(&mut self, parent_id: Arc) { - self.parent_id = Some(parent_id); + /// Sets the `upstream_id` in the metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc) { + self.upstream_id = Some(upstream_id); } /// Return the datadog API key, if it exists @@ -132,7 +132,7 @@ impl Default for EventMetadata { finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, - parent_id: None, + upstream_id: None, } } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index f1188f382880d..0dd36199a6eb8 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -293,9 +293,9 @@ impl Event { self.metadata_mut().set_source_id(source_id); } - /// Sets the `parent_id` in the event metadata to the provided value. - pub fn set_parent_id(&mut self, parent_id: Arc) { - self.metadata_mut().set_parent_id(parent_id); + /// Sets the `upstream_id` in the event metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc) { + self.metadata_mut().set_upstream_id(upstream_id); } /// Sets the `source_id` in the event metadata to the provided value. @@ -305,10 +305,10 @@ impl Event { self } - /// Sets the `source_id` in the event metadata to the provided value. + /// Sets the `upstream_id` in the event metadata to the provided value. 
#[must_use] - pub fn with_parent_id(mut self, parent_id: Arc) -> Self { - self.metadata_mut().set_parent_id(parent_id); + pub fn with_upstream_id(mut self, upstream_id: Arc) -> Self { + self.metadata_mut().set_upstream_id(upstream_id); self } } diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 4d29a8e7d6c38..a9c67f504d22f 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -309,7 +309,7 @@ pub fn update_runtime_schema_definition( log_schema_definitions: &HashMap>, ) { if let EventMutRef::Log(log) = &mut event { - if let Some(parent_component_id) = log.metadata_mut().parent_id() { + if let Some(parent_component_id) = log.metadata_mut().upstream_id() { if let Some(definition) = log_schema_definitions.get(parent_component_id) { log.metadata_mut().set_schema_definition(definition); } @@ -322,7 +322,7 @@ pub fn update_runtime_schema_definition( } } } - event.metadata_mut().set_parent_id(Arc::clone(output_id)); + event.metadata_mut().set_upstream_id(Arc::clone(output_id)); } #[derive(Debug, Clone)] diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 1d061f0396bf1..e7140fb931cea 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -260,7 +260,7 @@ struct Inner { events_sent: Registered, /// The schema definition that will be attached to Log events sent through here log_definition: Option>, - /// The OutputId related to this source sender. This is set as the `parent_id` in + /// The OutputId related to this source sender. This is set as the `upstream_id` in /// `EventMetadata` for all event sent through here. 
output_id: Arc, } @@ -312,7 +312,7 @@ impl Inner { } event .metadata_mut() - .set_parent_id(Arc::clone(&self.output_id)); + .set_upstream_id(Arc::clone(&self.output_id)); }); let byte_size = events.estimated_json_encoded_size_of(); diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 2ee2e65f1e7b7..fc538efad199d 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -272,7 +272,7 @@ async fn receive_grpc_logs_legacy_namespace() { ("source_type", "opentelemetry".into()), ]); let mut expect_event = Event::from(LogEvent::from(expect_vec)); - expect_event.set_parent_id(Arc::new(OutputId { + expect_event.set_upstream_id(Arc::new(OutputId { component: "test".into(), port: Some("logs".into()), })); diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs index 093805b186080..95b24d62bb14a 100644 --- a/src/topology/test/compliance.rs +++ b/src/topology/test/compliance.rs @@ -60,7 +60,7 @@ async fn test_function_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); - original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); original_event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -85,7 +85,7 @@ async fn test_sync_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); - original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); original_event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -109,7 +109,7 @@ async fn test_task_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(OutputId::from("in"))); - 
original_event.set_parent_id(Arc::new(OutputId::from("transform"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); original_event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 1185fb76ecbbf..e0ea0dffd47b3 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -150,7 +150,7 @@ async fn topology_source_and_sink() { let res = out1.flat_map(into_event_stream).collect::>().await; event.set_source_id(Arc::new(OutputId::from("in1"))); - event.set_parent_id(Arc::new(OutputId::from("test"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -189,12 +189,12 @@ async fn topology_multiple_sources() { event1.set_source_id(Arc::new(OutputId::from("in1"))); event2.set_source_id(Arc::new(OutputId::from("in2"))); - event1.set_parent_id(Arc::new(OutputId::from("test"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); event1 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); - event2.set_parent_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); event2 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -234,7 +234,7 @@ async fn topology_multiple_sinks() { // We should see that both sinks got the exact same event: event.set_source_id(Arc::new(OutputId::from("in1"))); - event.set_parent_id(Arc::new(OutputId::from("test"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -314,7 +314,7 @@ async fn topology_remove_one_source() { event1.set_source_id(Arc::new(OutputId::from("in1"))); - event1.set_parent_id(Arc::new(OutputId::from("test"))); + 
event1.set_upstream_id(Arc::new(OutputId::from("test"))); event1 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -358,7 +358,7 @@ async fn topology_remove_one_sink() { event.set_source_id(Arc::new(OutputId::from("in1"))); - event.set_parent_id(Arc::new(OutputId::from("test"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); event .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -473,7 +473,7 @@ async fn topology_swap_source() { assert_eq!(Vec::::new(), res1); event2.set_source_id(Arc::new(OutputId::from("in2"))); - event2.set_parent_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); event2 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -589,7 +589,7 @@ async fn topology_swap_sink() { assert_eq!(Vec::::new(), res1); event1.set_source_id(Arc::new(OutputId::from("in1"))); - event1.set_parent_id(Arc::new(OutputId::from("test"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); event1 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -702,8 +702,8 @@ async fn topology_rebuild_connected() { event1.set_source_id(Arc::new(OutputId::from("in1"))); event2.set_source_id(Arc::new(OutputId::from("in1"))); - event1.set_parent_id(Arc::new(OutputId::from("test"))); - event2.set_parent_id(Arc::new(OutputId::from("test"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); event1 .metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); @@ -763,7 +763,7 @@ async fn topology_rebuild_connected_transform() { assert_eq!(Vec::::new(), res1); event.set_source_id(Arc::new(OutputId::from("in1"))); - event.set_parent_id(Arc::new(OutputId::from("test"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); event .metadata_mut() 
.set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index fe39b5b09b81e..1ae2f73160971 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -177,7 +177,7 @@ mod tests { ) -> Event { let mut event = Event::Metric(Metric::new(name, kind, value)) .with_source_id(Arc::new(OutputId::from("in"))) - .with_parent_id(Arc::new(OutputId::from("transform"))); + .with_upstream_id(Arc::new(OutputId::from("transform"))); event.metadata_mut().set_schema_definition(&Arc::new( Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), )); diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index a44e8bc007d21..47e7b599bdd23 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -364,7 +364,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -377,7 +377,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() @@ -424,7 +424,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -437,7 +437,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - 
event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() @@ -487,7 +487,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -537,7 +537,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 @@ -551,7 +551,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() @@ -606,7 +606,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -619,7 +619,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() @@ -669,7 +669,7 @@ mod tests { let new_event = out.recv().await.unwrap(); 
event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -682,7 +682,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() @@ -725,7 +725,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event1 .metadata_mut() @@ -738,7 +738,7 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); // the schema definition is copied from the source for dedupe event2 .metadata_mut() diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 8191b6794e548..76bfe94265ae2 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -129,7 +129,7 @@ mod test { tx.send(log.clone()).await.unwrap(); log.set_source_id(Arc::new(OutputId::from("in"))); - log.set_parent_id(Arc::new(OutputId::from("transform"))); + log.set_upstream_id(Arc::new(OutputId::from("transform"))); log.metadata_mut() .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index a1a2e3b2996b9..5400c61bde518 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ 
-515,7 +515,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -550,7 +550,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -642,7 +642,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -690,7 +690,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -724,7 +724,7 @@ mod tests { // definitions aren't valid for metrics yet, it's just set to the default (anything). 
metadata.set_schema_definition(&Arc::new(Definition::any())); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); let metric = do_transform(config, event).await.unwrap(); @@ -758,7 +758,7 @@ mod tests { metadata.set_schema_definition(&Arc::new(Definition::any())); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); let metric = do_transform(config, event).await.unwrap(); @@ -845,7 +845,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -902,7 +902,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -948,7 +948,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). 
metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -980,7 +980,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -1013,7 +1013,7 @@ mod tests { let mut metadata = event.metadata().clone(); // definitions aren't valid for metrics yet, it's just set to the default (anything). metadata.set_schema_definition(&Arc::new(Definition::any())); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_source_id(Arc::new(OutputId::from("in"))); let metric = do_transform(config, event).await.unwrap(); diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index 9e8172ba942cf..a6a65cde824ef 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -966,7 +966,7 @@ mod tests { .with_value(MetricValue::Counter { value: 2.0 }); expected .metadata_mut() - .set_parent_id(Arc::new(OutputId::from("transform"))); + .set_upstream_id(Arc::new(OutputId::from("transform"))); tx.send(metric.into()).await.unwrap(); diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index 62f1d713d9535..948cb0fe86379 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -413,7 +413,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = 
counter.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(counter).await.unwrap(); @@ -443,7 +443,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(gauge).await.unwrap(); @@ -473,7 +473,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(set).await.unwrap(); @@ -505,7 +505,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(distro).await.unwrap(); @@ -556,7 +556,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let 
log = do_transform(histo).await.unwrap(); @@ -605,7 +605,7 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); metadata.set_source_id(Arc::new(OutputId::from("in"))); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(summary).await.unwrap(); diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 3f53046e0b472..c26da52a77555 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -558,14 +558,14 @@ group_by = [ "request_id" ] e_1.insert("counter", 1); e_1.insert("request_id", "1"); let mut metadata_1 = e_1.metadata().clone(); - metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("counter", 2); e_2.insert("request_id", "2"); let mut metadata_2 = e_2.metadata().clone(); - metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_3 = LogEvent::from("test message 3"); @@ -650,7 +650,7 @@ merge_strategies.baz = "max" e_1.insert("baz", 2); e_1.insert("request_id", "1"); let mut metadata = e_1.metadata().clone(); - metadata.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); @@ -718,14 +718,14 @@ group_by = [ "request_id" ] e_1.insert("counter", 1); e_1.insert("request_id", "1"); let mut metadata_1 = e_1.metadata().clone(); - 
metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("counter", 2); let mut metadata_2 = e_2.metadata().clone(); - metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); @@ -927,7 +927,7 @@ merge_strategies.bar = "concat" e_1.insert("bar", json!([1, 3])); e_1.insert("request_id", "1"); let mut metadata_1 = e_1.metadata().clone(); - metadata_1.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); @@ -937,7 +937,7 @@ merge_strategies.bar = "concat" e_2.insert("bar", json!([2, 4])); e_2.insert("request_id", "2"); let mut metadata_2 = e_2.metadata().clone(); - metadata_2.set_parent_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 4094055aa675f..8afe945a51ae1 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -91,8 +91,8 @@ async fn drop_event(config: TagCardinalityLimitConfig) { event1.set_source_id(Arc::new(OutputId::from("in"))); event2.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); + 
event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); event1.metadata_mut().set_schema_definition(&Arc::new( Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), @@ -148,9 +148,9 @@ async fn drop_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); - event3.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); event1.metadata_mut().set_schema_definition(&Arc::new( Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), @@ -234,9 +234,9 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); - event3.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); // definitions aren't valid for metrics yet, it's just set to the default (anything). 
event1.metadata_mut().set_schema_definition(&Arc::new( @@ -299,9 +299,9 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(OutputId::from("in"))); event3.set_source_id(Arc::new(OutputId::from("in"))); - event1.set_parent_id(Arc::new(OutputId::from("transform"))); - event2.set_parent_id(Arc::new(OutputId::from("transform"))); - event3.set_parent_id(Arc::new(OutputId::from("transform"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); // definitions aren't valid for metrics yet, it's just set to the default (anything). event1.metadata_mut().set_schema_definition(&Arc::new( From 9a1e7b9cfeafb8c858e4059781b063bead550a28 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 22 Jun 2023 11:21:27 -0400 Subject: [PATCH 14/18] add semantic meaning regardless of log namespacing setting --- lib/vector-core/src/config/mod.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index fb5e20767fe40..44f74e6005633 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -172,11 +172,7 @@ impl SourceOutput { } else { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); new_definition } }) @@ -243,11 +239,7 @@ impl TransformOutput { .map(|(output, definition)| { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + 
new_definition.add_meanings(definition.meanings()); (output.clone(), new_definition) }) .collect() From 5b9a2b982cc75c8c5abd20950298c1b9dede2d86 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 22 Jun 2023 13:08:35 -0400 Subject: [PATCH 15/18] fix tests --- lib/vector-core/Cargo.toml | 2 +- lib/vector-core/src/config/mod.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index d4e997eecdd05..c8429db4c5e72 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -94,7 +94,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] } +vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 44f74e6005633..654c6b5fe3c7a 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -596,7 +596,10 @@ mod test { // There should be the default legacy definition without schemas enabled. 
assert_eq!( - Some(schema::Definition::default_legacy_namespace()), + Some( + schema::Definition::default_legacy_namespace() + .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork") + ), output.schema_definition(false) ); } From 3894a97a0feb8358ffe4ab3f730f87164b865fd3 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Mon, 26 Jun 2023 15:55:41 -0400 Subject: [PATCH 16/18] store schema def in an Arc --- lib/vector-core/src/config/mod.rs | 10 +++++++--- src/source_sender/mod.rs | 2 +- src/transforms/reduce/mod.rs | 1 - src/transforms/remap.rs | 2 -- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index efd9e973d8557..744e712243b59 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, fmt, num::NonZeroUsize}; use bitmask_enum::bitmask; @@ -111,7 +112,7 @@ pub struct SourceOutput { // NOTE: schema definitions are only implemented/supported for log-type events. There is no // inherent blocker to support other types as well, but it'll require additional work to add // the relevant schemas, and store them separately in this type. - pub schema_definition: Option, + pub schema_definition: Option>, } impl SourceOutput { @@ -129,7 +130,7 @@ impl SourceOutput { Self { port: None, ty, - schema_definition: Some(schema_definition), + schema_definition: Some(Arc::new(schema_definition)), } } @@ -166,11 +167,14 @@ impl SourceOutput { /// A simple definition is just the default for the namespace. For the Vector namespace the /// meanings are included. /// Schema enabled is set in the users configuration. 
+ #[cfg(any(feature = "test", test))] #[must_use] pub fn schema_definition(&self, schema_enabled: bool) -> Option { + use std::ops::Deref; + self.schema_definition.as_ref().map(|definition| { if schema_enabled { - definition.clone() + definition.deref().clone() } else { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index e7140fb931cea..fea4a3980b64d 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -57,7 +57,7 @@ impl Builder { component_key: ComponentKey, ) -> LimitedReceiver { let lag_time = self.lag_time.clone(); - let log_definition = output.schema_definition.clone().map(Arc::new); + let log_definition = output.schema_definition.clone(); let output_id = OutputId { component: component_key, port: output.port.clone(), diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index c26da52a77555..90c9294b0cb63 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -142,7 +142,6 @@ impl TransformConfig for ReduceConfig { .reduce(Definition::merge) .unwrap_or_else(Definition::any); - // for (output, input) in input_definitions { let mut schema_definition = merged_definition; for (key, merge_strategy) in self.merge_strategies.iter() { diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index c08ea925fe3fe..0cbd2af1c119e 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -450,8 +450,6 @@ where drop_on_error: config.drop_on_error, drop_on_abort: config.drop_on_abort, reroute_dropped: config.reroute_dropped, - // default_schema_definition: Arc::new(default_schema_definition), - // dropped_schema_definition: Arc::new(dropped_schema_definition), runner, metric_tag_values: config.metric_tag_values, }) From 28d105289ea4358d3a408d5e21b0d7f453b51fe2 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Mon, 26 Jun 2023 16:43:31 -0400 Subject: [PATCH 17/18] cleanup --- 
 lib/vector-core/src/config/mod.rs    | 1 -
 lib/vector-core/src/transform/mod.rs | 8 ++++----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs
index 744e712243b59..71786155d1d8f 100644
--- a/lib/vector-core/src/config/mod.rs
+++ b/lib/vector-core/src/config/mod.rs
@@ -167,7 +167,6 @@ impl SourceOutput {
     /// A simple definition is just the default for the namespace. For the Vector namespace the
     /// meanings are included.
     /// Schema enabled is set in the users configuration.
-    #[cfg(any(feature = "test", test))]
     #[must_use]
     pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
         use std::ops::Deref;
diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs
index a9c67f504d22f..af81c51aa69a1 100644
--- a/lib/vector-core/src/transform/mod.rs
+++ b/lib/vector-core/src/transform/mod.rs
@@ -300,16 +300,16 @@ impl TransformOutputs {
 }
 #[allow(clippy::implicit_hasher)]
+/// `event`: The event that will be updated
+/// `output_id`: The `output_id` that the current event is being sent to (will be used as the new `parent_id`)
+/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions that will be used to look up the new runtime definition of the event
 pub fn update_runtime_schema_definition(
-    // The event that will be updated
     mut event: EventMutRef,
-    // The output_id that the current even is being sent to (will be used as the new parent_id)
     output_id: &Arc<OutputId>,
-    // A mapping of parent OutputId's to definitions, that will be used to lookup the new runtime definition of the event
     log_schema_definitions: &HashMap<OutputId, Arc<schema::Definition>>,
 ) {
     if let EventMutRef::Log(log) = &mut event {
-        if let Some(parent_component_id) = log.metadata_mut().upstream_id() {
+        if let Some(parent_component_id) = log.metadata().upstream_id() {
             if let Some(definition) = log_schema_definitions.get(parent_component_id) {
                 log.metadata_mut().set_schema_definition(definition);
             }
From 
baf87aa6d5d40c16d97ad4729cfd07998bea6271 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 27 Jun 2023 09:35:44 -0400 Subject: [PATCH 18/18] add comment --- src/topology/builder.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 29121ef1244d7..483d4b51aaede 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -945,6 +945,8 @@ fn build_task_transform( component: key.clone(), port: None, }); + + // Task transforms can only write to the default output, so only a single schema def map is needed let schema_definition_map = outputs .iter() .find(|x| x.port.is_none())