Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(observability): ensure sent_event and received_event metrics are estimated json size #17465

Merged
merged 20 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/vector-common/src/internal_event/events_received.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ crate::registered_event!(
#[allow(clippy::cast_precision_loss)]
self.events_count.record(count as f64);
self.events.increment(count as u64);
self.event_bytes.increment(byte_size as u64);
self.event_bytes.increment(byte_size.get() as u64);
}
);
6 changes: 3 additions & 3 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ crate::registered_event!(

match &self.output {
Some(output) => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size, output = %output);
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
}
None => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size);
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
}
}

self.events.increment(count as u64);
self.event_bytes.increment(byte_size as u64);
self.event_bytes.increment(byte_size.get() as u64);
}
);

Expand Down
6 changes: 4 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

use crate::json_size::JsonSize;

pub trait InternalEvent: Sized {
fn emit(self);

Expand Down Expand Up @@ -106,9 +108,9 @@ pub struct ByteSize(pub usize);
#[derive(Clone, Copy)]
pub struct Count(pub usize);

/// Holds the tuple `(count_of_events, size_of_events_in_bytes)`.
/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
#[derive(Clone, Copy)]
pub struct CountByteSize(pub usize, pub usize);
pub struct CountByteSize(pub usize, pub JsonSize);
StephenWakely marked this conversation as resolved.
Show resolved Hide resolved

// Wrapper types used to hold parameters for registering events

Expand Down
105 changes: 105 additions & 0 deletions lib/vector-common/src/json_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{
fmt,
iter::Sum,
ops::{Add, AddAssign, Sub},
};

/// A newtype for the JSON size of an event.
/// Used to emit the `component_received_event_bytes_total` and
/// `component_sent_event_bytes_total` metrics.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct JsonSize(usize);

impl fmt::Display for JsonSize {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Sub for JsonSize {
type Output = JsonSize;

#[inline]
fn sub(mut self, rhs: Self) -> Self::Output {
self.0 -= rhs.0;
self
}
}

impl Add for JsonSize {
type Output = JsonSize;

#[inline]
fn add(mut self, rhs: Self) -> Self::Output {
self.0 += rhs.0;
self
}
}

impl AddAssign for JsonSize {
#[inline]
fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0;
}
}

impl Sum for JsonSize {
#[inline]
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
let mut accum = 0;
for val in iter {
accum += val.get();
}

JsonSize::new(accum)
}
}

impl From<usize> for JsonSize {
#[inline]
fn from(value: usize) -> Self {
Self(value)
}
}

impl JsonSize {
/// Create a new instance with the specified size.
#[must_use]
#[inline]
pub const fn new(size: usize) -> Self {
Self(size)
}

/// Create a new instance with size 0.
#[must_use]
#[inline]
pub const fn zero() -> Self {
Self(0)
}

/// Returns the contained size.
#[must_use]
#[inline]
pub fn get(&self) -> usize {
self.0
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(clippy::module_name_repetitions)]
pub struct NonZeroJsonSize(JsonSize);
bruceg marked this conversation as resolved.
Show resolved Hide resolved

impl NonZeroJsonSize {
#[must_use]
#[inline]
pub fn new(size: JsonSize) -> Option<Self> {
(size.0 > 0).then_some(NonZeroJsonSize(size))
}
}

impl From<NonZeroJsonSize> for JsonSize {
#[inline]
fn from(value: NonZeroJsonSize) -> Self {
value.0
}
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub use vrl::btreemap;
#[cfg(feature = "byte_size_of")]
pub mod byte_size_of;

pub mod json_size;

pub mod config;

#[cfg(feature = "conversion")]
Expand Down
10 changes: 6 additions & 4 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::ops::Add;

use crate::json_size::JsonSize;

/// Metadata for batch requests.
#[derive(Clone, Copy, Debug, Default)]
pub struct RequestMetadata {
Expand All @@ -8,7 +10,7 @@ pub struct RequestMetadata {
/// Size, in bytes, of the in-memory representation of all events in this batch request.
events_byte_size: usize,
/// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request.
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: JsonSize,
/// Uncompressed size, in bytes, of the encoded events in this batch request.
request_encoded_size: usize,
/// On-the-wire size, in bytes, of the batch request itself after compression, etc.
Expand All @@ -25,7 +27,7 @@ impl RequestMetadata {
events_byte_size: usize,
request_encoded_size: usize,
request_wire_size: usize,
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: JsonSize,
) -> Self {
Self {
event_count,
Expand All @@ -47,7 +49,7 @@ impl RequestMetadata {
}

#[must_use]
pub const fn events_estimated_json_encoded_byte_size(&self) -> usize {
pub const fn events_estimated_json_encoded_byte_size(&self) -> JsonSize {
self.events_estimated_json_encoded_byte_size
}

Expand All @@ -64,7 +66,7 @@ impl RequestMetadata {
/// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
#[must_use]
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0);
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, JsonSize::zero());

for metadata in metadata_iter {
metadata_sum = metadata_sum + &metadata;
Expand Down
7 changes: 5 additions & 2 deletions lib/vector-core/src/event/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use futures::{stream, Stream};
#[cfg(test)]
use quickcheck::{Arbitrary, Gen};
use vector_buffers::EventCount;
use vector_common::finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable};
use vector_common::{
finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
json_size::JsonSize,
};

use super::{
EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
Expand Down Expand Up @@ -253,7 +256,7 @@ impl ByteSizeOf for EventArray {
}

impl EstimatedJsonEncodedSizeOf for EventArray {
fn estimated_json_encoded_size_of(&self) -> usize {
fn estimated_json_encoded_size_of(&self) -> JsonSize {
match self {
Self::Logs(v) => v.estimated_json_encoded_size_of(),
Self::Traces(v) => v.estimated_json_encoded_size_of(),
Expand Down
Loading