diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 44600c712e588..57c2fb7ff9110 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -55,6 +55,7 @@ jobs: fail-fast: false matrix: include: + - test: 'amqp' - test: 'apex' - test: 'aws' - test: 'axiom' diff --git a/Cargo.lock b/Cargo.lock index 03dba1f41a2a2..43553d1f4071a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,54 @@ dependencies = [ "memchr", ] +[[package]] +name = "amq-protocol" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acc7cad07d1b4533fcb46f0819a6126fa201fd0385469aba75e405424f3fe009" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8b20aba8c35a0b885e1e978eff456ced925730a4e012e63e4ff89a1deb602b" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing 0.1.34", +] + +[[package]] +name = "amq-protocol-types" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e245e0e9083b6a6db5f8c10013074cb382266eb9e2a37204d19c651b8d3b8114" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56987108bf48d2eb500cae8896cd9291564eedd8744776ecc5c3338a8b2ca5f8" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -223,6 +271,33 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-global-executor" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5262ed948da60dd8956c6c5aca4d4163593dddb7b32d73267c93dab7b2e98940" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + [[package]] name = "async-graphql" version = "4.0.13" @@ -362,6 +437,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-recursion" version = "1.0.0" @@ -1708,6 +1795,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -2515,6 +2608,15 @@ version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + [[package]] name = "exitcode" version = "1.1.2" @@ -2633,6 +2735,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b59b6469c35ab601d6487d28879bccfbe8c896c33a3fe699c4d29817e552cc58" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.3", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3879,6 +3993,28 @@ dependencies = [ "regex", ] +[[package]] +name = "lapin" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd03ea5831b44775e296239a64851e2fd14a80a363d202ba147009ffc994ff0f" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing 0.1.34", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -5316,6 +5452,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing 0.1.34", +] + [[package]] name = "pkcs8" version = "0.7.6" @@ -5997,6 +6145,17 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redis" version = "0.21.6" @@ -7094,6 +7253,9 @@ name = "spin" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -7277,6 +7439,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" +[[package]] +name = "tcp-stream" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a4b0a70bac0a58ca6a7659d1328e34ee462339c70b0fa49f72bad1f278910a" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "temp-dir" version = "0.1.11" @@ -8510,6 +8681,7 @@ dependencies = [ "itertools 0.10.4", "k8s-openapi", "kube", + "lapin", "libc", "listenfd", "logfmt", diff --git a/Cargo.toml b/Cargo.toml index d2fb56096ed98..369d0e84afc99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -198,6 +198,9 @@ prost-types = { version = "0.10.1", default-features = false, optional = true } goauth = { version = "0.13.1", optional = true } smpl_jwt = { version = "0.7.1", default-features = false, optional = true } +# AMQP +lapin = { version = "2.1.1", default-features = false, optional = true } + # API async-graphql = { version = "4.0.13", default-features = false, optional = true, features = ["chrono"] } async-graphql-warp = { version = "4.0.13", default-features = false, optional = true } @@ -442,6 +445,7 @@ enrichment-tables-geoip = ["dep:maxminddb"] # Sources sources = ["sources-logs", "sources-metrics"] sources-logs = [ + "sources-amqp", "sources-aws_kinesis_firehose", "sources-aws_s3", "sources-aws_sqs", @@ -484,6 +488,7 @@ sources-metrics = [ "sources-vector", ] +sources-amqp = ["lapin"] sources-apache_metrics = [] sources-aws_ecs_metrics = [] sources-aws_kinesis_firehose = ["dep:base64", "dep:infer"] @@ -577,6 +582,7 @@ transforms-throttle = ["dep:governor"] # Sinks sinks = ["sinks-logs", "sinks-metrics"] sinks-logs = [ + "sinks-amqp", "sinks-apex", "sinks-aws_cloudwatch_logs", "sinks-aws_kinesis_firehose", @@ -631,6 +637,7 @@ sinks-metrics = [ "sinks-splunk_hec" ] +sinks-amqp = ["lapin"] sinks-apex = [] sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs"] sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"] @@ -693,6 +700,7 @@ nightly = [] # Testing-related features all-integration-tests = [ + "amqp-integration-tests", "apex-integration-tests", "aws-integration-tests", "axiom-integration-tests", @@ -728,6 +736,8 @@ all-integration-tests = [ "dnstap-integration-tests", ] +amqp-integration-tests = ["sources-amqp", "sinks-amqp"] + aws-integration-tests = [ "aws-cloudwatch-logs-integration-tests", "aws-cloudwatch-metrics-integration-tests", @@ -877,4 +887,3 @@ name = "codecs" path = "benches/codecs/main.rs" harness = false required-features = ["codecs-benches"] - diff --git a/Makefile b/Makefile index 01f6db048f217..516bf9d5d59c5 100644 --- a/Makefile +++ b/Makefile @@ -334,7 +334,7 @@ test-enterprise: ## Runs enterprise related behavioral tests .PHONY: test-integration test-integration: ## Runs all integration tests -test-integration: test-integration-apex test-integration-aws test-integration-axiom test-integration-azure test-integration-clickhouse test-integration-docker-logs test-integration-elasticsearch +test-integration: test-integration-amqp test-integration-apex test-integration-aws test-integration-axiom test-integration-azure test-integration-clickhouse test-integration-docker-logs test-integration-elasticsearch test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-scrape test-integration-influxdb test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar diff --git a/scripts/integration/docker-compose.amqp.yml b/scripts/integration/docker-compose.amqp.yml new file mode 100644 index 0000000000000..c3eca99cf70b9 --- /dev/null +++ b/scripts/integration/docker-compose.amqp.yml @@ -0,0 +1,36 @@ +version: "3" + +services: + rabbitmq: + image: docker.io/rabbitmq:3.8 + ports: + - 5672:5672 + runner: + build: + context: ${PWD} + dockerfile: scripts/integration/Dockerfile + args: + - RUST_VERSION=${RUST_VERSION} + working_dir: /code + command: + - "cargo" + - "nextest" + - "run" + - "--no-fail-fast" + - "--no-default-features" + - "--features" + - "amqp-integration-tests" + - "--lib" + - "${FILTER:-::amqp::}" + depends_on: + - rabbitmq + volumes: + - ${PWD}:/code + - target:/code/target + - cargogit:/usr/local/cargo/git + - cargoregistry:/usr/local/cargo/registry + +volumes: + target: {} + cargogit: {} + cargoregistry: {} diff --git a/src/amqp.rs b/src/amqp.rs new file mode 100644 index 0000000000000..82059e82c97d0 --- /dev/null +++ b/src/amqp.rs @@ -0,0 +1,68 @@ +use lapin::tcp::{OwnedIdentity, OwnedTLSConfig}; +use vector_config::configurable_component; + +/// Connection options for `AMQP`. +#[configurable_component] +#[derive(Clone, Debug)] +pub(crate) struct AmqpConfig { + /// URI for the `AMQP` server. + /// + /// Format: amqp://:@:/?timeout= + pub(crate) connection_string: String, + + #[configurable(derived)] + pub(crate) tls: Option, +} + +impl Default for AmqpConfig { + fn default() -> Self { + Self { + connection_string: "amqp://127.0.0.1/%2f".to_string(), + tls: None, + } + } +} + +impl AmqpConfig { + pub(crate) async fn connect( + &self, + ) -> Result<(lapin::Connection, lapin::Channel), Box> { + debug!("Connecting to {}.", self.connection_string); + let addr = self.connection_string.clone(); + let conn = match &self.tls { + Some(tls) => { + let cert_chain = if let Some(ca) = &tls.ca_file { + Some(tokio::fs::read_to_string(ca.to_owned()).await?) + } else { + None + }; + let identity = if let Some(identity) = &tls.key_file { + let der = tokio::fs::read(identity.to_owned()).await?; + Some(OwnedIdentity { + der, + password: tls + .key_pass + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(String::default), + }) + } else { + None + }; + let tls_config = OwnedTLSConfig { + identity, + cert_chain, + }; + lapin::Connection::connect_with_config( + &addr, + lapin::ConnectionProperties::default(), + tls_config, + ) + .await + } + None => lapin::Connection::connect(&addr, lapin::ConnectionProperties::default()).await, + }?; + let channel = conn.create_channel().await?; + Ok((conn, channel)) + } +} diff --git a/src/app.rs b/src/app.rs index d59b0c68a6326..a94f6e00561c9 100644 --- a/src/app.rs +++ b/src/app.rs @@ -88,6 +88,7 @@ impl Application { "tower_limit=trace".to_owned(), format!("rdkafka={}", level), format!("buffers={}", level), + format!("lapin={}", level), format!("kube={}", level), ] .join(","), diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs new file mode 100644 index 0000000000000..7034e5f2f3bfe --- /dev/null +++ b/src/internal_events/amqp.rs @@ -0,0 +1,155 @@ +#[cfg(feature = "sources-amqp")] +pub mod source { + use metrics::counter; + use vector_common::internal_event::{error_stage, error_type}; + use vector_core::internal_event::InternalEvent; + + #[derive(Debug)] + pub struct AmqpBytesReceived { + pub byte_size: usize, + pub protocol: &'static str, + } + + impl InternalEvent for AmqpBytesReceived { + fn emit(self) { + trace!( + message = "Bytes received.", + byte_size = %self.byte_size, + protocol = %self.protocol, + ); + counter!( + "component_received_bytes_total", + self.byte_size as u64, + "protocol" => self.protocol, + ); + } + } + + #[derive(Debug)] + pub struct AmqpEventError { + pub error: lapin::Error, + } + + impl InternalEvent for AmqpEventError { + fn emit(self) { + error!(message = "Failed to read message.", + error = ?self.error, + error_type = error_type::REQUEST_FAILED, + stage = error_stage::RECEIVING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::REQUEST_FAILED, + "stage" => error_stage::RECEIVING, + ); + } + } + + #[derive(Debug)] + pub struct AmqpAckError { + pub error: lapin::Error, + } + + impl InternalEvent for AmqpAckError { + fn emit(self) { + error!(message = "Unable to ack.", + error = ?self.error, + error_type = error_type::ACKNOWLEDGMENT_FAILED, + stage = error_stage::RECEIVING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ACKNOWLEDGMENT_FAILED, + "stage" => error_stage::RECEIVING, + ); + } + } + + #[derive(Debug)] + pub struct AmqpRejectError { + pub error: lapin::Error, + } + + impl InternalEvent for AmqpRejectError { + fn emit(self) { + error!(message = "Unable to reject.", + error = ?self.error, + error_type = error_type::COMMAND_FAILED, + stage = error_stage::RECEIVING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::COMMAND_FAILED, + "stage" => error_stage::RECEIVING, + ); + } + } +} + +#[cfg(feature = "sinks-amqp")] +pub mod sink { + use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, + }; + use metrics::counter; + use vector_common::internal_event::{error_stage, error_type}; + use vector_core::internal_event::InternalEvent; + + #[derive(Debug)] + pub struct AmqpDeliveryError<'a> { + pub error: &'a lapin::Error, + } + + impl InternalEvent for AmqpDeliveryError<'_> { + fn emit(self) { + let deliver_reason = "Unable to deliver."; + + error!(message = deliver_reason, + error = ?self.error, + error_type = error_type::REQUEST_FAILED, + stage = error_stage::SENDING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::REQUEST_FAILED, + "stage" => error_stage::SENDING, + ); + emit!(ComponentEventsDropped:: { + count: 1, + reason: deliver_reason + }); + } + } + + #[derive(Debug)] + pub struct AmqpAcknowledgementError<'a> { + pub error: &'a lapin::Error, + } + + impl InternalEvent for AmqpAcknowledgementError<'_> { + fn emit(self) { + let ack_reason = "Acknowledgement failed."; + + error!(message = ack_reason, + error = ?self.error, + error_type = error_type::REQUEST_FAILED, + stage = error_stage::SENDING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::REQUEST_FAILED, + "stage" => error_stage::SENDING, + ); + emit!(ComponentEventsDropped:: { + count: 1, + reason: ack_reason + }); + } + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index da6007836dbae..0354665402423 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -2,6 +2,8 @@ pub mod prelude; mod adaptive_concurrency; mod aggregate; +#[cfg(any(feature = "sources-amqp", feature = "sinks-amqp"))] +mod amqp; #[cfg(feature = "sources-apache_metrics")] mod apache_metrics; #[cfg(feature = "api")] @@ -133,6 +135,8 @@ pub(crate) use mongodb_metrics::*; #[cfg(feature = "transforms-aggregate")] pub(crate) use self::aggregate::*; +#[cfg(any(feature = "sources-amqp", feature = "sinks-amqp"))] +pub(crate) use self::amqp::*; #[cfg(feature = "sources-apache_metrics")] pub(crate) use self::apache_metrics::*; #[cfg(feature = "api")] diff --git a/src/lib.rs b/src/lib.rs index 7c0f321bf452e..2e93c4ec78330 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,8 @@ pub mod generate_schema; #[macro_use] #[allow(unreachable_pub)] pub mod internal_events; +#[cfg(feature = "lapin")] +pub mod amqp; #[cfg(feature = "api")] #[allow(unreachable_pub)] pub mod api; diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs new file mode 100644 index 0000000000000..a131e25438f8b --- /dev/null +++ b/src/sinks/amqp/config.rs @@ -0,0 +1,104 @@ +//! Configuration functionality for the `AMQP` sink. +use crate::{ + amqp::AmqpConfig, + codecs::EncodingConfig, + config::{DataType, GenerateConfig, Input, SinkConfig, SinkContext}, + sinks::{Healthcheck, VectorSink}, + template::Template, +}; +use codecs::TextSerializerConfig; +use futures::FutureExt; +use std::sync::Arc; +use vector_config::configurable_component; +use vector_core::config::AcknowledgementsConfig; + +use super::sink::AmqpSink; + +/// Configuration for the `amqp` sink. +/// +/// Supports AMQP version 0.9.1 +#[configurable_component(sink("amqp"))] +#[derive(Clone, Debug)] +pub struct AmqpSinkConfig { + /// The exchange to publish messages to. + #[configurable(metadata(templateable))] + pub(crate) exchange: Template, + + /// Template used to generate a routing key which corresponds to a queue binding. + #[configurable(metadata(templateable))] + pub(crate) routing_key: Option