diff --git a/Cargo.lock b/Cargo.lock index 5e99dca0b9cc95..addd31e9125702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,7 +63,7 @@ checksum = "4860b3d63ffb63b3ed440b78ccd549b009e0f72d995b9e9aec51e8e1af01e694" dependencies = [ "amq-protocol-uri", "tcp-stream", - "tracing 0.1.23", + "tracing 0.1.25", ] [[package]] @@ -359,17 +359,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "async-lapin" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bef3111e70cb8ed40a2250252cfa96d64b880fbb58b0f67a049b6945503f8d6" -dependencies = [ - "async-io", - "lapin", - "parking_lot", -] - [[package]] name = "async-lock" version = "2.3.0" @@ -4741,7 +4730,7 @@ checksum = "9bf8cda6f8e1500338634e4e3ce90ac59eb7929a1e088b6946c742be1cc44dc1" dependencies = [ "doc-comment", "parking_lot", - "tracing 0.1.23", + "tracing 0.1.25", ] [[package]] @@ -7044,6 +7033,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tokio-amqp" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a236324e1e84931e62c22e2ee47688e077aa33a2a95f14d29ab8ec4dbaf9845" +dependencies = [ + "lapin", + "parking_lot", + "tokio", +] + [[package]] name = "tokio-executor" version = "0.1.10" @@ -7778,7 +7778,6 @@ dependencies = [ "async-compression", "async-graphql", "async-graphql-warp", - "async-lapin", "async-nats", "async-stream", "async-trait", @@ -7893,7 +7892,6 @@ dependencies = [ "serde_json", "serde_yaml", "shared", - "smol", "smpl_jwt", "snafu", "snap", @@ -7905,6 +7903,7 @@ dependencies = [ "syslog_loose", "tempfile", "tokio", + "tokio-amqp", "tokio-openssl", "tokio-postgres", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 15ab9d1c34f7ee..353e09187642c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,8 +149,7 @@ smpl_jwt = { version = "0.6.1", default-features = false, optional = true } # Amqp lapin = { version = "1.6", optional = true } -async-lapin = { version = "1.2", optional = true } -smol = { version = "1.2", optional = true } +tokio-amqp = { version = "1.0", optional = true } # API async-graphql = { version = "=2.6.4", default-features = false, optional = true, features = ["chrono"] } @@ -389,7 +388,7 @@ sources-metrics = [ "sources-vector", ] -sources-amqp = ["lapin", "async-lapin", "smol"] +sources-amqp = ["lapin", "tokio-amqp"] sources-apache_metrics = [] sources-aws_ecs_metrics = [] sources-aws_kinesis_firehose = ["base64", "sources-utils-tls", "warp"] @@ -544,7 +543,7 @@ sinks-metrics = [ "sinks-vector" ] -sinks-amqp = ["lapin", "async-lapin", "smol"] +sinks-amqp = ["lapin", "tokio-amqp"] sinks-aws_cloudwatch_logs = ["rusoto", "rusoto_logs"] sinks-aws_cloudwatch_metrics = ["rusoto", "rusoto_cloudwatch"] sinks-aws_kinesis_firehose = ["rusoto", "rusoto_firehose"] diff --git a/src/amqp.rs b/src/amqp.rs index ff9a02eddc216b..b5720ef2aadf65 100644 --- a/src/amqp.rs +++ b/src/amqp.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; -use async_lapin::*; +use tokio_amqp::*; #[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) struct AmqpConfig { @@ -25,16 +25,6 @@ impl Default for AmqpConfig { } } -#[derive(Debug)] -struct SmolExecutor; - -impl lapin::executor::Executor for SmolExecutor { - fn spawn(&self, f: std::pin::Pin + Send>>) -> lapin::Result<()> { - smol::spawn(f).detach(); - Ok(()) - } -} - impl AmqpConfig { fn connection_string(&self) -> String { let mut user = String::default(); @@ -66,7 +56,7 @@ impl AmqpConfig { pub async fn connect(&self) -> Result<(lapin::Connection, lapin::Channel), lapin::Error> { let addr = self.connection_string(); info!("Connecting to {}", addr); - let conn = lapin::Connection::connect(&addr, lapin::ConnectionProperties::default().with_async_io(SmolExecutor)).await?; + let conn = lapin::Connection::connect(&addr, lapin::ConnectionProperties::default().with_tokio()).await?; let channel = conn.create_channel().await?; Ok((conn, channel)) } diff --git a/src/sinks/amqp.rs b/src/sinks/amqp.rs index 00df1e8032ba78..2006249c30643b 100644 --- a/src/sinks/amqp.rs +++ b/src/sinks/amqp.rs @@ -532,7 +532,7 @@ mod integration_test { out.push(s); } else { failures += 1; - tokio::time::delay_for(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; } } diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index e1d8641f6ccb0d..4b010dac9e0820 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -1,5 +1,5 @@ use crate::{ - config::{log_schema, DataType, GlobalOptions, SourceConfig, SourceDescription}, + config::{log_schema, DataType, SourceConfig, SourceDescription}, event::{Event, Value}, internal_events::{AmqpEventFailed, AmqpEventReceived, AmqpCommitFailed, AmqpConsumerFailed}, amqp::AmqpConfig, @@ -12,6 +12,7 @@ use futures::{FutureExt, SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use snafu::Snafu; use lapin::{Connection, Channel}; +use crate::config::SourceContext; #[derive(Debug, Snafu)] enum BuildError { @@ -56,12 +57,9 @@ impl_generate_config_from_default!(AmqpSourceConfig); impl SourceConfig for AmqpSourceConfig { async fn build( &self, - _name: &str, - _globals: &GlobalOptions, - shutdown: ShutdownSignal, - out: Pipeline, + cx: SourceContext, ) -> crate::Result { - amqp_source(self, shutdown, out).await + amqp_source(self, cx.shutdown, cx.out).await } fn output_type(&self) -> DataType { @@ -252,8 +250,8 @@ mod integration_test { let mut config = make_config(); config.consumer = consumer; config.queue = queue; - config.key_field = Some("message_key".to_string()); - config.topic_key = Some("exchange".to_string()); + config.routing_key = Some("message_key".to_string()); + config.exchange_key = Some("exchange".to_string()); let (_conn, channel) = config.connection.connect().await.unwrap(); let mut exchange_opts = lapin::options::ExchangeDeclareOptions::default();