diff --git a/Cargo.toml b/Cargo.toml index 58e6301b..d6dc16c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,63 +16,63 @@ description = "Rust client for Apache Pulsar" keywords = ["pulsar", "api", "client"] [dependencies] -async-channel = "2" -bytes = "^1.4.0" -crc = "^3.0.1" -nom = { version="^7.1.3", default-features=false, features=["alloc"] } +async-channel = "^2.3.1" +async-trait = "^0.1.81" +async-std = { version = "^1.12.0", features = ["attributes", "unstable"], optional = true } +async-native-tls = { version = "^0.5.0", optional = true } +asynchronous-codec = { version = "^0.7.0", optional = true } +bytes = "^1.6.1" +bit-vec = "^0.8.0" +chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] } +crc = "^3.2.1" +data-url = { version = "^0.3.1", optional = true } +flate2 = { version = "^1.0.30", optional = true } +futures = "^0.3.30" +futures-io = "^0.3.30" +futures-timer = "^3.0.3" +futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls) +log = "^0.4.22" +lz4 = { version = "^1.26.0", optional = true } +native-tls = { version = "^0.2.12", optional = true } +nom = { version = "^7.1.3", default-features = false, features = ["alloc"] } +openidconnect = { version = "^3.5.0", optional = true } +oauth2 = { version = "^4.4.1", optional = true } +pem = "^3.0.4" prost = "^0.13.1" prost-derive = "^0.13.1" rand = "^0.8.5" -chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] } -futures-timer = "^3.0.2" -log = "^0.4.19" -url = "^2.4.0" -regex = "^1.9.1" -bit-vec = "^0.6.3" -futures = "^0.3.28" -futures-io = "^0.3.28" -native-tls = { version = "^0.2.11", optional = true } -rustls = { version = "^0.21.6", optional = true } -webpki-roots = { version = "^0.25.1", optional = true } -pem = "^3.0.0" -tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true } -tokio-util = { version = "^0.7.8", features = ["codec"], optional = true } -tokio-rustls = { version = "^0.24.1", optional = true } +regex = "^1.10.5" +rustls = { version = "^0.23.12", optional = true } +snap = { version = "^1.1.1", optional = true } +serde = { version = "^1.0.204", features = ["derive"], optional = true } +serde_json = { version = "^1.0.121", optional = true } +tokio = { version = "^1.39.2", features = ["rt", "net", "time"], optional = true } +tokio-util = { version = "^0.7.11", features = ["codec"], optional = true } +tokio-rustls = { version = "^0.26.0", optional = true } tokio-native-tls = { version = "^0.3.1", optional = true } -async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } -asynchronous-codec = { version = "^0.6.2", optional = true } -async-rustls = { version = "^0.4.0", optional = true } -async-native-tls = { version = "^0.5.0", optional = true } -lz4 = { version = "^1.24.0", optional = true } -flate2 = { version = "^1.0.26", optional = true } -zstd = { version = "^0.12.4", optional = true } -snap = { version = "^1.1.0", optional = true } -openidconnect = { version = "^3.3.0", optional = true } -oauth2 = { version = "^4.4.1", optional = true } -serde = { version = "^1.0.175", features = ["derive"], optional = true } -serde_json = { version = "^1.0.103", optional = true } -tracing = { version = "^0.1.37", optional = true } -async-trait = "^0.1.72" -data-url = { version = "^0.3.0", optional = true } -uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] } +tracing = { version = "^0.1.40", optional = true } +url = "^2.5.2" +uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] } +webpki-roots = { version = "^0.26.3", optional = true } +zstd = { version = "^0.13.2", optional = true } [dev-dependencies] -serde = { version = "^1.0.175", features = ["derive"] } -serde_json = "^1.0.103" -env_logger = "^0.10.0" -tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] } +env_logger = "^0.11.5" +serde = { version = "^1.0.204", features = ["derive"] } +serde_json = "^1.0.121" +tokio = { version = "^1.39.2", features = ["macros", "rt-multi-thread"] } [build-dependencies] prost-build = "^0.13.1" -protobuf-src = { version = "1.1.0", optional = true } +protobuf-src = { version = "^2.1.0", optional = true } [features] -default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ] -compression = [ "lz4", "flate2", "zstd", "snap" ] -tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ] -tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ] -async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ] -async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls", "webpki-roots" ] -auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ] -telemetry = ["tracing"] +async-std-runtime = ["async-std", "asynchronous-codec", "native-tls", "async-native-tls"] +async-std-rustls-runtime = ["async-std", "asynchronous-codec", "futures-rustls", "rustls", "webpki-roots"] +auth-oauth2 = ["openidconnect", "oauth2", "serde", "serde_json", "data-url"] +compression = ["lz4", "flate2", "zstd", "snap"] +default = ["compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"] protobuf-src = ["dep:protobuf-src"] +telemetry = ["tracing"] +tokio-runtime = ["tokio", "tokio-util", "native-tls", "tokio-native-tls"] +tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots"] diff --git a/examples/batching.rs b/examples/batching.rs index 4cf3abc3..3aa5d9ff 100644 --- a/examples/batching.rs +++ b/examples/batching.rs @@ -65,7 +65,7 @@ async fn main() -> Result<(), pulsar::Error> { loop { println!("will send"); let receipt_rx = producer - .send(TestData { + .send_non_blocking(TestData { data: "data".to_string(), }) .await diff --git a/examples/producer.rs b/examples/producer.rs index 4b6cbf25..3305c270 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -67,7 +67,7 @@ async fn main() -> Result<(), pulsar::Error> { let mut counter = 0usize; loop { producer - .send(TestData { + .send_non_blocking(TestData { data: "data".to_string(), }) .await? diff --git a/examples/round_trip.rs b/examples/round_trip.rs index 85b7eaf7..f8c17aaa 100644 --- a/examples/round_trip.rs +++ b/examples/round_trip.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), pulsar::Error> { let mut counter = 0usize; loop { producer - .send(TestData { + .send_non_blocking(TestData { data: "data".to_string(), }) .await diff --git a/src/connection.rs b/src/connection.rs index c2a63a63..fad471b5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,15 +20,8 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; -#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -use native_tls::Certificate; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; -#[cfg(all( - any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), - not(any(feature = "tokio-runtime", feature = "async-std-runtime")) -))] -use rustls::Certificate; use url::Url; use uuid::Uuid; @@ -41,6 +34,7 @@ use crate::{ BaseCommand, Codec, Message, }, producer::{self, ProducerOptions}, + Certificate, }; pub(crate) enum Register { @@ -983,35 +977,22 @@ impl Connection { if tls { let stream = tokio::net::TcpStream::connect(&address).await?; let mut root_store = rustls::RootCertStore::empty(); + + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); for certificate in certificate_chain { - root_store.add(certificate)?; + root_store.add(certificate.clone())?; } - let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold( - vec![], - |mut acc, trust_anchor| { - acc.push( - rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( - trust_anchor.subject, - trust_anchor.spki, - trust_anchor.name_constraints, - ), - ); - acc - }, - ); - - root_store.add_trust_anchors(trust_anchors.into_iter()); let config = rustls::ClientConfig::builder() - .with_safe_default_cipher_suites() - .with_safe_default_kx_groups() - .with_safe_default_protocol_versions()? .with_root_certificates(root_store) .with_no_client_auth(); let cx = tokio_rustls::TlsConnector::from(Arc::new(config)); let stream = cx - .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .connect( + rustls::pki_types::ServerName::try_from(hostname.as_str())?.to_owned(), + stream, + ) .await .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; @@ -1099,35 +1080,22 @@ impl Connection { if tls { let stream = async_std::net::TcpStream::connect(&address).await?; let mut root_store = rustls::RootCertStore::empty(); + + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); for certificate in certificate_chain { - root_store.add(certificate)?; + root_store.add(certificate.clone())?; } - let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold( - vec![], - |mut acc, trust_anchor| { - acc.push( - rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( - trust_anchor.subject, - trust_anchor.spki, - trust_anchor.name_constraints, - ), - ); - acc - }, - ); - - root_store.add_trust_anchors(trust_anchors.into_iter()); let config = rustls::ClientConfig::builder() - .with_safe_default_cipher_suites() - .with_safe_default_kx_groups() - .with_safe_default_protocol_versions()? .with_root_certificates(root_store) .with_no_client_auth(); - let connector = async_rustls::TlsConnector::from(Arc::new(config)); + let connector = futures_rustls::TlsConnector::from(Arc::new(config)); let stream = connector - .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .connect( + rustls::pki_types::ServerName::try_from(hostname.as_str())?.to_owned(), + stream, + ) .await .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 12ab4895..6dcb1a9b 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,17 +1,10 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; -#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -use native_tls::Certificate; use rand::Rng; -#[cfg(all( - any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), - not(any(feature = "tokio-runtime", feature = "async-std-runtime")) -))] -use rustls::Certificate; use url::Url; -use crate::{connection::Connection, error::ConnectionError, executor::Executor}; +use crate::{connection::Connection, error::ConnectionError, executor::Executor, Certificate}; /// holds connection information for a broker #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -156,11 +149,10 @@ impl ConnectionManager { None => vec![], Some(certificate_chain) => { let mut v = vec![]; - for cert in pem::parse_many(certificate_chain) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))? - .iter() - .rev() - { + let certificates = pem::parse_many(certificate_chain) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + for cert in certificates.iter().rev() { #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] v.push( Certificate::from_der(cert.contents()) @@ -174,7 +166,7 @@ impl ConnectionManager { ), not(any(feature = "tokio-runtime", feature = "async-std-runtime")) ))] - v.push(Certificate(cert.contents().to_vec())); + v.push(Certificate::from(cert.contents().to_vec())); } v } diff --git a/src/error.rs b/src/error.rs index 02c76324..a5b2535e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -97,7 +97,7 @@ pub enum ConnectionError { ))] Tls(rustls::Error), #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] - DnsName(rustls::client::InvalidDnsNameError), + DnsName(rustls::pki_types::InvalidDnsNameError), Authentication(AuthenticationError), NotFound, Canceled, @@ -142,9 +142,9 @@ impl From for ConnectionError { } #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] -impl From for ConnectionError { +impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn from(err: rustls::client::InvalidDnsNameError) -> Self { + fn from(err: rustls::pki_types::InvalidDnsNameError) -> Self { ConnectionError::DnsName(err) } } diff --git a/src/lib.rs b/src/lib.rs index 5ab374f7..53d473c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -197,6 +197,14 @@ pub mod reader; mod retry_op; mod service_discovery; +#[cfg(all( + any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +pub(crate) type Certificate = rustls::pki_types::CertificateDer<'static>; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +pub(crate) type Certificate = native_tls::Certificate; + #[cfg(test)] mod tests { use std::{ diff --git a/src/message.rs b/src/message.rs index c71c4ba1..2c9fd8ec 100644 --- a/src/message.rs +++ b/src/message.rs @@ -327,7 +327,7 @@ impl tokio_util::codec::Decoder for Codec { #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Encoder for Codec { - type Item = Message; + type Item<'a> = Message; type Error = ConnectionError; #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] diff --git a/src/producer.rs b/src/producer.rs index 818e8543..d73cf9a9 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -162,8 +162,8 @@ pub struct ProducerOptions { /// # let message = "data".to_owned(); /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; /// let mut producer = pulsar.producer().with_name("name").build_multi_topic(); -/// let send_1 = producer.send(topic, &message).await?; -/// let send_2 = producer.send(topic, &message).await?; +/// let send_1 = producer.send_non_blocking(topic, &message).await?; +/// let send_2 = producer.send_non_blocking(topic, &message).await?; /// send_1.await?; /// send_2.await?; /// # Ok(()) @@ -398,8 +398,8 @@ impl Producer { /// /// ```rust,no_run /// # async fn run(mut producer: pulsar::Producer) -> Result<(), pulsar::Error> { - /// let f1 = producer.send("hello").await?; - /// let f2 = producer.send("world").await?; + /// let f1 = producer.send_non_blocking("hello").await?; + /// let f2 = producer.send_non_blocking("world").await?; /// let receipt1 = f1.await?; /// let receipt2 = f2.await?; /// # Ok(())