diff --git a/Cargo.toml b/Cargo.toml index cbcfd908b5ced..d0f8c5e684802 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -688,7 +688,7 @@ sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"] -sinks-pulsar = ["dep:apache-avro", "dep:pulsar"] +sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] sinks-socket = ["sinks-utils-udp"] diff --git a/scripts/integration/pulsar/test.yaml b/scripts/integration/pulsar/test.yaml index 9115ec5620eee..a63f7da772e4b 100644 --- a/scripts/integration/pulsar/test.yaml +++ b/scripts/integration/pulsar/test.yaml @@ -1,7 +1,7 @@ features: - pulsar-integration-tests -test_filter: '::pulsar::' +test_filter: '::pulsar::integration_tests::' env: PULSAR_ADDRESS: pulsar://pulsar:6650 diff --git a/src/internal_events/pulsar.rs b/src/internal_events/pulsar.rs index bb38b9bddff4d..ffa5f51883d8a 100644 --- a/src/internal_events/pulsar.rs +++ b/src/internal_events/pulsar.rs @@ -33,3 +33,26 @@ impl InternalEvent for PulsarSendingError { }); } } + +pub struct PulsarPropertyExtractionError { + pub property_field: F, +} + +impl InternalEvent for PulsarPropertyExtractionError { + fn emit(self) { + error!( + message = "Failed to extract properties. Value should be a map of String -> Bytes.", + error_code = "extracting_property", + error_type = error_type::PARSER_FAILED, + stage = error_stage::PROCESSING, + property_field = %self.property_field, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_code" => "extracting_property", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ); + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b9d6955642e85..a992c11a76baf 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -357,7 +357,7 @@ pub enum Sinks { /// Publish observability events to Apache Pulsar topics. #[cfg(feature = "sinks-pulsar")] #[configurable(metadata(docs::label = "Pulsar"))] - Pulsar(pulsar::PulsarSinkConfig), + Pulsar(pulsar::config::PulsarSinkConfig), /// Publish observability data to Redis. #[cfg(feature = "sinks-redis")] diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs deleted file mode 100644 index 0d64824d8427e..0000000000000 --- a/src/sinks/pulsar.rs +++ /dev/null @@ -1,593 +0,0 @@ -use std::{ - num::NonZeroUsize, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use crate::{ - codecs::{Encoder, EncodingConfig, Transformer}, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{Event, EventFinalizers, EventStatus, Finalizable}, - internal_events::PulsarSendingError, - schema, - sinks::util::metadata::RequestMetadataBuilder, -}; -use bytes::BytesMut; -use codecs::{encoding::SerializerConfig, TextSerializerConfig}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Sink, Stream}; -use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; -use pulsar::compression; -use pulsar::error::AuthenticationError; -use pulsar::{ - message::proto, producer::SendFuture, proto::CommandSendReceipt, Authentication, - Error as PulsarError, Producer, Pulsar, TokioExecutor, -}; -use snafu::{ResultExt, Snafu}; -use tokio_util::codec::Encoder as _; -use value::{Kind, Value}; -use vector_common::{ - internal_event::{ - ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol, - Registered, - }, - request_metadata::RequestMetadata, - sensitive_string::SensitiveString, -}; -use vector_config::configurable_component; - -#[derive(Debug, Snafu)] -enum BuildError { - #[snafu(display("creating pulsar producer failed: {}", source))] - CreatePulsarSink { source: PulsarError }, -} - -/// Configuration for the `pulsar` sink. -#[configurable_component(sink("pulsar"))] -#[derive(Clone, Debug)] -pub struct PulsarSinkConfig { - /// The endpoint to which the Pulsar client should connect to. - /// - /// The endpoint should specify the pulsar protocol and port. - #[serde(alias = "address")] - #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))] - endpoint: String, - - /// The Pulsar topic name to write events to. - #[configurable(metadata(docs::examples = "topic-1234"))] - topic: String, - - /// The name of the producer. If not specified, the default name assigned by Pulsar is used. - #[configurable(metadata(docs::examples = "producer-name"))] - producer_name: Option, - - #[configurable(derived)] - pub encoding: EncodingConfig, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - compression: PulsarCompression, - - #[configurable(derived)] - auth: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub acknowledgements: AcknowledgementsConfig, - - /// Log field to use as Pulsar message key. - #[configurable(metadata(docs::examples = "message"))] - #[configurable(metadata(docs::examples = "my_field"))] - partition_key_field: Option, -} - -/// Event batching behavior. -#[configurable_component] -#[derive(Clone, Copy, Debug, Default)] -pub struct BatchConfig { - /// The maximum size of a batch before it is flushed. - #[configurable(metadata(docs::type_unit = "events"))] - #[configurable(metadata(docs::examples = 1000))] - pub max_events: Option, -} - -/// Authentication configuration. -#[configurable_component] -#[derive(Clone, Debug)] -struct AuthConfig { - /// Basic authentication name/username. - /// - /// This can be used either for basic authentication (username/password) or JWT authentication. - /// When used for JWT, the value should be `token`. - #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))] - #[configurable(metadata(docs::examples = "name123"))] - name: Option, - - /// Basic authentication password/token. - /// - /// This can be used either for basic authentication (username/password) or JWT authentication. - /// When used for JWT, the value should be the signed JWT, in the compact representation. - #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))] - #[configurable(metadata(docs::examples = "123456789"))] - token: Option, - - #[configurable(derived)] - oauth2: Option, -} - -/// OAuth2-specific authentication configuration. -#[configurable_component] -#[derive(Clone, Debug)] -pub struct OAuth2Config { - /// The issuer URL. - #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))] - #[configurable(metadata(docs::examples = "https://oauth2.issuer"))] - issuer_url: String, - - /// The credentials URL. - /// - /// A data URL is also supported. - #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))] - #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))] - #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))] - credentials_url: String, - - /// The OAuth2 audience. - #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))] - #[configurable(metadata(docs::examples = "pulsar"))] - audience: Option, - - /// The OAuth2 scope. - #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))] - #[configurable(metadata(docs::examples = "admin"))] - scope: Option, -} - -/// Supported compression types for Pulsar. -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative)] -#[derivative(Default)] -#[serde(rename_all = "lowercase")] -pub enum PulsarCompression { - /// No compression. - #[derivative(Default)] - None, - - /// LZ4. - Lz4, - - /// Zlib. - Zlib, - - /// Zstandard. - Zstd, - - /// Snappy. - Snappy, -} - -type PulsarProducer = Producer; -type BoxedPulsarProducer = Box; - -enum PulsarSinkState { - None, - Ready(BoxedPulsarProducer), - Sending( - BoxFuture< - 'static, - ( - BoxedPulsarProducer, - Result, - RequestMetadata, - EventFinalizers, - ), - >, - ), -} - -struct PulsarSink { - transformer: Transformer, - encoder: Encoder<()>, - partition_key_field: Option, - state: PulsarSinkState, - in_flight: FuturesUnordered< - BoxFuture< - 'static, - ( - Result, - RequestMetadata, - EventFinalizers, - ), - >, - >, - bytes_sent: Registered, - events_sent: Registered, -} - -impl GenerateConfig for PulsarSinkConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - endpoint: "pulsar://127.0.0.1:6650".to_string(), - topic: "topic-1234".to_string(), - partition_key_field: None, - compression: Default::default(), - encoding: TextSerializerConfig::default().into(), - auth: None, - acknowledgements: Default::default(), - producer_name: None, - batch: Default::default(), - }) - .unwrap() - } -} - -#[async_trait::async_trait] -impl SinkConfig for PulsarSinkConfig { - async fn build( - &self, - _cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let producer = self - .create_pulsar_producer(false) - .await - .context(CreatePulsarSinkSnafu)?; - - let transformer = self.encoding.transformer(); - let serializer = self.encoding.build()?; - let encoder = Encoder::<()>::new(serializer); - - let sink = PulsarSink::new( - producer, - transformer, - encoder, - self.partition_key_field.clone(), - )?; - - let producer = self - .create_pulsar_producer(true) - .await - .context(CreatePulsarSinkSnafu)?; - let healthcheck = healthcheck(producer).boxed(); - - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirement = - schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirement) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -impl PulsarSinkConfig { - async fn create_pulsar_producer( - &self, - is_healthcheck: bool, - ) -> Result { - let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor); - if let Some(auth) = &self.auth { - builder = match ( - auth.name.as_ref(), - auth.token.as_ref(), - auth.oauth2.as_ref(), - ) { - (Some(name), Some(token), None) => builder.with_auth(Authentication { - name: name.clone(), - data: token.inner().as_bytes().to_vec(), - }), - (None, None, Some(oauth2)) => builder.with_auth_provider( - OAuth2Authentication::client_credentials(OAuth2Params { - issuer_url: oauth2.issuer_url.clone(), - credentials_url: oauth2.credentials_url.clone(), - audience: oauth2.audience.clone(), - scope: oauth2.scope.clone(), - }), - ), - _ => return Err(PulsarError::Authentication(AuthenticationError::Custom( - "Invalid auth config: can only specify name and token or oauth2 configuration" - .to_string(), - ))), - }; - } - - let pulsar = builder.build().await?; - let mut pulsar_builder = pulsar.producer().with_topic(&self.topic); - - if let Some(producer_name) = self.producer_name.clone() { - pulsar_builder = pulsar_builder.with_name(if is_healthcheck { - format!("{}-healthcheck", producer_name) - } else { - producer_name - }); - } - - let mut producer_options = pulsar::ProducerOptions { - compression: Some(match self.compression { - PulsarCompression::None => compression::Compression::None, - PulsarCompression::Lz4 => { - compression::Compression::Lz4(compression::CompressionLz4::default()) - } - PulsarCompression::Zlib => { - compression::Compression::Zlib(compression::CompressionZlib::default()) - } - PulsarCompression::Zstd => { - compression::Compression::Zstd(compression::CompressionZstd::default()) - } - PulsarCompression::Snappy => { - compression::Compression::Snappy(compression::CompressionSnappy::default()) - } - }), - ..Default::default() - }; - - if !is_healthcheck { - producer_options.batch_size = self.batch.max_events; - } - - if let SerializerConfig::Avro { avro } = self.encoding.config() { - producer_options.schema = Some(proto::Schema { - schema_data: avro.schema.as_bytes().into(), - r#type: proto::schema::Type::Avro as i32, - ..Default::default() - }); - } - - pulsar_builder.with_options(producer_options).build().await - } -} - -async fn healthcheck(producer: PulsarProducer) -> crate::Result<()> { - producer.check_connection().await.map_err(Into::into) -} - -impl PulsarSink { - fn new( - producer: PulsarProducer, - transformer: Transformer, - encoder: Encoder<()>, - partition_key_field: Option, - ) -> crate::Result { - Ok(Self { - transformer, - encoder, - state: PulsarSinkState::Ready(Box::new(producer)), - in_flight: FuturesUnordered::new(), - bytes_sent: register!(BytesSent::from(Protocol::TCP)), - events_sent: register!(EventsSent::from(Output(None))), - partition_key_field, - }) - } - - fn poll_in_flight_prepare(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if let PulsarSinkState::Sending(fut) = &mut self.state { - let (producer, result, metadata, finalizers) = ready!(fut.as_mut().poll(cx)); - - self.state = PulsarSinkState::Ready(producer); - self.in_flight.push(Box::pin(async move { - let result = match result { - Ok(fut) => fut.await, - Err(error) => Err(error), - }; - (result, metadata, finalizers) - })); - } - - Poll::Ready(()) - } -} - -impl Sink for PulsarSink { - type Error = (); - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.poll_in_flight_prepare(cx)); - Poll::Ready(Ok(())) - } - - fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> { - assert!( - matches!(self.state, PulsarSinkState::Ready(_)), - "Expected `poll_ready` to be called first." - ); - - let key_value: Option = match (event.maybe_as_log(), &self.partition_key_field) { - (Some(log), Some(field)) => log.get(field.as_str()).map(|x| match x { - Value::Bytes(x) => String::from_utf8_lossy(x).to_string(), - x => x.to_string(), - }), - _ => None, - }; - - let event_time: Option = event - .maybe_as_log() - .and_then(|log| log.get_timestamp()) - .and_then(|value| value.as_timestamp()) - .map(|ts| ts.timestamp_millis()) - .map(|i| i as u64); - - let metadata_builder = RequestMetadataBuilder::from_events(&event); - self.transformer.transform(&mut event); - - let finalizers = event.take_finalizers(); - let mut bytes = BytesMut::new(); - self.encoder.encode(event, &mut bytes).map_err(|_| { - finalizers.update_status(EventStatus::Errored); - // Error is handled by `Encoder`. - })?; - - let bytes_len = - NonZeroUsize::new(bytes.len()).expect("payload should never be zero length"); - let metadata = metadata_builder.with_request_size(bytes_len); - - let mut producer = match std::mem::replace(&mut self.state, PulsarSinkState::None) { - PulsarSinkState::Ready(producer) => producer, - _ => unreachable!(), - }; - - let _ = std::mem::replace( - &mut self.state, - PulsarSinkState::Sending(Box::pin(async move { - let mut builder = producer.create_message().with_content(bytes.as_ref()); - if let Some(ts) = event_time { - builder = builder.event_time(ts); - }; - - if let Some(key) = key_value { - builder = builder.with_key(key); - }; - let result = builder.send().await; - (producer, result, metadata, finalizers) - })), - ); - - Ok(()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.poll_in_flight_prepare(cx)); - - let this = Pin::into_inner(self); - while !this.in_flight.is_empty() { - match ready!(Pin::new(&mut this.in_flight).poll_next(cx)) { - Some((Ok(result), metadata, finalizers)) => { - trace!( - message = "Pulsar sink produced message.", - message_id = ?result.message_id, - producer_id = %result.producer_id, - sequence_id = %result.sequence_id, - ); - - finalizers.update_status(EventStatus::Delivered); - - this.events_sent.emit(CountByteSize( - metadata.event_count(), - metadata.events_estimated_json_encoded_byte_size(), - )); - this.bytes_sent - .emit(ByteSize(metadata.request_encoded_size())); - } - Some((Err(error), metadata, finalizers)) => { - finalizers.update_status(EventStatus::Errored); - emit!(PulsarSendingError { - error: Box::new(error), - count: metadata.event_count(), - }); - return Poll::Ready(Err(())); - } - None => break, - } - } - - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } -} - -#[cfg(feature = "pulsar-integration-tests")] -#[cfg(test)] -mod integration_tests { - use futures::StreamExt; - use pulsar::SubType; - - use super::*; - use crate::sinks::VectorSink; - use crate::test_util::{ - components::{assert_sink_compliance, SINK_TAGS}, - random_lines_with_stream, random_string, trace_init, - }; - - fn pulsar_address() -> String { - std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into()) - } - - #[tokio::test] - async fn pulsar_happy() { - trace_init(); - - let num_events = 1_000; - let (input, events) = random_lines_with_stream(100, num_events, None); - - let topic = format!("test-{}", random_string(10)); - let cnf = PulsarSinkConfig { - endpoint: pulsar_address(), - topic: topic.clone(), - producer_name: None, - compression: PulsarCompression::None, - encoding: TextSerializerConfig::default().into(), - auth: None, - acknowledgements: Default::default(), - partition_key_field: Some("message".to_string()), - batch: Default::default(), - }; - - let pulsar = Pulsar::::builder(&cnf.endpoint, TokioExecutor) - .build() - .await - .unwrap(); - let mut consumer = pulsar - .consumer() - .with_topic(&topic) - .with_consumer_name("VectorTestConsumer") - .with_subscription_type(SubType::Shared) - .with_subscription("VectorTestSub") - .with_options(pulsar::consumer::ConsumerOptions { - read_compacted: Some(false), - ..Default::default() - }) - .build::() - .await - .unwrap(); - - let producer = cnf.create_pulsar_producer(false).await.unwrap(); - let transformer = cnf.encoding.transformer(); - let serializer = cnf.encoding.build().unwrap(); - let encoder = Encoder::<()>::new(serializer); - - assert_sink_compliance(&SINK_TAGS, async move { - let sink = - PulsarSink::new(producer, transformer, encoder, cnf.partition_key_field).unwrap(); - VectorSink::from_event_sink(sink).run(events).await - }) - .await - .expect("Running sink failed"); - - for line in input { - let msg = match consumer.next().await.unwrap() { - Ok(msg) => msg, - Err(error) => panic!("{:?}", error), - }; - consumer.ack(&msg).await.unwrap(); - assert_eq!(String::from_utf8_lossy(&msg.payload.data), line); - assert_eq!( - msg.key(), - Some(String::from_utf8_lossy(&msg.payload.data).to_string()) - ); - assert!(msg.metadata().event_time.is_some()); - } - } -} diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs new file mode 100644 index 0000000000000..af3ad4c07b644 --- /dev/null +++ b/src/sinks/pulsar/config.rs @@ -0,0 +1,308 @@ +use crate::{ + codecs::EncodingConfig, + config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, + schema, + sinks::{ + pulsar::sink::{healthcheck, PulsarSink}, + Healthcheck, VectorSink, + }, + template::Template, +}; +use codecs::{encoding::SerializerConfig, TextSerializerConfig}; +use futures_util::FutureExt; +use lookup::lookup_v2::OptionalTargetPath; +use pulsar::{ + authentication::oauth2::{OAuth2Authentication, OAuth2Params}, + compression, + message::proto, + Authentication, ConnectionRetryOptions, Error as PulsarError, ProducerOptions, Pulsar, + TokioExecutor, +}; +use pulsar::{error::AuthenticationError, OperationRetryOptions}; +use snafu::ResultExt; +use value::Kind; +use vector_common::sensitive_string::SensitiveString; +use vector_config::configurable_component; +use vector_core::config::DataType; + +/// Configuration for the `pulsar` sink. +#[configurable_component(sink("pulsar"))] +#[derive(Clone, Debug)] +pub struct PulsarSinkConfig { + /// The endpoint to which the Pulsar client should connect to. + /// + /// The endpoint should specify the pulsar protocol and port. + #[serde(alias = "address")] + #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))] + pub(crate) endpoint: String, + + /// The Pulsar topic name to write events to. + #[configurable(metadata(docs::examples = "topic-1234"))] + pub(crate) topic: Template, + + /// The name of the producer. If not specified, the default name assigned by Pulsar is used. + #[configurable(metadata(docs::examples = "producer-name"))] + pub(crate) producer_name: Option, + + /// The log field name or tags key to use for the partition key. + /// + /// If the field does not exist in the log event or metric tags, a blank value will be used. + /// + /// If omitted, the key is not sent. + /// + /// Pulsar uses a hash of the key to choose the topic-partition or uses round-robin if the record has no key. + #[configurable(metadata(docs::examples = "message"))] + #[configurable(metadata(docs::examples = "my_field"))] + pub(crate) partition_key_field: Option, + + /// The log field name to use for the Pulsar properties key. + /// + /// If omitted, no properties will be written. + pub properties_key: Option, + + #[configurable(derived)] + #[serde(default)] + pub(crate) batch: PulsarBatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub compression: PulsarCompression, + + #[configurable(derived)] + pub encoding: EncodingConfig, + + #[configurable(derived)] + pub(crate) auth: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +/// Event batching behavior. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default)] +pub(crate) struct PulsarBatchConfig { + /// The maximum size of a batch before it is flushed. + /// + /// Note this is an unsigned 32 bit integer which is a smaller capacity than + /// many of the other sink batch settings. + #[configurable(metadata(docs::type_unit = "events"))] + #[configurable(metadata(docs::examples = 1000))] + pub max_events: Option, +} + +/// Authentication configuration. +#[configurable_component] +#[derive(Clone, Debug)] +pub(crate) struct PulsarAuthConfig { + /// Basic authentication name/username. + /// + /// This can be used either for basic authentication (username/password) or JWT authentication. + /// When used for JWT, the value should be `token`. + #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))] + #[configurable(metadata(docs::examples = "name123"))] + name: Option, + + /// Basic authentication password/token. + /// + /// This can be used either for basic authentication (username/password) or JWT authentication. + /// When used for JWT, the value should be the signed JWT, in the compact representation. + #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))] + #[configurable(metadata(docs::examples = "123456789"))] + token: Option, + + #[configurable(derived)] + oauth2: Option, +} + +/// OAuth2-specific authentication configuration. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct OAuth2Config { + /// The issuer URL. + #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))] + #[configurable(metadata(docs::examples = "https://oauth2.issuer"))] + issuer_url: String, + + /// The credentials URL. + /// + /// A data URL is also supported. + #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))] + #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))] + #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))] + credentials_url: String, + + /// The OAuth2 audience. + #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))] + #[configurable(metadata(docs::examples = "pulsar"))] + audience: Option, + + /// The OAuth2 scope. + #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))] + #[configurable(metadata(docs::examples = "admin"))] + scope: Option, +} + +/// Supported compression types for Pulsar. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative)] +#[derivative(Default)] +#[serde(rename_all = "lowercase")] +pub enum PulsarCompression { + /// No compression. + #[derivative(Default)] + None, + + /// LZ4. + Lz4, + + /// Zlib. + Zlib, + + /// Zstandard. + Zstd, + + /// Snappy. + Snappy, +} + +impl Default for PulsarSinkConfig { + fn default() -> Self { + Self { + endpoint: "pulsar://127.0.0.1:6650".to_string(), + topic: Template::try_from("topic-1234") + .expect("Unable to parse default template topic"), + producer_name: None, + properties_key: None, + partition_key_field: None, + batch: Default::default(), + compression: Default::default(), + encoding: TextSerializerConfig::default().into(), + auth: None, + acknowledgements: Default::default(), + } + } +} + +impl PulsarSinkConfig { + pub(crate) async fn create_pulsar_client(&self) -> Result, PulsarError> { + let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor); + if let Some(auth) = &self.auth { + builder = match ( + auth.name.as_ref(), + auth.token.as_ref(), + auth.oauth2.as_ref(), + ) { + (Some(name), Some(token), None) => builder.with_auth(Authentication { + name: name.clone(), + data: token.inner().as_bytes().to_vec(), + }), + (None, None, Some(oauth2)) => builder.with_auth_provider( + OAuth2Authentication::client_credentials(OAuth2Params { + issuer_url: oauth2.issuer_url.clone(), + credentials_url: oauth2.credentials_url.clone(), + audience: oauth2.audience.clone(), + scope: oauth2.scope.clone(), + }), + ), + _ => return Err(PulsarError::Authentication(AuthenticationError::Custom( + "Invalid auth config: can only specify name and token or oauth2 configuration" + .to_string(), + ))), + }; + } + + // Apply configuration for reconnection exponential backoff. + let retry_opts = ConnectionRetryOptions::default(); + builder = builder.with_connection_retry_options(retry_opts); + + // Apply configuration for retrying Pulsar operations. + let operation_retry_opts = OperationRetryOptions::default(); + builder = builder.with_operation_retry_options(operation_retry_opts); + + builder.build().await + } + + pub(crate) fn build_producer_options(&self) -> ProducerOptions { + let mut opts = ProducerOptions { + encrypted: None, + access_mode: Some(0), + metadata: Default::default(), + schema: None, + batch_size: self.batch.max_events, + compression: None, + }; + + match &self.compression { + PulsarCompression::None => opts.compression = Some(compression::Compression::None), + PulsarCompression::Lz4 => { + opts.compression = Some(compression::Compression::Lz4( + compression::CompressionLz4::default(), + )) + } + PulsarCompression::Zlib => { + opts.compression = Some(compression::Compression::Zlib( + compression::CompressionZlib::default(), + )) + } + PulsarCompression::Zstd => { + opts.compression = Some(compression::Compression::Zstd( + compression::CompressionZstd::default(), + )) + } + PulsarCompression::Snappy => { + opts.compression = Some(compression::Compression::Snappy( + compression::CompressionSnappy::default(), + )) + } + } + + if let SerializerConfig::Avro { avro } = self.encoding.config() { + opts.schema = Some(proto::Schema { + schema_data: avro.schema.as_bytes().into(), + r#type: proto::schema::Type::Avro as i32, + ..Default::default() + }); + } + opts + } +} + +impl GenerateConfig for PulsarSinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self::default()).unwrap() + } +} + +#[async_trait::async_trait] +impl SinkConfig for PulsarSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let client = self + .create_pulsar_client() + .await + .context(super::sink::CreatePulsarSinkSnafu)?; + + let sink = PulsarSink::new(client, self.clone())?; + + let hc = healthcheck(self.clone()).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), hc)) + } + + fn input(&self) -> Input { + let requirement = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric)) + .with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} diff --git a/src/sinks/pulsar/encoder.rs b/src/sinks/pulsar/encoder.rs new file mode 100644 index 0000000000000..63d25f2e05711 --- /dev/null +++ b/src/sinks/pulsar/encoder.rs @@ -0,0 +1,30 @@ +//! Encoding for the `Pulsar` sink. +use crate::{ + event::Event, + sinks::util::encoding::{write_all, Encoder}, +}; +use bytes::BytesMut; +use std::io; +use tokio_util::codec::Encoder as _; + +#[derive(Clone, Debug)] +pub(super) struct PulsarEncoder { + pub(super) encoder: crate::codecs::Encoder<()>, + pub(super) transformer: crate::codecs::Transformer, +} + +impl Encoder for PulsarEncoder { + fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result { + let mut body = BytesMut::new(); + self.transformer.transform(&mut input); + let mut encoder = self.encoder.clone(); + encoder + .encode(input, &mut body) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode"))?; + + let body = body.freeze(); + write_all(writer, 1, body.as_ref())?; + + Ok(body.len()) + } +} diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs new file mode 100644 index 0000000000000..ac11fa940cb0b --- /dev/null +++ b/src/sinks/pulsar/integration_tests.rs @@ -0,0 +1,93 @@ +use crate::sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarSink}; +use futures::StreamExt; +use pulsar::SubType; +use std::collections::BTreeMap; + +use crate::event::Value; +use crate::sinks::VectorSink; +use crate::template::Template; +use crate::test_util::{ + components::{assert_sink_compliance, SINK_TAGS}, + random_lines_with_stream, random_string, trace_init, +}; +use bytes::Bytes; + +fn pulsar_address() -> String { + std::env::var("PULSAR_ADDRESS").unwrap_or_else(|_| "pulsar://127.0.0.1:6650".into()) +} + +async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) { + trace_init(); + + let prop_1_key = "prop-1-key"; + let prop_1_value = "prop-1-value"; + let num_events = 1_000; + let (input, events) = random_lines_with_stream(100, num_events, None); + + let prop_key_opt = cnf.properties_key.clone(); + let input_events = events.map(move |mut events| { + // if a property_key is defined, add some properties! + if let Some(properties_key) = &prop_key_opt { + if let Some(properties_key) = &properties_key.path { + let mut property_values = BTreeMap::new(); + property_values.insert( + prop_1_key.to_owned(), + Value::Bytes(Bytes::from(prop_1_value)), + ); + events.iter_logs_mut().for_each(move |log| { + log.insert(properties_key, property_values.clone()); + }); + return events; + } + } + events + }); + + let topic_str = format!("test-{}", random_string(10)); + let topic = Template::try_from(topic_str.clone()).expect("Unable to parse template"); + + cnf.topic = topic.clone(); + + let pulsar = cnf.create_pulsar_client().await.unwrap(); + let mut consumer = pulsar + .consumer() + .with_topic(&topic_str) + .with_consumer_name("VectorTestConsumer") + .with_subscription_type(SubType::Shared) + .with_subscription("VectorTestSub") + .with_options(pulsar::consumer::ConsumerOptions { + read_compacted: Some(false), + ..Default::default() + }) + .build::() + .await + .unwrap(); + + assert_sink_compliance(&SINK_TAGS, async move { + let sink = PulsarSink::new(pulsar, cnf).unwrap(); + let sink = VectorSink::from_event_streamsink(sink); + sink.run(input_events).await + }) + .await + .expect("Running sink failed"); + + for line in input { + let msg = match consumer.next().await.unwrap() { + Ok(msg) => msg, + Err(error) => panic!("{:?}", error), + }; + consumer.ack(&msg).await.unwrap(); + assert_eq!(String::from_utf8_lossy(&msg.payload.data), line); + } +} + +#[tokio::test] +async fn pulsar_happy() { + let cnf = PulsarSinkConfig { + endpoint: pulsar_address(), + // overriden by test + ..Default::default() + }; + + pulsar_happy_reuse(cnf).await +} diff --git a/src/sinks/pulsar/mod.rs b/src/sinks/pulsar/mod.rs new file mode 100644 index 0000000000000..3a4fe63a621d7 --- /dev/null +++ b/src/sinks/pulsar/mod.rs @@ -0,0 +1,12 @@ +pub(super) mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; +pub(super) mod util; + +#[cfg(test)] +mod tests; + +#[cfg(all(test, feature = "pulsar-integration-tests"))] +mod integration_tests; diff --git a/src/sinks/pulsar/request_builder.rs b/src/sinks/pulsar/request_builder.rs new file mode 100644 index 0000000000000..ec104d0ebf508 --- /dev/null +++ b/src/sinks/pulsar/request_builder.rs @@ -0,0 +1,75 @@ +use bytes::Bytes; +use std::collections::HashMap; +use std::io; +use vector_common::finalization::EventFinalizers; +use vector_common::request_metadata::RequestMetadata; + +use crate::sinks::pulsar::encoder::PulsarEncoder; +use crate::sinks::pulsar::sink::PulsarEvent; +use crate::sinks::util::metadata::RequestMetadataBuilder; +use crate::sinks::util::request_builder::EncodeResult; +use crate::sinks::util::{Compression, RequestBuilder}; +use crate::{ + event::{Event, Finalizable}, + sinks::pulsar::service::PulsarRequest, +}; + +#[derive(Clone)] +pub(super) struct PulsarMetadata { + pub finalizers: EventFinalizers, + pub key: Option, + pub properties: Option>, + pub timestamp_millis: Option, + pub topic: String, +} + +pub(super) struct PulsarRequestBuilder { + pub(super) encoder: PulsarEncoder, +} + +impl RequestBuilder for PulsarRequestBuilder { + type Metadata = PulsarMetadata; + type Events = Event; + type Encoder = PulsarEncoder; + type Payload = Bytes; + type Request = PulsarRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + // Compression is handled by the pulsar crate through the producer settings. + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: PulsarEvent, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let builder = RequestMetadataBuilder::from_events(&input); + let metadata = PulsarMetadata { + finalizers: input.event.take_finalizers(), + key: input.key, + timestamp_millis: input.timestamp_millis, + properties: input.properties, + topic: input.topic, + }; + (metadata, builder, input.event) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let body = payload.into_payload(); + PulsarRequest { + body, + metadata, + request_metadata, + } + } +} diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs new file mode 100644 index 0000000000000..d723748fd1c7a --- /dev/null +++ b/src/sinks/pulsar/service.rs @@ -0,0 +1,157 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::future::BoxFuture; +use pulsar::producer::Message; +use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar}; +use tokio::sync::Mutex; +use tower::Service; +use vector_common::internal_event::CountByteSize; +use vector_core::stream::DriverResponse; + +use crate::event::{EventFinalizers, EventStatus, Finalizable}; +use crate::internal_events::PulsarSendingError; +use crate::sinks::pulsar::request_builder::PulsarMetadata; +use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; + +#[derive(Clone)] +pub(super) struct PulsarRequest { + pub body: Bytes, + pub metadata: PulsarMetadata, + pub request_metadata: RequestMetadata, +} + +pub struct PulsarResponse { + event_byte_size: usize, +} + +impl DriverResponse for PulsarResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> CountByteSize { + CountByteSize(1, self.event_byte_size) + } + + fn bytes_sent(&self) -> Option { + Some(self.event_byte_size) + } +} + +impl Finalizable for PulsarRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.metadata.finalizers) + } +} + +impl MetaDescriptive for PulsarRequest { + fn get_metadata(&self) -> RequestMetadata { + self.request_metadata + } +} + +pub struct PulsarService { + // NOTE: the reason for the Mutex here is because the `Producer` from the pulsar crate + // needs to be `mut`, and the `Service::call()` returns a Future. + producer: Arc>>, +} + +impl PulsarService { + pub(crate) fn new( + pulsar_client: Pulsar, + producer_options: ProducerOptions, + producer_name: Option, + ) -> PulsarService { + let mut builder = pulsar_client.producer().with_options(producer_options); + + if let Some(name) = producer_name { + builder = builder.with_name(name); + } + + let producer = builder.build_multi_topic(); + + PulsarService { + producer: Arc::new(Mutex::new(producer)), + } + } +} + +impl Service for PulsarService { + type Response = PulsarResponse; + type Error = PulsarError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + match self.producer.try_lock() { + Ok(_) => Poll::Ready(Ok(())), + Err(_) => Poll::Pending, + } + } + + fn call(&mut self, request: PulsarRequest) -> Self::Future { + let producer = Arc::clone(&self.producer); + let topic = request.metadata.topic.clone(); + let event_time = request + .metadata + .timestamp_millis + .to_owned() + .map(|t| t as u64); + + Box::pin(async move { + let body = request.body.clone(); + + let mut properties = HashMap::new(); + if let Some(props) = request.metadata.properties { + for (key, value) in props { + properties.insert(key, String::from_utf8_lossy(&value).to_string()); + } + } + + let partition_key = request + .metadata + .key + .map(|key| String::from_utf8_lossy(&key).to_string()); + + let message = Message { + payload: body.as_ref().to_vec(), + properties, + partition_key, + event_time, + ..Default::default() + }; + + // The locking if this mutex is not normal in `Service::call()` implementations, but we + // at least can limit the scope of the lock by placing it here, and reduce the + // possibility of performance impact by checking the `try_lock()` result in + // `poll_ready()`. This sink is already limited to sequential request handling due to + // the pulsar API, so this shouldn't impact performance from a concurrent requests + // standpoint. + let fut = producer.lock().await.send(topic, message).await; + + match fut { + Ok(resp) => match resp.await { + Ok(_) => Ok(PulsarResponse { + event_byte_size: request.request_metadata.events_byte_size(), + }), + Err(e) => { + emit!(PulsarSendingError { + error: Box::new(PulsarError::Custom("failed to send".to_string())), + count: 1 + }); + Err(e) + } + }, + Err(e) => { + emit!(PulsarSendingError { + error: Box::new(PulsarError::Custom("failed to send".to_string())), + count: 1, + }); + Err(e) + } + } + }) + } +} diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs new file mode 100644 index 0000000000000..0d7e13ec6c84d --- /dev/null +++ b/src/sinks/pulsar/sink.rs @@ -0,0 +1,146 @@ +use async_trait::async_trait; +use bytes::Bytes; +use futures::{stream::BoxStream, StreamExt}; +use pulsar::{Error as PulsarError, Pulsar, TokioExecutor}; +use serde::Serialize; +use snafu::Snafu; +use std::collections::HashMap; +use tower::ServiceBuilder; + +use crate::{ + codecs::{Encoder, Transformer}, + event::Event, + sinks::util::SinkBuilderExt, + template::Template, +}; +use vector_buffers::EventCount; +use vector_common::byte_size_of::ByteSizeOf; +use vector_core::{ + event::{EstimatedJsonEncodedSizeOf, LogEvent}, + sink::StreamSink, +}; + +use super::{ + config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder, + service::PulsarService, util, +}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub(crate) enum BuildError { + #[snafu(display("creating pulsar producer failed: {}", source))] + CreatePulsarSink { source: PulsarError }, +} + +pub(crate) struct PulsarSink { + transformer: Transformer, + encoder: Encoder<()>, + service: PulsarService, + config: PulsarSinkConfig, + topic_template: Template, +} + +/// Stores the event together with the extracted keys, topics, etc. +/// This is passed into the `RequestBuilder` which then splits it out into the event +/// and metadata containing the keys, and metadata. +/// This event needs to be created prior to building the request so we can filter out +/// any events that error whilst rendering the templates. +#[derive(Serialize)] +pub(super) struct PulsarEvent { + pub(super) event: Event, + pub(super) topic: String, + pub(super) key: Option, + pub(super) properties: Option>, + pub(super) timestamp_millis: Option, +} + +impl EventCount for PulsarEvent { + fn event_count(&self) -> usize { + // A PulsarEvent represents one event. + 1 + } +} + +impl ByteSizeOf for PulsarEvent { + fn allocated_bytes(&self) -> usize { + self.event.size_of() + + self.topic.size_of() + + self.key.as_ref().map_or(0, |bytes| bytes.size_of()) + + self.properties.as_ref().map_or(0, |props| { + props + .iter() + .map(|(key, val)| key.capacity() + val.size_of()) + .sum() + }) + } +} + +impl EstimatedJsonEncodedSizeOf for PulsarEvent { + fn estimated_json_encoded_size_of(&self) -> usize { + self.event.estimated_json_encoded_size_of() + } +} + +pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> { + let client = config.create_pulsar_client().await?; + let topic = config.topic.render_string(&LogEvent::from_str_legacy(""))?; + client.lookup_topic(topic).await?; + Ok(()) +} + +impl PulsarSink { + pub(crate) fn new( + client: Pulsar, + config: PulsarSinkConfig, + ) -> crate::Result { + let producer_opts = config.build_producer_options(); + let transformer = config.encoding.transformer(); + let serializer = config.encoding.build()?; + let encoder = Encoder::<()>::new(serializer); + let service = PulsarService::new(client, producer_opts, config.producer_name.clone()); + let topic_template = config.topic.clone(); + + Ok(PulsarSink { + config, + transformer, + encoder, + service, + topic_template, + }) + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let service = ServiceBuilder::new().service(self.service); + let request_builder = PulsarRequestBuilder { + encoder: PulsarEncoder { + transformer: self.transformer.clone(), + encoder: self.encoder.clone(), + }, + }; + let sink = input + .filter_map(|event| { + std::future::ready(util::make_pulsar_event( + &self.topic_template, + &self.config, + event, + )) + }) + .request_builder(None, request_builder) + .filter_map(|request| async move { + request + .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e)) + .ok() + }) + .into_driver(service) + .protocol("tcp"); + + sink.run().await + } +} + +#[async_trait] +impl StreamSink for PulsarSink { + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/pulsar/tests.rs b/src/sinks/pulsar/tests.rs new file mode 100644 index 0000000000000..3464f194c89bb --- /dev/null +++ b/src/sinks/pulsar/tests.rs @@ -0,0 +1,33 @@ +use crate::event::Event; +use crate::sinks::pulsar::config::PulsarSinkConfig; +use lookup::lookup_v2::OptionalTargetPath; +use std::collections::BTreeMap; +use value::Value; +use vector_config::component::GenerateConfig; + +use bytes::Bytes; + +use crate::event::LogEvent; + +#[test] +fn generate_config() { + PulsarSinkConfig::generate_config(); +} + +#[test] +fn pulsar_get_headers() { + let properties_key = OptionalTargetPath::try_from("properties".to_string()) + .expect("unable to parse OptionalTargetPath"); + let mut property_values = BTreeMap::new(); + property_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value"))); + property_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value"))); + + let mut event = Event::Log(LogEvent::from("hello")); + event + .as_mut_log() + .insert(properties_key.path.as_ref().unwrap(), property_values); + + let properties = super::util::get_properties(&event, &Some(properties_key)).unwrap(); + assert_eq!(properties.get("a-key").unwrap(), "a-value".as_bytes()); + assert_eq!(properties.get("b-key").unwrap(), "b-value".as_bytes()); +} diff --git a/src/sinks/pulsar/util.rs b/src/sinks/pulsar/util.rs new file mode 100644 index 0000000000000..f658bd8e394dc --- /dev/null +++ b/src/sinks/pulsar/util.rs @@ -0,0 +1,88 @@ +use crate::internal_events::PulsarPropertyExtractionError; +use crate::sinks::pulsar::config::PulsarSinkConfig; +use crate::sinks::pulsar::sink::PulsarEvent; +use crate::template::Template; +use bytes::Bytes; +use lookup::lookup_v2::OptionalTargetPath; +use std::collections::HashMap; +use value::Value; +use vector_core::event::Event; + +/// Transforms an event into a Pulsar event by rendering the required template fields. +/// Returns None if there is an error whilst rendering. +pub(super) fn make_pulsar_event( + topic: &Template, + config: &PulsarSinkConfig, + event: Event, +) -> Option { + let topic = topic.render_string(&event).ok()?; + let key = get_key(&event, &config.partition_key_field); + let timestamp_millis = get_timestamp_millis(&event); + let properties = get_properties(&event, &config.properties_key); + Some(PulsarEvent { + event, + topic, + key, + timestamp_millis, + properties, + }) +} + +fn get_key(event: &Event, partition_key_field: &Option) -> Option { + partition_key_field + .as_ref() + .and_then(|partition_key_field| match event { + Event::Log(log) => partition_key_field + .path + .as_ref() + .and_then(|path| log.get(path).map(|value| value.coerce_to_bytes())), + Event::Metric(metric) => partition_key_field + .path + .as_ref() + .and_then(|path| metric.tags().and_then(|tags| tags.get(&path.to_string()))) + .map(|value| value.to_owned().into()), + _ => None, + }) +} + +fn get_timestamp_millis(event: &Event) -> Option { + match &event { + Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(), + Event::Metric(metric) => metric.timestamp(), + _ => None, + } + .map(|ts| ts.timestamp_millis()) +} + +pub(super) fn get_properties( + event: &Event, + properties_key: &Option, +) -> Option> { + properties_key.as_ref().and_then(|properties_key| { + properties_key.path.as_ref().and_then(|path| { + event.maybe_as_log().and_then(|log| { + log.get(path).and_then(|properties| match properties { + Value::Object(headers_map) => { + let mut property_map = HashMap::new(); + for (key, value) in headers_map { + if let Value::Bytes(value_bytes) = value { + property_map.insert(key.clone(), value_bytes.clone()); + } else { + emit!(PulsarPropertyExtractionError { + property_field: path + }); + } + } + Some(property_map) + } + _ => { + emit!(PulsarPropertyExtractionError { + property_field: path + }); + None + } + }) + }) + }) + }) +} diff --git a/src/validate.rs b/src/validate.rs index ef6bc2911e098..f11d41d1845f8 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -218,6 +218,7 @@ async fn validate_healthchecks( fmt.error(error); }; + trace!("Healthcheck for {id} starting."); match tokio::spawn(healthcheck).await { Ok(Ok(_)) => { if config @@ -238,6 +239,7 @@ async fn validate_healthchecks( } Err(_) => failed(format!("Health check for \"{}\" panicked", id)), } + trace!("Healthcheck for {id} done."); } validated diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index 619dd89cd16d8..cd55deaf6093c 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -87,8 +87,13 @@ base: components: sinks: pulsar: configuration: { description: "Event batching behavior." required: false type: object: options: max_events: { - description: "The maximum size of a batch before it is flushed." - required: false + description: """ + The maximum size of a batch before it is flushed. + + Note this is an unsigned 32 bit integer which is a smaller capacity than + many of the other sink batch settings. + """ + required: false type: uint: { examples: [1000] unit: "events" @@ -258,8 +263,16 @@ base: components: sinks: pulsar: configuration: { type: string: examples: ["pulsar://127.0.0.1:6650"] } partition_key_field: { - description: "Log field to use as Pulsar message key." - required: false + description: """ + The log field name or tags key to use for the partition key. + + If the field does not exist in the log event or metric tags, a blank value will be used. + + If omitted, the key is not sent. + + Pulsar uses a hash of the key to choose the topic-partition or uses round-robin if the record has no key. + """ + required: false type: string: examples: ["message", "my_field"] } producer_name: { @@ -267,9 +280,21 @@ base: components: sinks: pulsar: configuration: { required: false type: string: examples: ["producer-name"] } + properties_key: { + description: """ + The log field name to use for the Pulsar properties key. + + If omitted, no properties will be written. + """ + required: false + type: string: {} + } topic: { description: "The Pulsar topic name to write events to." required: true - type: string: examples: ["topic-1234"] + type: string: { + examples: ["topic-1234"] + syntax: "template" + } } }