feat: track runtime schema definitions for log events #17692

Merged · 19 commits · Jun 29, 2023
2 changes: 1 addition & 1 deletion lib/vector-core/src/config/mod.rs
@@ -201,7 +201,7 @@ pub struct TransformOutput {
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
log_schema_definitions: HashMap<OutputId, schema::Definition>,
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
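Making `log_schema_definitions` public lets topology code read the per-input definitions directly when wiring transforms. A minimal sketch of the map's intended shape, using hypothetical stand-in types rather than Vector's real `OutputId` and `schema::Definition`:

```rust
use std::collections::HashMap;

// Illustrative stand-ins only; Vector's real types carry much more information.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct OutputId {
    component: String,
    port: Option<String>,
}

#[derive(Clone, Debug)]
struct Definition {
    // In Vector this describes type information and the semantic meaning of fields.
    summary: &'static str,
}

fn main() {
    // One schema definition per connected input, keyed by that input's `OutputId`.
    let mut log_schema_definitions: HashMap<OutputId, Definition> = HashMap::new();
    log_schema_definitions.insert(
        OutputId { component: "my_source".into(), port: None },
        Definition { summary: "definition derived from my_source" },
    );
    log_schema_definitions.insert(
        OutputId { component: "other_source".into(), port: Some("errors".into()) },
        Definition { summary: "definition derived from other_source's `errors` port" },
    );
    assert_eq!(log_schema_definitions.len(), 2);
}
```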
22 changes: 21 additions & 1 deletion lib/vector-core/src/event/metadata.rs
@@ -32,8 +32,15 @@ pub struct EventMetadata {
/// The id of the source
source_id: Option<Arc<OutputId>>,

/// The id of the component this event most recently passed through (its parent). This is
/// used to determine which schema definition to attach to an event in transforms.
/// This should always have a value set for events in transforms. It will always be `None`
/// in a source, and there is currently no use-case for reading the value in a sink.
parent_id: Option<Arc<OutputId>>,

/// An identifier for a globally registered schema definition which provides information about
/// the event shape (type information, and semantic meaning of fields).
/// This definition is currently only valid for logs and shouldn't be used for other event types.
///
/// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)]
@@ -73,17 +80,29 @@ impl EventMetadata {
&mut self.secrets
}

/// Returns a reference to the metadata source.
/// Returns a reference to the metadata source id.
#[must_use]
pub fn source_id(&self) -> Option<&OutputId> {
self.source_id.as_deref()
}

/// Returns a reference to the metadata parent id. This is the `OutputId`
/// of the previous component the event was sent through (if any).
#[must_use]
pub fn parent_id(&self) -> Option<&OutputId> {
self.parent_id.as_deref()
}

/// Sets the `source_id` in the metadata to the provided value.
pub fn set_source_id(&mut self, source_id: Arc<OutputId>) {
self.source_id = Some(source_id);
}

/// Sets the `parent_id` in the metadata to the provided value.
pub fn set_parent_id(&mut self, parent_id: Arc<OutputId>) {
self.parent_id = Some(parent_id);
}

/// Return the datadog API key, if it exists
pub fn datadog_api_key(&self) -> Option<Arc<str>> {
self.secrets.get(DATADOG_API_KEY).cloned()
@@ -113,6 +132,7 @@ impl Default for EventMetadata {
finalizers: Default::default(),
schema_definition: default_schema_definition(),
source_id: None,
parent_id: None,
}
}
}
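A rough sketch of how `source_id` and `parent_id` are meant to interact as an event moves through the topology, again with stand-in types (the real `EventMetadata` holds `Arc<OutputId>` values and much more):

```rust
use std::sync::Arc;

// Illustrative stand-ins; names and fields here are assumptions for this sketch.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OutputId {
    component: String,
    port: Option<String>,
}

#[derive(Default)]
struct EventMetadata {
    source_id: Option<Arc<OutputId>>,
    parent_id: Option<Arc<OutputId>>,
}

impl EventMetadata {
    fn set_source_id(&mut self, id: Arc<OutputId>) {
        self.source_id = Some(id);
    }
    fn set_parent_id(&mut self, id: Arc<OutputId>) {
        self.parent_id = Some(id);
    }
    fn parent_id(&self) -> Option<&OutputId> {
        self.parent_id.as_deref()
    }
}

fn main() {
    let mut metadata = EventMetadata::default();

    // The source records where the event was created; `parent_id` stays `None` here.
    metadata.set_source_id(Arc::new(OutputId { component: "my_source".into(), port: None }));

    // Each transform output then overwrites `parent_id` with its own `OutputId`, so the
    // next component can pick the schema definition registered for that parent.
    metadata.set_parent_id(Arc::new(OutputId { component: "remap_stage".into(), port: None }));

    assert_eq!(metadata.parent_id().unwrap().component, "remap_stage");
}
```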
12 changes: 12 additions & 0 deletions lib/vector-core/src/event/mod.rs
@@ -293,12 +293,24 @@ impl Event {
self.metadata_mut().set_source_id(source_id);
}

/// Sets the `parent_id` in the event metadata to the provided value.
pub fn set_parent_id(&mut self, parent_id: Arc<OutputId>) {
self.metadata_mut().set_parent_id(parent_id);
}

/// Sets the `source_id` in the event metadata to the provided value.
#[must_use]
pub fn with_source_id(mut self, source_id: Arc<OutputId>) -> Self {
self.metadata_mut().set_source_id(source_id);
self
}

/// Sets the `parent_id` in the event metadata to the provided value.
#[must_use]
pub fn with_parent_id(mut self, parent_id: Arc<OutputId>) -> Self {
self.metadata_mut().set_parent_id(parent_id);
self
}
}

impl EventDataEq for Event {
155 changes: 88 additions & 67 deletions lib/vector-core/src/transform/mod.rs
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::{collections::HashMap, error, pin::Pin};

use futures::{Stream, StreamExt};
@@ -7,13 +8,16 @@ use vector_common::internal_event::{
use vector_common::json_size::JsonSize;
use vector_common::EventDataEq;

use crate::config::{ComponentKey, OutputId};
use crate::event::EventMutRef;
use crate::schema::Definition;
use crate::{
config,
event::{
into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef,
},
fanout::{self, Fanout},
ByteSizeOf,
schema, ByteSizeOf,
};

#[cfg(any(feature = "lua"))]
@@ -178,6 +182,8 @@ impl SyncTransform for Box<dyn FunctionTransform> {
struct TransformOutput {
fanout: Fanout,
events_sent: Registered<EventsSent>,
log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
output_id: Arc<OutputId>,
}

pub struct TransformOutputs {
@@ -189,6 +195,7 @@ pub struct TransformOutputs {
impl TransformOutputs {
pub fn new(
outputs_in: Vec<config::TransformOutput>,
component_key: &ComponentKey,
) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
let outputs_spec = outputs_in.clone();
let mut primary_output = None;
@@ -197,13 +204,25 @@ impl TransformOutputs {

for output in outputs_in {
let (fanout, control) = Fanout::new();

let log_schema_definitions = output
.log_schema_definitions
.into_iter()
.map(|(id, definition)| (id, Arc::new(definition)))
.collect();

match output.port {
None => {
primary_output = Some(TransformOutput {
fanout,
events_sent: register(EventsSent::from(internal_event::Output(Some(
DEFAULT_OUTPUT.into(),
)))),
log_schema_definitions,
output_id: Arc::new(OutputId {
component: component_key.clone(),
port: None,
}),
});
controls.insert(None, control);
}
@@ -215,6 +234,11 @@
events_sent: register(EventsSent::from(internal_event::Output(Some(
name.clone().into(),
)))),
log_schema_definitions,
output_id: Arc::new(OutputId {
component: component_key.clone(),
port: Some(name.clone()),
}),
},
);
controls.insert(Some(name.clone()), control);
@@ -246,31 +270,61 @@ impl TransformOutputs {
buf: &mut TransformOutputsBuf,
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
if let Some(primary) = self.primary_output.as_mut() {
let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len);
let byte_size = buf.primary_buffer.as_ref().map_or(
JsonSize::new(0),
EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of,
);
buf.primary_buffer
.as_mut()
.expect("mismatched outputs")
.send(&mut primary.fanout)
.await?;
primary.events_sent.emit(CountByteSize(count, byte_size));
let buf = buf.primary_buffer.as_mut().expect("mismatched outputs");
Self::send_single_buffer(buf, primary).await?;
}

for (key, buf) in &mut buf.named_buffers {
let count = buf.len();
let byte_size = buf.estimated_json_encoded_size_of();
let output = self.named_outputs.get_mut(key).expect("unknown output");
buf.send(&mut output.fanout).await?;
output.events_sent.emit(CountByteSize(count, byte_size));
Self::send_single_buffer(buf, output).await?;
}
Ok(())
}

async fn send_single_buffer(
buf: &mut OutputBuffer,
output: &mut TransformOutput,
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
for event in buf.events_mut() {
update_runtime_schema_definition(
event,
&output.output_id,
&output.log_schema_definitions,
);
}
let count = buf.len();
let byte_size = buf.estimated_json_encoded_size_of();
buf.send(&mut output.fanout).await?;
output.events_sent.emit(CountByteSize(count, byte_size));
Ok(())
}
}

#[allow(clippy::implicit_hasher)]
pub fn update_runtime_schema_definition(
// The event that will be updated
mut event: EventMutRef,
// The output_id that the current event is being sent to (will be used as the new parent_id)
output_id: &Arc<OutputId>,
// A mapping of parent OutputIds to definitions, used to look up the new runtime definition of the event
log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
) {
if let EventMutRef::Log(log) = &mut event {
if let Some(parent_component_id) = log.metadata_mut().parent_id() {
if let Some(definition) = log_schema_definitions.get(parent_component_id) {
log.metadata_mut().set_schema_definition(definition);
}
} else {
// There is no parent defined. That means this event originated from a component that
// isn't able to track which input an event came from, such as `reduce` or `lua`. In these
// cases, all of the schema definitions _must_ be the same, so the first one is picked.
if let Some(definition) = log_schema_definitions.values().next() {
log.metadata_mut().set_schema_definition(definition);
}
}
}
event.metadata_mut().set_parent_id(Arc::clone(output_id));
}

#[derive(Debug, Clone)]
pub struct TransformOutputsBuf {
primary_buffer: Option<OutputBuffer>,
@@ -299,75 +353,41 @@ impl TransformOutputsBuf {
}
}

pub fn push(&mut self, event: Event) {
self.primary_buffer
.as_mut()
.expect("no default output")
.push(event);
}

pub fn push_named(&mut self, name: &str, event: Event) {
self.named_buffers
.get_mut(name)
.expect("unknown output")
.push(event);
}

pub fn append(&mut self, slice: &mut Vec<Event>) {
self.primary_buffer
.as_mut()
.expect("no default output")
.append(slice);
}

pub fn append_named(&mut self, name: &str, slice: &mut Vec<Event>) {
self.named_buffers
.get_mut(name)
.expect("unknown output")
.append(slice);
/// Adds a new event to the transform output buffer: to the named output when `name` is `Some`, or to the primary output when it is `None`.
pub fn push(&mut self, name: Option<&str>, event: Event) {
match name {
Some(name) => self.named_buffers.get_mut(name),
None => self.primary_buffer.as_mut(),
}
.expect("unknown output")
.push(event);
}

#[cfg(any(feature = "test", test))]
pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
self.primary_buffer
.as_mut()
.expect("no default output")
.drain()
}

#[cfg(any(feature = "test", test))]
pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
self.named_buffers
.get_mut(name)
.expect("unknown output")
.drain()
}

pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
self.primary_buffer
.as_mut()
.expect("no default output")
.extend(events);
}

#[cfg(any(feature = "test", test))]
pub fn take_primary(&mut self) -> OutputBuffer {
std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
}

#[cfg(any(feature = "test", test))]
pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
std::mem::take(&mut self.named_buffers)
}

pub fn len(&self) -> usize {
self.primary_buffer.as_ref().map_or(0, OutputBuffer::len)
+ self
.named_buffers
.values()
.map(OutputBuffer::len)
.sum::<usize>()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

impl ByteSizeOf for TransformOutputsBuf {
@@ -439,6 +459,7 @@ impl OutputBuffer {
})
}

#[cfg(any(feature = "test", test))]
pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
self.0.drain(..).flat_map(EventArray::into_events)
}
@@ -458,12 +479,12 @@ impl OutputBuffer {
self.0.iter().flat_map(EventArray::iter_events)
}

pub fn into_events(self) -> impl Iterator<Item = Event> {
self.0.into_iter().flat_map(EventArray::into_events)
fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
self.0.iter_mut().flat_map(EventArray::iter_events_mut)
}

pub fn take_events(&mut self) -> Vec<EventArray> {
std::mem::take(&mut self.0)
pub fn into_events(self) -> impl Iterator<Item = Event> {
self.0.into_iter().flat_map(EventArray::into_events)
}
}

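The lookup-and-fallback in `update_runtime_schema_definition` is the core of this change. A self-contained sketch of that logic under the same stand-in assumptions (the real function operates on `EventMutRef` and only touches log events):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Illustrative stand-ins for Vector's `OutputId` and `schema::Definition`.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct OutputId(String);

#[derive(Debug, PartialEq)]
struct Definition(&'static str);

struct LogEvent {
    parent_id: Option<Arc<OutputId>>,
    schema_definition: Option<Arc<Definition>>,
}

// Mirrors the function above: pick the definition registered for the event's parent,
// fall back to the first definition when no parent is tracked (e.g. events coming out
// of `reduce` or `lua`), then stamp this output's id as the new parent.
fn update_runtime_schema_definition(
    event: &mut LogEvent,
    output_id: &Arc<OutputId>,
    log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
) {
    let definition = match event.parent_id.as_deref() {
        Some(parent) => log_schema_definitions.get(parent),
        None => log_schema_definitions.values().next(),
    };
    if let Some(definition) = definition {
        event.schema_definition = Some(Arc::clone(definition));
    }
    event.parent_id = Some(Arc::clone(output_id));
}

fn main() {
    let mut definitions = HashMap::new();
    definitions.insert(OutputId("in_a".into()), Arc::new(Definition("schema A")));
    definitions.insert(OutputId("in_b".into()), Arc::new(Definition("schema B")));

    let this_output = Arc::new(OutputId("this_transform".into()));
    let mut event = LogEvent {
        parent_id: Some(Arc::new(OutputId("in_b".into()))),
        schema_definition: None,
    };

    update_runtime_schema_definition(&mut event, &this_output, &definitions);
    assert_eq!(event.schema_definition.as_deref(), Some(&Definition("schema B")));
    assert_eq!(event.parent_id.as_deref(), Some(&OutputId("this_transform".into())));
}
```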
3 changes: 3 additions & 0 deletions src/config/transform.rs
@@ -195,6 +195,9 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
&self,
enrichment_tables: enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],

// This only exists for transforms that create logs from non-log events (such as
// `metric_to_log`), so they know which log namespace to use.
global_log_namespace: LogNamespace,
) -> Vec<TransformOutput>;

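A hedged sketch of why `global_log_namespace` is threaded through here: a transform like `metric_to_log` has no incoming log definitions to inherit a namespace from, so it falls back to the globally configured one. The enum and struct below are simplified stand-ins, not Vector's real API:

```rust
// Simplified stand-ins; Vector's real `LogNamespace`, `TransformOutput`, and
// `TransformConfig::outputs` signature all differ in detail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LogNamespace {
    Vector,
    Legacy,
}

#[derive(Debug)]
struct TransformOutput {
    log_namespace: LogNamespace,
}

// A log-producing transform with no log inputs simply adopts the global namespace.
fn outputs(global_log_namespace: LogNamespace) -> Vec<TransformOutput> {
    vec![TransformOutput { log_namespace: global_log_namespace }]
}

fn main() {
    let outs = outputs(LogNamespace::Legacy);
    assert_eq!(outs[0].log_namespace, LogNamespace::Legacy);
}
```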