diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs
index 5b2ffeaef0eb4..6069a0dda837b 100644
--- a/lib/vector-common/src/request_metadata.rs
+++ b/lib/vector-common/src/request_metadata.rs
@@ -1,5 +1,7 @@
-use std::collections::HashMap;
-use std::ops::Add;
+use std::{
+    collections::HashMap,
+    ops::{Add, AddAssign},
+};
 
 use crate::{
     internal_event::{
@@ -119,6 +121,22 @@ impl GroupedCountByteSize {
             }
         }
     }
+
+    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
+    /// grouped by their relevant tags.
+    #[must_use]
+    pub fn is_tagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => true,
+            GroupedCountByteSize::Untagged { .. } => false,
+        }
+    }
+
+    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
+    #[must_use]
+    pub fn is_untagged(&self) -> bool {
+        !self.is_tagged()
+    }
 }
 
 impl From<CountByteSize> for GroupedCountByteSize {
@@ -127,26 +145,81 @@ impl From<CountByteSize> for GroupedCountByteSize {
     }
 }
 
+impl AddAssign for GroupedCountByteSize {
+    fn add_assign(&mut self, mut rhs: Self) {
+        if self.is_untagged() && rhs.is_tagged() {
+            // First handle the case where we are untagged and assigning to a tagged value.
+            // We need to change `self` and so need to ensure our match doesn't take ownership of the object.
+            *self = match (&self, &mut rhs) {
+                (Self::Untagged { size }, Self::Tagged { sizes }) => {
+                    let mut sizes = std::mem::take(sizes);
+                    match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                        Some(empty_size) => *empty_size += *size,
+                        None => {
+                            sizes.insert(TaggedEventsSent::new_empty(), *size);
+                        }
+                    }
+
+                    Self::Tagged { sizes }
+                }
+                _ => {
+                    unreachable!()
+                }
+            };
+
+            return;
+        }
+
+        // For these cases, we know we won't have to change `self` so the match can take ownership.
+        match (self, rhs) {
+            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(&key) {
+                        Some(size) => *size += value,
+                        None => {
+                            lhs.insert(key.clone(), value);
+                        }
+                    }
+                }
+            }
+
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                *lhs = *lhs + rhs;
+            }
+
+            (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
+                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                    Some(empty_size) => *empty_size += size,
+                    None => {
+                        sizes.insert(TaggedEventsSent::new_empty(), size);
+                    }
+                }
+            }
+            (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
+        };
+    }
+}
+
 impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
     type Output = GroupedCountByteSize;
 
     fn add(self, other: &'a Self::Output) -> Self::Output {
         match (self, other) {
-            (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(key) {
+            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(key) {
                         Some(size) => *size += *value,
                         None => {
-                            us.insert(key.clone(), *value);
+                            lhs.insert(key.clone(), *value);
                         }
                     }
                 }
 
-                Self::Tagged { sizes: us }
+                Self::Tagged { sizes: lhs }
             }
 
-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                Self::Untagged { size: us + *them }
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                Self::Untagged { size: lhs + *rhs }
             }
 
             // The following two scenarios shouldn't really occur in practice, but are provided for completeness.
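Reviewer note: a minimal usage sketch of the merge semantics added above. This is illustrative only; it assumes the `From<CountByteSize>` conversion in this file and the `usize`-to-`JsonSize` conversion used later in this patch (`events_byte_size.into()` in the Splunk HEC service test).

    use vector_common::internal_event::CountByteSize;
    use vector_common::request_metadata::GroupedCountByteSize;

    // Illustrative: folding two untagged accumulators keeps a single untagged total.
    fn merge_example() {
        let mut total: GroupedCountByteSize = CountByteSize(1, 42usize.into()).into();
        let more: GroupedCountByteSize = CountByteSize(2, 10usize.into()).into();
        total += more;
        assert!(total.is_untagged());
        // Had `more` been the `Tagged` variant, `total` would have been promoted to
        // `Tagged`, with its untagged size folded under the empty tag set.
    }
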
diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs
index b056d672cf46a..347271d5651ae 100644
--- a/src/sinks/datadog/metrics/encoder.rs
+++ b/src/sinks/datadog/metrics/encoder.rs
@@ -333,6 +333,8 @@ impl DatadogMetricsEncoder {
         self.state.written += n;
         let raw_bytes_written = self.state.written;
 
+        let byte_size = self.state.byte_size.clone();
+
         // Consume the encoder state so we can do our final checks and return the necessary data.
         let state = self.reset_state();
         let payload = state
@@ -357,7 +359,7 @@ impl DatadogMetricsEncoder {
         if recommended_splits == 1 {
             // "One" split means no splits needed: our payload didn't exceed either of the limits.
             Ok((
-                EncodeResult::compressed(payload, raw_bytes_written, self.state.byte_size.clone()),
+                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
                 processed,
             ))
         } else {
diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs
index 572b304db8f3e..458cc5be987f3 100644
--- a/src/sinks/datadog/metrics/integration_tests.rs
+++ b/src/sinks/datadog/metrics/integration_tests.rs
@@ -4,14 +4,20 @@ use futures::{channel::mpsc::Receiver, stream, StreamExt};
 use hyper::StatusCode;
 use indoc::indoc;
 use rand::{thread_rng, Rng};
-use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue};
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue},
+};
 
 use super::DatadogMetricsConfig;
 use crate::{
     config::SinkConfig,
     sinks::util::test::{build_test_server_status, load_sink},
     test_util::{
-        components::{assert_sink_compliance, SINK_TAGS},
+        components::{
+            assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
+            SINK_TAGS,
+        },
         map_event_batch_stream, next_addr,
     },
 };
@@ -168,35 +174,51 @@ async fn smoke() {
     }
 }
 
-#[tokio::test]
-async fn real_endpoint() {
-    assert_sink_compliance(&SINK_TAGS, async {
-        let config = indoc! {r#"
+async fn run_sink() {
+    let config = indoc! {r#"
        default_api_key = "${TEST_DATADOG_API_KEY}"
        default_namespace = "fake.test.integration"
    "#};
-        let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
-        assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
-        let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
-        let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
-
-        let (sink, _) = config.build(cx).await.unwrap();
-        let (batch, receiver) = BatchNotifier::new_with_receiver();
-        let events: Vec<_> = (0..10)
-            .map(|index| {
-                Event::Metric(Metric::new(
-                    "counter",
-                    MetricKind::Absolute,
-                    MetricValue::Counter {
-                        value: index as f64,
-                    },
-                ))
-            })
-            .collect();
-        let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+    let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
+    assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
+    let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
+    let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
+
+    let (sink, _) = config.build(cx).await.unwrap();
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let events: Vec<_> = (0..10)
+        .map(|index| {
+            Event::Metric(Metric::new(
+                "counter",
+                MetricKind::Absolute,
+                MetricValue::Counter {
+                    value: index as f64,
+                },
+            ))
+        })
+        .collect();
+    let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+
+    sink.run(stream).await.unwrap();
+    assert_eq!(receiver.await, BatchStatus::Delivered);
+}
 
-        sink.run(stream).await.unwrap();
-        assert_eq!(receiver.await, BatchStatus::Delivered);
-    })
-    .await;
+#[tokio::test]
+async fn real_endpoint() {
+    assert_sink_compliance(&SINK_TAGS, async { run_sink().await }).await;
+}
+
+#[tokio::test]
+async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await;
 }
diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index dbe57714995fc..68081627cc76a 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -9,10 +9,7 @@ use bytes::Bytes;
 use prost::Message;
 use snafu::Snafu;
 use vector_common::request_metadata::RequestMetadata;
-use vector_core::{
-    event::{EventFinalizers, Finalizable},
-    EstimatedJsonEncodedSizeOf,
-};
+use vector_core::event::{EventFinalizers, Finalizable};
 
 use super::{
     apm_stats::{compute_apm_stats, Aggregator},
@@ -125,7 +122,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
             .for_each(|r| match r {
                 Ok((payload, mut processed)) => {
                     let uncompressed_size = payload.len();
-                    let json_size = processed.estimated_json_encoded_size_of();
                     let metadata = DDTracesMetadata {
                         api_key: key
                             .api_key
@@ -137,14 +133,14 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
                         content_type: "application/x-protobuf".to_string(),
                     };
 
+                    // build RequestMetadata
+                    let builder = RequestMetadataBuilder::from_events(&processed);
+
                     let mut compressor = Compressor::from(self.compression);
                     match compressor.write_all(&payload) {
                         Ok(()) => {
                             let bytes = compressor.into_inner().freeze();
 
-                            // build RequestMetadata
-                            let builder =
-                                RequestMetadataBuilder::new(n, uncompressed_size, json_size);
                             let bytes_len = NonZeroUsize::new(bytes.len())
                                 .expect("payload should never be zero length");
                             let request_metadata = builder.with_request_size(bytes_len);
diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs
index 023371e6f4fe9..5860576522a94 100644
--- a/src/sinks/elasticsearch/integration_tests.rs
+++ b/src/sinks/elasticsearch/integration_tests.rs
@@ -8,7 +8,7 @@ use futures::{future::ready, stream};
 use http::{Request, StatusCode};
 use serde_json::{json, Value};
 use vector_core::{
-    config::log_schema,
+    config::{init_telemetry, log_schema, Tags, Telemetry},
     event::{BatchNotifier, BatchStatus, Event, LogEvent},
 };
 
@@ -23,8 +23,8 @@ use crate::{
     },
     test_util::{
         components::{
-            run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
-            HTTP_SINK_TAGS,
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            run_and_assert_sink_error, COMPONENT_ERROR_TAGS, DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS,
         },
         random_events_with_stream, random_string, trace_init,
     },
@@ -288,7 +288,25 @@ async fn insert_events_over_http() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
+        BatchStatus::Delivered,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn insert_events_with_data_volume() {
+    trace_init();
+
+    run_insert_tests(
+        ElasticsearchConfig {
+            endpoints: vec![http_server()],
+            doc_type: "log_lines".into(),
+            compression: Compression::None,
+            batch: batch_settings(),
+            ..Default::default()
+        },
+        TestType::DataVolume,
         BatchStatus::Delivered,
     )
     .await;
 }
@@ -306,7 +324,7 @@ async fn insert_events_over_http_with_gzip_compression() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -332,7 +350,7 @@ async fn insert_events_over_https() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -355,7 +373,7 @@ async fn insert_events_on_aws() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -379,7 +397,7 @@ async fn insert_events_on_aws_with_compression() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
    .await;
@@ -397,7 +415,7 @@ async fn insert_events_with_failure() {
             batch: batch_settings(),
             ..Default::default()
         },
-        true,
+        TestType::Error,
         BatchStatus::Rejected,
     )
     .await;
@@ -415,7 +433,7 @@ async fn insert_events_with_failure_and_gzip_compression() {
             batch: batch_settings(),
             ..Default::default()
         },
-        true,
+        TestType::Error,
         BatchStatus::Rejected,
     )
     .await;
@@ -449,7 +467,7 @@ async fn insert_events_in_data_stream() {
         .await
         .expect("Data stream creation error");
 
-    run_insert_tests_with_config(&cfg, false, BatchStatus::Delivered).await;
+    run_insert_tests_with_config(&cfg, TestType::Normal, BatchStatus::Delivered).await;
 }
 
 #[tokio::test]
@@ -512,14 +530,26 @@ async fn distributed_insert_events_failover() {
 
 async fn run_insert_tests(
     mut config: ElasticsearchConfig,
-    break_events: bool,
+    test_type: TestType,
     status: BatchStatus,
 ) {
+    if test_type == TestType::DataVolume {
+        init_telemetry(
+            Telemetry {
+                tags: Tags {
+                    emit_service: true,
+                    emit_source: true,
+                },
+            },
+            true,
+        );
+    }
+
     config.bulk = BulkConfig {
         index: gen_index(),
         ..Default::default()
     };
-    run_insert_tests_with_config(&config, break_events, status).await;
+    run_insert_tests_with_config(&config, test_type, status).await;
 }
 
 fn create_http_client() -> reqwest::Client {
@@ -537,9 +567,16 @@ fn create_http_client() -> reqwest::Client {
         .expect("Could not build HTTP client")
 }
 
+#[derive(Eq, PartialEq)]
+enum TestType {
+    Error,
+    DataVolume,
+    Normal,
+}
+
 async fn run_insert_tests_with_config(
     config: &ElasticsearchConfig,
-    break_events: bool,
+    test_type: TestType,
     batch_status: BatchStatus,
 ) {
     let common = ElasticsearchCommon::parse_single(config)
@@ -565,22 +602,30 @@ async fn run_insert_tests_with_config(
     let (batch, mut receiver) = BatchNotifier::new_with_receiver();
     let (input, events) = random_events_with_stream(100, 100, Some(batch));
 
-    if break_events {
-        // Break all but the first event to simulate some kind of partial failure
-        let mut doit = false;
-        let events = events.map(move |mut events| {
-            if doit {
-                events.iter_logs_mut().for_each(|log| {
-                    log.insert("_type", 1);
-                });
-            }
-            doit = true;
-            events
-        });
+    match test_type {
+        TestType::Error => {
+            // Break all but the first event to simulate some kind of partial failure
+            let mut doit = false;
+            let events = events.map(move |mut events| {
+                if doit {
+                    events.iter_logs_mut().for_each(|log| {
+                        log.insert("_type", 1);
+                    });
+                }
+                doit = true;
+                events
+            });
+
+            run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
+        }
 
-        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
-    } else {
-        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        TestType::DataVolume => {
+            run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS).await;
+        }
+
+        TestType::Normal => {
+            run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        }
     }
 
     assert_eq!(receiver.try_recv(), Ok(batch_status));
@@ -607,7 +652,7 @@ async fn run_insert_tests_with_config(
         .or_else(|| response["hits"]["total"].as_u64())
         .expect("Elasticsearch response does not include hits->total nor hits->total->value");
 
-    if break_events {
+    if test_type == TestType::Error {
         assert_ne!(input.len() as u64, total);
     } else {
         assert_eq!(input.len() as u64, total);
diff --git a/src/sinks/loki/integration_tests.rs b/src/sinks/loki/integration_tests.rs
index 8f4232de6fdcf..e4dd4e31e5cc8 100644
--- a/src/sinks/loki/integration_tests.rs
+++ b/src/sinks/loki/integration_tests.rs
@@ -7,7 +7,7 @@ use futures::stream;
 use lookup::owned_value_path;
 use vector_common::encode_logfmt;
 use vector_core::{
-    config::LogNamespace,
+    config::{init_telemetry, LogNamespace, Tags, Telemetry},
     event::{BatchNotifier, BatchStatus, Event, LogEvent},
 };
 use vrl::path::PathPrefix;
@@ -21,7 +21,10 @@ use crate::{
     sinks::{util::test::load_sink, VectorSink},
     template::Template,
     test_util::{
-        components::{run_and_assert_sink_compliance, SINK_TAGS},
+        components::{
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            DATA_VOLUME_SINK_TAGS, SINK_TAGS,
+        },
         generate_events_with_stream, generate_lines_with_stream, random_lines,
     },
 };
@@ -152,6 +155,34 @@ async fn text() {
     }
 }
 
+#[tokio::test]
+async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    let (stream, sink) = build_sink("text", false).await;
+
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let (lines, events) = generate_lines_with_stream(line_generator, 10, Some(batch));
+    run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS).await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    tokio::time::sleep(tokio::time::Duration::new(1, 0)).await;
+
+    let (_, outputs) = fetch_stream(stream.to_string(), "default").await;
+    assert_eq!(lines.len(), outputs.len());
+    for (i, output) in outputs.iter().enumerate() {
+        assert_eq!(output, &lines[i]);
+    }
+}
+
 #[tokio::test]
 async fn namespaced_timestamp() {
     let (stream, sink) = build_sink("json", true).await;
diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs
index 51be807831f56..f977b58f0177c 100644
--- a/src/sinks/new_relic/tests.rs
+++ b/src/sinks/new_relic/tests.rs
@@ -3,13 +3,17 @@ use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
 use chrono::{DateTime, Utc};
 use futures::{future::ready, stream};
 use serde::Deserialize;
+use vector_core::config::{init_telemetry, Tags, Telemetry};
 
 use super::*;
 use crate::{
     config::{GenerateConfig, SinkConfig, SinkContext},
     event::{Event, LogEvent, Metric, MetricKind, MetricValue, Value},
     test_util::{
-        components::{run_and_assert_sink_compliance, SINK_TAGS},
+        components::{
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            DATA_VOLUME_SINK_TAGS, SINK_TAGS,
+        },
         http::{always_200_response, spawn_blackhole_http_server},
     },
 };
@@ -19,8 +23,7 @@ fn generate_config() {
     crate::test_util::test_generate_config::<NewRelicConfig>();
 }
 
-#[tokio::test]
-async fn component_spec_compliance() {
+async fn sink() -> (VectorSink, Event) {
     let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
 
     let config = NewRelicConfig::generate_config().to_string();
@@ -32,9 +35,39 @@
     let (sink, _healthcheck) = config.build(context).await.unwrap();
 
     let event = Event::Log(LogEvent::from("simple message"));
+
+    (sink, event)
+}
+
+#[tokio::test]
+async fn component_spec_compliance() {
+    let (sink, event) = sink().await;
     run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
 }
 
+#[tokio::test]
+async fn component_spec_compliance_data_volume() {
+    // We need to configure Vector to emit the service and source tags.
+    // The default is to not emit these.
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    let (sink, event) = sink().await;
+    run_and_assert_data_volume_sink_compliance(
+        sink,
+        stream::once(ready(event)),
+        &DATA_VOLUME_SINK_TAGS,
+    )
+    .await;
+}
+
 #[test]
 fn generate_event_api_model() {
     // Without message field
diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs
index a8abc57e77b09..ce9b324c10fcd 100644
--- a/src/sinks/splunk_hec/common/service.rs
+++ b/src/sinks/splunk_hec/common/service.rs
@@ -282,6 +282,7 @@ mod tests {
     use bytes::Bytes;
     use futures_util::{poll, stream::FuturesUnordered, StreamExt};
     use tower::{util::BoxService, Service, ServiceExt};
+    use vector_common::internal_event::CountByteSize;
     use vector_core::{
         config::proxy::ProxyConfig,
         event::{EventFinalizers, EventStatus},
@@ -339,7 +340,11 @@ mod tests {
         let body = Bytes::from("test-message");
         let events_byte_size = body.len();
 
-        let builder = RequestMetadataBuilder::new(1, events_byte_size, events_byte_size.into());
+        let builder = RequestMetadataBuilder::new(
+            1,
+            events_byte_size,
+            CountByteSize(1, events_byte_size.into()).into(),
+        );
         let bytes_len =
             NonZeroUsize::new(events_byte_size).expect("payload should never be zero length");
         let metadata = builder.with_request_size(bytes_len);
diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs
index f686e9e74d4cc..8bfe02dfee305 100644
--- a/src/sinks/splunk_hec/logs/integration_tests.rs
+++ b/src/sinks/splunk_hec/logs/integration_tests.rs
@@ -6,7 +6,10 @@ use futures::{future::ready, stream};
 use lookup::lookup_v2::OptionalValuePath;
 use serde_json::Value as JsonValue;
 use tokio::time::{sleep, Duration};
-use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, LogEvent},
+};
 
 use crate::{
     codecs::EncodingConfig,
@@ -25,7 +28,10 @@ use crate::{
     },
     template::Template,
     test_util::{
-        components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
+        components::{
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS,
+        },
         random_lines_with_stream, random_string,
     },
 };
@@ -125,6 +131,18 @@ async fn config(encoding: EncodingConfig, indexed_fields: Vec<String>) -> HecLog
     }
 }
 
+fn enable_telemetry() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+}
+
 #[tokio::test]
 async fn splunk_insert_message() {
     let cx = SinkContext::default();
@@ -145,6 +163,33 @@ async fn splunk_insert_message() {
     assert!(entry.get("message").is_none());
 }
 
+#[tokio::test]
+async fn splunk_insert_message_data_volume() {
+    enable_telemetry();
+
+    let cx = SinkContext::default();
+
+    let config = config(TextSerializerConfig::default().into(), vec![]).await;
+    let (sink, _) = config.build(cx).await.unwrap();
+
+    let message = random_string(100);
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let event = LogEvent::from(message.clone()).with_batch_notifier(&batch);
+    drop(batch);
+    run_and_assert_data_volume_sink_compliance(
+        sink,
+        stream::once(ready(event)),
+        &DATA_VOLUME_SINK_TAGS,
+    )
+    .await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let entry = find_entry(message.as_str()).await;
+
+    assert_eq!(message, entry["_raw"].as_str().unwrap());
+    assert!(entry.get("message").is_none());
+}
+
 #[tokio::test]
 async fn splunk_insert_raw_message() {
     let cx = SinkContext::default();
@@ -170,6 +215,38 @@ async fn splunk_insert_raw_message() {
     assert!(entry.get("message").is_none());
 }
 
+#[tokio::test]
+async fn splunk_insert_raw_message_data_volume() {
+    enable_telemetry();
+
+    let cx = SinkContext::default();
+
+    let config = HecLogsSinkConfig {
+        endpoint_target: EndpointTarget::Raw,
+        source: Some(Template::try_from("zork").unwrap()),
+        ..config(TextSerializerConfig::default().into(), vec![]).await
+    };
+    let (sink, _) = config.build(cx).await.unwrap();
+
+    let message = random_string(100);
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let event = LogEvent::from(message.clone()).with_batch_notifier(&batch);
+    drop(batch);
+    run_and_assert_data_volume_sink_compliance(
+        sink,
+        stream::once(ready(event)),
+        &DATA_VOLUME_SINK_TAGS,
+    )
+    .await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let entry = find_entry(message.as_str()).await;
+
+    assert_eq!(message, entry["_raw"].as_str().unwrap());
+    assert_eq!("zork", entry[SOURCE_FIELD].as_str().unwrap());
+    assert!(entry.get("message").is_none());
+}
+
 #[tokio::test]
 async fn splunk_insert_broken_token() {
     let cx = SinkContext::default();
diff --git a/src/sinks/splunk_hec/metrics/integration_tests.rs b/src/sinks/splunk_hec/metrics/integration_tests.rs
index 0627749ce0a2a..0cadb38084c02 100644
--- a/src/sinks/splunk_hec/metrics/integration_tests.rs
+++ b/src/sinks/splunk_hec/metrics/integration_tests.rs
@@ -3,8 +3,11 @@ use std::convert::TryFrom;
 use futures::{future::ready, stream};
 use lookup::lookup_v2::OptionalValuePath;
 use serde_json::Value as JsonValue;
-use vector_core::event::{BatchNotifier, BatchStatus, Event, MetricValue};
-use vector_core::metric_tags;
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, MetricValue},
+    metric_tags,
+};
 
 use super::config::HecMetricsSinkConfig;
 use crate::{
@@ -17,7 +20,10 @@ use crate::{
         util::{BatchConfig, Compression, TowerRequestConfig},
     },
     template::Template,
-    test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
+    test_util::components::{
+        run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+        DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS,
+    },
 };
 
 const USERNAME: &str = "admin";
@@ -92,6 +98,48 @@ async fn splunk_insert_counter_metric() {
     );
 }
 
+fn enable_telemetry() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+}
+
+#[tokio::test]
+async fn splunk_insert_counter_metric_data_volume() {
+    enable_telemetry();
+
+    let cx = SinkContext::default();
+
+    let mut config = config().await;
+    config.index = Template::try_from("testmetrics".to_string()).ok();
+    let (sink, _) = config.build(cx).await.unwrap();
+
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let event = get_counter(batch.clone());
+    drop(batch);
+    run_and_assert_data_volume_sink_compliance(
+        sink,
+        stream::once(ready(event)),
+        &DATA_VOLUME_SINK_TAGS,
+    )
+    .await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    assert!(
+        metric_dimensions_exist(
+            "example-counter",
+            &["host", "source", "sourcetype", "tag_counter_test"],
+        )
+        .await
+    );
+}
+
 #[tokio::test]
 async fn splunk_insert_gauge_metric() {
     let cx = SinkContext::default();
diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs
index ccf39542c6bd3..b9418c128244a 100644
--- a/src/sinks/util/metadata.rs
+++ b/src/sinks/util/metadata.rs
@@ -1,20 +1,15 @@
 use std::num::NonZeroUsize;
 
+use vector_common::request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata};
 use vector_core::{config, ByteSizeOf, EstimatedJsonEncodedSizeOf};
 
-use vector_common::{
-    internal_event::CountByteSize,
-    json_size::JsonSize,
-    request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata},
-};
-
 use super::request_builder::EncodeResult;
 
 #[derive(Clone, Default)]
 pub struct RequestMetadataBuilder {
     event_count: usize,
     events_byte_size: usize,
-    events_estimated_json_encoded_byte_size: GroupedCountByteSize,
+    grouped_events_byte_size: GroupedCountByteSize,
 }
 
 impl RequestMetadataBuilder {
@@ -34,7 +29,7 @@ impl RequestMetadataBuilder {
         Self {
             event_count: events.len(),
             events_byte_size,
-            events_estimated_json_encoded_byte_size: size,
+            grouped_events_byte_size: size,
         }
     }
 
@@ -48,23 +43,19 @@ impl RequestMetadataBuilder {
         Self {
             event_count: 1,
             events_byte_size: event.size_of(),
-            events_estimated_json_encoded_byte_size: size,
+            grouped_events_byte_size: size,
         }
     }
 
-    pub fn new(
+    pub const fn new(
        event_count: usize,
         events_byte_size: usize,
-        events_estimated_json_encoded_byte_size: JsonSize,
+        grouped_events_byte_size: GroupedCountByteSize,
     ) -> Self {
         Self {
             event_count,
             events_byte_size,
-            events_estimated_json_encoded_byte_size: CountByteSize(
-                event_count,
-                events_estimated_json_encoded_byte_size,
-            )
-            .into(),
+            grouped_events_byte_size,
         }
     }
 
@@ -75,8 +66,7 @@ impl RequestMetadataBuilder {
         self.event_count += 1;
         self.events_byte_size += event.size_of();
         let json_size = event.estimated_json_encoded_size_of();
-        self.events_estimated_json_encoded_byte_size
-            .add_event(&event, json_size);
+        self.grouped_events_byte_size.add_event(&event, json_size);
     }
 
     /// Builds the [`RequestMetadata`] with the given size.
@@ -89,7 +79,7 @@ impl RequestMetadataBuilder {
             self.events_byte_size,
             size,
             size,
-            self.events_estimated_json_encoded_byte_size.clone(),
+            self.grouped_events_byte_size.clone(),
         )
     }
diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs
index 03d77611ad239..e3a6b2b1b918a 100644
--- a/src/sinks/vector/mod.rs
+++ b/src/sinks/vector/mod.rs
@@ -45,7 +45,10 @@ mod tests {
     use http::request::Parts;
     use hyper::Method;
     use prost::Message;
-    use vector_core::event::{BatchNotifier, BatchStatus};
+    use vector_core::{
+        config::{init_telemetry, Tags, Telemetry},
+        event::{BatchNotifier, BatchStatus},
+    };
 
     use super::config::with_default_scheme;
     use super::*;
@@ -55,7 +58,10 @@ mod tests {
         proto::vector as proto,
         sinks::util::test::build_test_server_generic,
         test_util::{
-            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
+            components::{
+                run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+                DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS,
+            },
             next_addr, random_lines_with_stream,
         },
     };
@@ -68,8 +74,12 @@ mod tests {
         crate::test_util::test_generate_config::<VectorConfig>();
     }
 
-    #[tokio::test]
-    async fn deliver_message() {
+    enum TestType {
+        Normal,
+        DataVolume,
+    }
+
+    async fn run_sink_test(test_type: TestType) {
         let num_lines = 10;
 
         let in_addr = next_addr();
@@ -93,7 +103,15 @@ mod tests {
         let (batch, mut receiver) = BatchNotifier::new_with_receiver();
         let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch));
 
-        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        match test_type {
+            TestType::Normal => run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await,
+
+            TestType::DataVolume => {
+                run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS)
+                    .await
+            }
+        }
+
         drop(trigger);
 
         assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
@@ -112,6 +130,26 @@ mod tests {
         assert_eq!(input_lines, output_lines);
     }
 
+    #[tokio::test]
+    async fn deliver_message() {
+        run_sink_test(TestType::Normal).await;
+    }
+
+    #[tokio::test]
+    async fn data_volume_tags() {
+        init_telemetry(
+            Telemetry {
+                tags: Tags {
+                    emit_service: true,
+                    emit_source: true,
+                },
+            },
+            true,
+        );
+
+        run_sink_test(TestType::DataVolume).await;
+    }
+
     #[tokio::test]
     async fn acknowledges_error() {
         let num_lines = 10;
diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs
index 5277a408634ee..a89271d4f928d 100644
--- a/src/sinks/vector/service.rs
+++ b/src/sinks/vector/service.rs
@@ -119,6 +119,7 @@ impl Service for VectorService {
                     protocol: &service.protocol,
                     endpoint: &service.endpoint,
                 });
+
                 VectorResponse { events_byte_size }
             })
             .map_err(|source| VectorSinkError::Request { source }.into())
diff --git a/src/sinks/vector/sink.rs b/src/sinks/vector/sink.rs
index 229867194ddfd..fe77de6837883 100644
--- a/src/sinks/vector/sink.rs
+++ b/src/sinks/vector/sink.rs
@@ -4,8 +4,9 @@ use async_trait::async_trait;
 use futures::{stream::BoxStream, StreamExt};
 use prost::Message;
 use tower::Service;
-use vector_common::json_size::JsonSize;
+use vector_common::request_metadata::GroupedCountByteSize;
 use vector_core::{
+    config::telemetry,
     stream::{BatcherSettings, DriverResponse},
     ByteSizeOf, EstimatedJsonEncodedSizeOf,
 };
@@ -20,18 +21,29 @@ use crate::{
 /// Data for a single event.
 struct EventData {
     byte_size: usize,
-    json_byte_size: JsonSize,
+    json_byte_size: GroupedCountByteSize,
     finalizers: EventFinalizers,
     wrapper: EventWrapper,
 }
 
 /// Temporary struct to collect events during batching.
-#[derive(Clone, Default)]
+#[derive(Clone)]
 struct EventCollection {
     pub finalizers: EventFinalizers,
     pub events: Vec<EventWrapper>,
     pub events_byte_size: usize,
-    pub events_json_byte_size: JsonSize,
+    pub events_json_byte_size: GroupedCountByteSize,
+}
+
+impl Default for EventCollection {
+    fn default() -> Self {
+        Self {
+            finalizers: Default::default(),
+            events: Default::default(),
+            events_byte_size: Default::default(),
+            events_json_byte_size: telemetry().create_request_count_byte_size(),
+        }
+    }
 }
 
 pub struct VectorSink<S> {
@@ -48,11 +60,16 @@ where
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
         input
-            .map(|mut event| EventData {
-                byte_size: event.size_of(),
-                json_byte_size: event.estimated_json_encoded_size_of(),
-                finalizers: event.take_finalizers(),
-                wrapper: EventWrapper::from(event),
+            .map(|mut event| {
+                let mut byte_size = telemetry().create_request_count_byte_size();
+                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+
+                EventData {
+                    byte_size: event.size_of(),
+                    json_byte_size: byte_size,
+                    finalizers: event.take_finalizers(),
+                    wrapper: EventWrapper::from(event),
+                }
             })
             .batched(self.batch_settings.into_reducer_config(
                 |data: &EventData| data.wrapper.encoded_len(),
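
Reviewer note: across these sinks the accumulation pattern is the same: ask the active telemetry config for an empty `GroupedCountByteSize` and fold each event into it. A minimal sketch of that pattern, assuming `telemetry()`, `add_event`, and `EstimatedJsonEncodedSizeOf` behave as used in this patch; the helper name is hypothetical.

    use vector_common::request_metadata::GroupedCountByteSize;
    use vector_core::{config::telemetry, event::Event, EstimatedJsonEncodedSizeOf};

    // Hypothetical helper: accumulate the grouped JSON byte size for a batch of events.
    fn grouped_json_size(events: &[Event]) -> GroupedCountByteSize {
        let mut byte_size = telemetry().create_request_count_byte_size();
        for event in events {
            byte_size.add_event(event, event.estimated_json_encoded_size_of());
        }
        byte_size
    }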