From ee83c9c43aa0a613f4a8cc7b990e8aabc5d97ca0 Mon Sep 17 00:00:00 2001 From: Millione Date: Thu, 11 Apr 2024 17:13:20 +0800 Subject: [PATCH 1/9] feat: refactor `MakeTransport` and `Incoming` --- Cargo.lock | 50 +++++----- Cargo.toml | 1 + volo-grpc/src/server/mod.rs | 2 + volo-http/src/server/mod.rs | 11 ++- volo-thrift/src/client/mod.rs | 35 ++++++- volo-thrift/src/codec/mod.rs | 7 +- volo-thrift/src/message_wrapper.rs | 11 +-- volo-thrift/src/server/mod.rs | 33 +++---- volo-thrift/src/transport/multiplex/client.rs | 37 +++++--- .../transport/multiplex/thrift_transport.rs | 9 +- volo-thrift/src/transport/pingpong/client.rs | 30 +++--- volo-thrift/src/transport/pingpong/mod.rs | 1 + volo-thrift/src/transport/pingpong/server.rs | 13 ++- .../transport/pingpong/thrift_transport.rs | 23 ++++- .../src/transport/pool/make_transport.rs | 19 +++- volo-thrift/src/transport/pool/mod.rs | 91 +++++++++++++------ volo/Cargo.toml | 2 + volo/src/context.rs | 14 +++ volo/src/net/conn.rs | 50 +++++++++- volo/src/net/dial.rs | 23 ++--- volo/src/net/incoming.rs | 11 ++- volo/src/net/mod.rs | 1 + volo/src/net/shm.rs | 10 ++ volo/src/net/tls/mod.rs | 14 +-- 24 files changed, 346 insertions(+), 152 deletions(-) create mode 100644 volo/src/net/shm.rs diff --git a/Cargo.lock b/Cargo.lock index 3222bf84..00b404d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,9 +180,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "axum" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +checksum = "8f43644eed690f5374f1af436ecd6aea01cd201f6fbdf0178adaf6907afb2cec" dependencies = [ "async-trait", "axum-core", @@ -200,16 +200,16 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 1.0.1", - "tower 0.4.13", + "tower 0.5.1", "tower-layer", "tower-service", ] [[package]] name = "axum-core" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +checksum = "5e6b8ba012a258d63c9adfa28b9ddcf66149da6f986c5b5452e629d5ee64bf00" dependencies = [ "async-trait", "bytes", @@ -220,7 +220,7 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.1", "tower-layer", "tower-service", ] @@ -355,9 +355,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.17" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac" +checksum = "b0956a43b323ac1afaffc053ed5c4b7c1f1800bacd1683c353aabbb752515dd3" dependencies = [ "clap_builder", "clap_derive", @@ -365,9 +365,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.17" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73" +checksum = "4d72166dd41634086d5803a47eb71ae740e61d84709c36f3c34110173db3961b" dependencies = [ "anstream", "anstyle", @@ -378,9 +378,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.13" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2123,9 +2123,9 @@ checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "portable-atomic" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" +checksum = "d30538d42559de6b034bc76fd6dd4c38961b1ee5c6c56e3808c50128fdbc22ce" [[package]] name = "powerfmt" @@ -2198,9 +2198,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.2" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" dependencies = [ "bytes", ] @@ -2697,9 +2697,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.1" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" +checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" dependencies = [ "core-foundation-sys", "libc", @@ -2814,9 +2814,9 @@ dependencies = [ [[package]] name = "simdutf8" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "siphasher" @@ -3004,18 +3004,18 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" +checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" +checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", @@ -3562,7 +3562,9 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" name = "volo" version = "0.10.3" dependencies = [ + "anyhow", "async-broadcast", + "async-trait", "dashmap 6.1.0", "faststr", "futures", diff --git a/Cargo.toml b/Cargo.toml index 6886bb77..8128fb28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ ahash = "0.8" anyhow = "1" async-broadcast = "0.7" async-stream = "0.3" +async-trait = "0.1" base64 = "0.22" bytes = "1" chrono = { version = "0.4", default-features = false, features = [ diff --git a/volo-grpc/src/server/mod.rs b/volo-grpc/src/server/mod.rs index 0f868966..8f44ea70 100644 --- a/volo-grpc/src/server/mod.rs +++ b/volo-grpc/src/server/mod.rs @@ -307,6 +307,7 @@ impl Server { >>::Future: Send + 'static, >>::Error: Into + Send + Sync + std::error::Error, + A::Incoming: Incoming, { let mut incoming = incoming.make_incoming().await?; tracing::info!("[VOLO] server start at: {:?}", incoming); @@ -434,6 +435,7 @@ impl Server { >>::Future: Send + 'static, >>::Error: Into + Send + Sync + std::error::Error, + A::Incoming: Incoming, { self.run_with_shutdown(incoming, tokio::signal::ctrl_c()) .await diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index b8cbddbd..d9a3555e 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -28,7 +28,11 @@ use tokio::sync::Notify; use volo::net::{conn::ConnStream, tls::Acceptor, tls::ServerTlsConfig}; use volo::{ context::Context, - net::{conn::Conn, incoming::Incoming, Address, MakeIncoming}, + net::{ + conn::{Conn, ConnExt}, + incoming::Incoming, + Address, MakeIncoming, + }, }; use crate::{ @@ -270,6 +274,7 @@ impl Server { Service + Send + Sync + 'static, >::Response: IntoResponse, MI: MakeIncoming, + MI::Incoming: Incoming, { let server = Arc::new(self.server); let service = Arc::new(self.layer.layer(self.service)); @@ -367,7 +372,7 @@ async fn serve( exit_notify: Arc, #[cfg(feature = "__tls")] tls_config: Option, ) where - I: Incoming, + I: Incoming, S: Service + Clone + Send + Sync + 'static, S::Response: IntoResponse, E: IntoResponse, @@ -399,7 +404,7 @@ async fn serve( } }; - let peer = match conn.info.peer_addr { + let peer = match conn.peer_addr() { Some(ref peer) => { tracing::trace!("accept connection from: {peer:?}"); peer.clone() diff --git a/volo-thrift/src/client/mod.rs b/volo-thrift/src/client/mod.rs index 20f56189..73741ebd 100644 --- a/volo-thrift/src/client/mod.rs +++ b/volo-thrift/src/client/mod.rs @@ -23,6 +23,7 @@ use volo::{ discovery::{Discover, DummyDiscover}, loadbalance::{random::WeightedRandomBalance, LbConfig, MkLbLayer}, net::{ + conn::ConnExt, dial::{DefaultMakeTransport, MakeTransport}, Address, }, @@ -52,6 +53,7 @@ pub struct ClientBuilder { callee_name: FastStr, caller_name: FastStr, address: Option
, // maybe address use Arc avoid memory alloc + shmipc_address: Option
, inner_layer: IL, outer_layer: OL, make_transport: MkT, @@ -86,6 +88,7 @@ impl caller_name: "".into(), callee_name: FastStr::new(service_name), address: None, + shmipc_address: None, inner_layer: Identity::new(), outer_layer: Identity::new(), mk_client: service_client, @@ -116,6 +119,7 @@ impl caller_name: self.caller_name, callee_name: self.callee_name, address: self.address, + shmipc_address: self.shmipc_address, inner_layer: self.inner_layer, outer_layer: self.outer_layer, mk_client: self.mk_client, @@ -142,6 +146,7 @@ impl caller_name: self.caller_name, callee_name: self.callee_name, address: self.address, + shmipc_address: self.shmipc_address, inner_layer: self.inner_layer, outer_layer: self.outer_layer, mk_client: self.mk_client, @@ -226,6 +231,7 @@ impl ClientBuilder ClientBuilder ClientBuilder ClientBuilder>(mut self, target: A) -> Self { + self.shmipc_address = Some(target.into()); + self + } + /// Adds a new inner layer to the client. /// /// The layer's `Service` should be `Send + Sync + Clone + 'static`. @@ -337,6 +351,7 @@ impl ClientBuilder ClientBuilder ClientBuilder ClientBuilder ClientBuilder where Resp: EntryMessage + Send + 'static, MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { #[cfg(not(feature = "multiplex"))] inner: pingpong::Client, @@ -523,7 +542,7 @@ where Req: EntryMessage + 'static + Send, Resp: Send + 'static + EntryMessage + Sync, MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { type Response = Option; @@ -576,7 +595,7 @@ where Service> + Sync + Clone + Send + 'static, >::Error: Send + Into, MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, OL: Layer, ClientError>>, OL::Service: Service> + 'static + Send + Clone + Sync, @@ -628,6 +647,7 @@ where callee_name: self.callee_name, config: self.config, address: self.address, + shmipc_address: self.shmipc_address, caller_name: self.caller_name, seq_id: AtomicI32::new(0), }), @@ -654,6 +674,7 @@ struct ClientInner { caller_name: FastStr, config: Config, address: Option
, + shmipc_address: Option
, seq_id: AtomicI32, } @@ -686,6 +707,11 @@ impl Client { if let Some(target) = &self.inner.address { cx.rpc_info_mut().callee_mut().set_address(target.clone()); } + if let Some(shmipc_address) = &self.inner.shmipc_address { + cx.rpc_info_mut() + .callee_mut() + .set_shmipc_address(shmipc_address.clone()); + } cx.rpc_info_mut().set_config(self.inner.config); cx.rpc_info_mut().set_method(FastStr::new(method)); cx @@ -712,6 +738,9 @@ impl Client { if let Some(target) = &self.inner.address { callee.set_address(target.clone()); } + if let Some(shmipc_address) = &self.inner.shmipc_address { + callee.set_shmipc_address(shmipc_address.clone()); + } let config = self.inner.config; RpcInfo::new(Role::Client, FastStr::new(method), caller, callee, config) diff --git a/volo-thrift/src/codec/mod.rs b/volo-thrift/src/codec/mod.rs index 6f191b1b..7c8923c9 100644 --- a/volo-thrift/src/codec/mod.rs +++ b/volo-thrift/src/codec/mod.rs @@ -1,7 +1,6 @@ use std::future::Future; use pilota::thrift::ThriftException; -use tokio::io::{AsyncRead, AsyncWrite}; use crate::{context::ThriftContext, EntryMessage, ThriftMessage}; @@ -40,7 +39,7 @@ pub trait Encoder: Send + Sync + 'static { } } -/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a +/// [`MakeCodec`] receives an [`R`] and an [`W`] and returns a /// [`Decoder`] and an [`Encoder`]. /// /// The implementation of [`MakeCodec`] must make sure the [`Decoder`] and [`Encoder`] @@ -52,8 +51,8 @@ pub trait Encoder: Send + Sync + 'static { /// The reason why we split the [`Decoder`] and [`Encoder`] is that we want to support multiplex. pub trait MakeCodec: Clone + Send + 'static where - R: AsyncRead + Unpin + Send + Sync + 'static, - W: AsyncWrite + Unpin + Send + Sync + 'static, + R: Unpin + Send + Sync + 'static, + W: Unpin + Send + Sync + 'static, { type Encoder: Encoder; type Decoder: Decoder; diff --git a/volo-thrift/src/message_wrapper.rs b/volo-thrift/src/message_wrapper.rs index 053892f2..27c3ff0c 100644 --- a/volo-thrift/src/message_wrapper.rs +++ b/volo-thrift/src/message_wrapper.rs @@ -85,7 +85,7 @@ where U: EntryMessage, { #[inline] - pub(crate) fn size(&self, protocol: &mut T) -> usize { + pub fn size(&self, protocol: &mut T) -> usize { let ident = TMessageIdentifier::new( self.meta.method.clone(), self.meta.msg_type, @@ -112,10 +112,7 @@ where U: EntryMessage + Send, { #[inline] - pub(crate) fn encode( - &self, - protocol: &mut T, - ) -> Result<(), ThriftException> { + pub fn encode(&self, protocol: &mut T) -> Result<(), ThriftException> { let ident = TMessageIdentifier::new( self.meta.method.clone(), self.meta.msg_type, @@ -136,7 +133,7 @@ where } #[inline] - pub(crate) fn decode( + pub fn decode( protocol: &mut T, cx: &mut Cx, ) -> Result { @@ -160,7 +157,7 @@ where } #[inline] - pub(crate) async fn decode_async( + pub async fn decode_async( protocol: &mut T, cx: &mut Cx, ) -> Result { diff --git a/volo-thrift/src/server/mod.rs b/volo-thrift/src/server/mod.rs index cdf4925d..0c8c1cb1 100644 --- a/volo-thrift/src/server/mod.rs +++ b/volo-thrift/src/server/mod.rs @@ -11,17 +11,10 @@ use motore::{ BoxError, }; use scopeguard::defer; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::Notify, -}; +use tokio::sync::Notify; use tracing::{info, trace}; use volo::{ - net::{ - conn::{OwnedReadHalf, OwnedWriteHalf}, - incoming::Incoming, - Address, - }, + net::{conn::ConnExt, incoming::Incoming, shm::ShmExt, Address}, service::BoxService, }; @@ -181,7 +174,10 @@ impl Server { ) -> Result<(), BoxError> where L: Layer>, - MkC: MakeCodec, + MkC: MakeCodec< + <::Conn as ConnExt>::ReadHalf, + <::Conn as ConnExt>::WriteHalf, + >, L::Service: Service + Send + 'static @@ -222,9 +218,10 @@ impl Server { } match incoming.accept().await { Ok(Some(conn)) => { - let peer_addr = conn.info.peer_addr; + let peer_addr = conn.peer_addr(); trace!("[VOLO] accept connection from: {:?}", peer_addr); - let (rh, wh) = conn.stream.into_split(); + let inner = conn.inner(); + let (rh, wh) = conn.into_split(); #[cfg(feature = "multiplex")] if self.multiplex { @@ -251,6 +248,7 @@ impl Server { conn_cnt.clone(), peer_addr, self.span_provider.clone(), + inner, )); } #[cfg(not(feature = "multiplex"))] @@ -265,6 +263,7 @@ impl Server { conn_cnt.clone(), peer_addr, self.span_provider.clone(), + inner, )); } // no more incoming connections @@ -399,9 +398,10 @@ async fn handle_conn( conn_cnt: Arc, peer_addr: Option
, span_provider: SP, + stream: Option>, ) where - R: AsyncRead + Unpin + Send + Sync + 'static, - W: AsyncWrite + Unpin + Send + Sync + 'static, + R: Unpin + Send + Sync + 'static, + W: Unpin + Send + Sync + 'static, Svc: Service + Clone + Send + 'static, Svc::Error: Send, Svc::Error: Into, @@ -430,6 +430,7 @@ async fn handle_conn( stat_tracer, peer_addr, span_provider, + stream, ) .await; } @@ -447,8 +448,8 @@ async fn handle_conn_multiplex( conn_cnt: Arc, peer_addr: Option
, ) where - R: AsyncRead + Unpin + Send + Sync + 'static, - W: AsyncWrite + Unpin + Send + Sync + 'static, + R: Unpin + Send + Sync + 'static, + W: Unpin + Send + Sync + 'static, Svc: Service + Clone + Send + 'static + Sync, Svc::Error: Into + Send, Req: EntryMessage + Send + 'static, diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index dd07fe86..b8620a93 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -1,7 +1,7 @@ use std::{io, marker::PhantomData}; use motore::service::{Service, UnaryService}; -use volo::net::{dial::MakeTransport, Address}; +use volo::net::{conn::ConnExt, dial::MakeTransport, Address}; use crate::{ codec::MakeCodec, @@ -9,7 +9,7 @@ use crate::{ protocol::TMessageType, transport::{ multiplex::thrift_transport::ThriftTransport, - pool::{Config, PooledMakeTransport, Ver}, + pool::{Config, PooledMakeTransport, Transport, Ver}, }, ClientError, EntryMessage, ThriftMessage, }; @@ -17,15 +17,18 @@ use crate::{ pub struct MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec, + MkC: MakeCodec<::ReadHalf, ::WriteHalf>, { make_transport: MkT, make_codec: MkC, _phantom: PhantomData Resp>, } -impl, Resp> Clone - for MakeClientTransport +impl< + MkT: MakeTransport, + MkC: MakeCodec<::ReadHalf, ::WriteHalf>, + Resp, + > Clone for MakeClientTransport { fn clone(&self) -> Self { Self { @@ -39,7 +42,7 @@ impl, Resp> Cl impl MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec, + MkC: MakeCodec<::ReadHalf, ::WriteHalf>, { #[allow(unused)] pub fn new(make_transport: MkT, make_codec: MkC) -> Self { @@ -54,7 +57,7 @@ where impl UnaryService
for MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, Resp: EntryMessage + Send + 'static, { type Response = ThriftTransport; @@ -62,7 +65,8 @@ where async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - let (rh, wh) = make_transport.make_transport(target.clone()).await?; + let conn = make_transport.make_transport(target.clone()).await?; + let (rh, wh) = conn.into_split(); Ok(ThriftTransport::new( rh, wh, @@ -75,7 +79,7 @@ where pub struct Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, Resp: EntryMessage + Send + 'static, { #[allow(clippy::type_complexity)] @@ -86,7 +90,7 @@ where impl Clone for Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, Resp: EntryMessage + Send + 'static, { fn clone(&self) -> Self { @@ -100,7 +104,7 @@ where impl Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, Resp: EntryMessage + Send + 'static, { pub fn new(make_transport: MkT, pool_cfg: Option, make_codec: MkC) -> Self { @@ -118,7 +122,7 @@ where Req: Send + 'static + EntryMessage, Resp: EntryMessage + Send + 'static + Sync, MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { type Response = Option>; @@ -136,7 +140,10 @@ where })?; let oneway = cx.message_type == TMessageType::OneWay; cx.stats.record_make_transport_start_at(); - let transport = self.make_transport.call((target, Ver::Multiplex)).await?; + let transport = self + .make_transport + .call((target, None, Ver::Multiplex)) + .await?; cx.stats.record_make_transport_end_at(); let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { @@ -150,7 +157,9 @@ where } } if cx.transport.should_reuse && resp.is_ok() { - transport.reuse().await; + if let Transport::Pooled(pooled) = transport { + pooled.reuse(); + } } resp } diff --git a/volo-thrift/src/transport/multiplex/thrift_transport.rs b/volo-thrift/src/transport/multiplex/thrift_transport.rs index c344c00c..d1d7b455 100644 --- a/volo-thrift/src/transport/multiplex/thrift_transport.rs +++ b/volo-thrift/src/transport/multiplex/thrift_transport.rs @@ -9,10 +9,7 @@ use std::{ use metainfo::MetaInfo; use pilota::thrift::{ApplicationException, ApplicationExceptionKind}; use pin_project::pin_project; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::{oneshot, Mutex}, -}; +use tokio::sync::{oneshot, Mutex}; use volo::{ context::{Role, RpcInfo}, net::Address, @@ -67,8 +64,8 @@ where E: Encoder, { pub fn new< - R: AsyncRead + Send + Sync + Unpin + 'static, - W: AsyncWrite + Send + Sync + Unpin + 'static, + R: Send + Sync + Unpin + 'static, + W: Send + Sync + Unpin + 'static, MkC: MakeCodec, >( read_half: R, diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index a3c942be..a5f7cb55 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -2,7 +2,7 @@ use std::{io, marker::PhantomData}; use motore::service::{Service, UnaryService}; use pilota::thrift::TransportException; -use volo::net::{dial::MakeTransport, Address}; +use volo::net::{conn::ConnExt, dial::MakeTransport, Address}; use crate::{ codec::MakeCodec, @@ -19,7 +19,7 @@ use crate::{ pub struct MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec, + MkC: MakeCodec<::ReadHalf, ::WriteHalf>, { make_transport: MkT, make_codec: MkC, @@ -28,7 +28,7 @@ where impl MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec, + MkC: MakeCodec<::ReadHalf, ::WriteHalf>, { #[allow(unused)] #[inline] @@ -43,7 +43,7 @@ where impl UnaryService
for MakeClientTransport where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { type Response = ThriftTransport; type Error = io::Error; @@ -51,15 +51,17 @@ where #[inline] async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - let (rh, wh) = make_transport.make_transport(target).await?; - Ok(ThriftTransport::new(rh, wh, self.make_codec.clone())) + let conn = make_transport.make_transport(target).await?; + let inner = conn.inner(); + let (rh, wh) = conn.into_split(); + Ok(ThriftTransport::new(rh, wh, self.make_codec.clone(), inner)) } } pub struct Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { #[allow(clippy::type_complexity)] make_transport: PooledMakeTransport, Address>, @@ -69,7 +71,7 @@ where impl Clone for Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { fn clone(&self) -> Self { Self { @@ -82,7 +84,7 @@ where impl Client where MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { pub fn new(make_transport: MkT, pool_cfg: Option, make_codec: MkC) -> Self { let make_transport = MakeClientTransport::new(make_transport, make_codec); @@ -99,7 +101,7 @@ where Req: Send + 'static + EntryMessage, Resp: EntryMessage + Sync, MkT: MakeTransport, - MkC: MakeCodec + Sync, + MkC: MakeCodec<::ReadHalf, ::WriteHalf> + Sync, { type Response = Option>; @@ -118,9 +120,13 @@ where format!("address is required, rpc_info: {:?}", rpc_info), )) })?; + let shmipc_target = rpc_info.callee().shmipc_address(); let oneway = cx.message_type == TMessageType::OneWay; cx.stats.record_make_transport_start_at(); - let mut transport = self.make_transport.call((target, Ver::PingPong)).await?; + let mut transport = self + .make_transport + .call((target, shmipc_target.clone(), Ver::PingPong)) + .await?; cx.stats.record_make_transport_end_at(); let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { @@ -138,6 +144,8 @@ where } if cx.transport.should_reuse && resp.is_ok() { transport.reuse().await; + } else { + transport.close().await; } resp } diff --git a/volo-thrift/src/transport/pingpong/mod.rs b/volo-thrift/src/transport/pingpong/mod.rs index 22ce370d..bcf0e27b 100644 --- a/volo-thrift/src/transport/pingpong/mod.rs +++ b/volo-thrift/src/transport/pingpong/mod.rs @@ -4,3 +4,4 @@ mod thrift_transport; pub use client::Client; pub use server::serve; +pub use thrift_transport::ThriftTransport; diff --git a/volo-thrift/src/transport/pingpong/server.rs b/volo-thrift/src/transport/pingpong/server.rs index d7797449..aa0bab6c 100644 --- a/volo-thrift/src/transport/pingpong/server.rs +++ b/volo-thrift/src/transport/pingpong/server.rs @@ -8,7 +8,10 @@ use motore::service::Service; use pilota::thrift::ThriftException; use tokio::sync::futures::Notified; use tracing::*; -use volo::{net::Address, volo_unreachable}; +use volo::{ + net::{shm::ShmExt, Address}, + volo_unreachable, +}; use crate::{ codec::{Decoder, Encoder}, @@ -29,6 +32,7 @@ pub async fn serve( stat_tracer: Arc<[crate::server::TraceFn]>, peer_addr: Option
, span_provider: SP, + stream: Option>, ) where Svc: Service, Svc::Error: Into, @@ -69,6 +73,10 @@ pub async fn serve( peer_addr ); + if let Some(stream) = &stream { + stream.release_previous_read(); + } + // it is promised safe here, because span only reads cx before handling polling let tracing_cx = unsafe { std::mem::transmute::< @@ -178,6 +186,9 @@ pub async fn serve( .instrument(span_provider.on_serve(tracing_cx)) .await; if result.is_err() { + if let Some(mut stream) = stream { + _ = stream.close().await; + } break; } } diff --git a/volo-thrift/src/transport/pingpong/thrift_transport.rs b/volo-thrift/src/transport/pingpong/thrift_transport.rs index 855206ab..dbe10c67 100644 --- a/volo-thrift/src/transport/pingpong/thrift_transport.rs +++ b/volo-thrift/src/transport/pingpong/thrift_transport.rs @@ -2,7 +2,7 @@ use std::sync::{atomic::AtomicUsize, LazyLock}; use pilota::thrift::{ApplicationException, ApplicationExceptionKind}; use pin_project::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; +use volo::net::shm::ShmExt; use crate::{ codec::{Decoder, Encoder, MakeCodec}, @@ -17,6 +17,7 @@ static TRANSPORT_ID_COUNTER: LazyLock = LazyLock::new(|| AtomicUsiz pub struct ThriftTransport { write_half: WriteHalf, read_half: ReadHalf, + stream: Option>, } impl ThriftTransport @@ -25,13 +26,14 @@ where D: Decoder, { pub fn new< - R: AsyncRead + Send + Sync + Unpin + 'static, - W: AsyncWrite + Send + Sync + Unpin + 'static, + R: Send + Sync + Unpin + 'static, + W: Send + Sync + Unpin + 'static, MkC: MakeCodec, >( read_half: R, write_half: W, make_codec: MkC, + stream: Option>, ) -> Self { let id = TRANSPORT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let (encoder, decoder) = make_codec.make_codec(read_half, write_half); @@ -46,6 +48,21 @@ where id, reusable: true, }, + stream, + } + } + + pub async fn reuse(self) { + if let Some(stream) = self.stream { + stream.reuse().await; + } + } + + pub async fn close(&mut self) { + if let Some(stream) = &mut self.stream { + if let Err(err) = stream.close().await { + tracing::error!("[VOLO SHMIPC] transport close stream error: {}", err); + } } } diff --git a/volo-thrift/src/transport/pool/make_transport.rs b/volo-thrift/src/transport/pool/make_transport.rs index 1eed6048..d9fa1c09 100644 --- a/volo-thrift/src/transport/pool/make_transport.rs +++ b/volo-thrift/src/transport/pool/make_transport.rs @@ -2,7 +2,7 @@ use motore::service::UnaryService; -use super::{Key, Pool, Poolable, Pooled, Ver}; +use super::{Key, Pool, Poolable, Transport, Ver}; // pooled make transport wrap the inner MakeTransport and return the pooled transport // when call make_transport @@ -42,18 +42,27 @@ where } } -impl UnaryService<(K, Ver)> for PooledMakeTransport +impl UnaryService<(K, Option, Ver)> for PooledMakeTransport where MT: UnaryService + Send + Clone + 'static + Sync, MT::Response: Poolable + Send, MT::Error: Into + Send, { - type Response = Pooled; + type Response = Transport; type Error = crate::ClientError; - async fn call(&self, kv: (K, Ver)) -> Result { + async fn call(&self, kv: (K, Option, Ver)) -> Result { let mt = self.inner.clone(); - self.pool.get(kv.0, kv.1, mt).await.map_err(Into::into) + if let Some(addr) = kv.1 { + if let Ok(resp) = mt.call(addr.clone()).await { + return Ok(Transport::UnPooled(resp)); + } + } + self.pool + .get(kv.0, kv.2, mt) + .await + .map_err(Into::into) + .map(Into::into) } } diff --git a/volo-thrift/src/transport/pool/mod.rs b/volo-thrift/src/transport/pool/mod.rs index 62365576..5e49bd30 100644 --- a/volo-thrift/src/transport/pool/mod.rs +++ b/volo-thrift/src/transport/pool/mod.rs @@ -29,7 +29,10 @@ use tokio::{ sync::oneshot, time::{interval, Duration, Instant, Interval}, }; -use volo::Unwrap; +use volo::{net::Address, Unwrap}; + +use super::pingpong::ThriftTransport; +use crate::codec::{Decoder, Encoder}; pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} @@ -412,6 +415,67 @@ struct Idle { idle_at: Instant, } +pub enum Transport { + Pooled(Pooled), + UnPooled(T), +} + +impl Transport> { + pub async fn reuse(self) { + match self { + Transport::Pooled(pooled) => { + pooled.reuse(); + } + Transport::UnPooled(t) => { + t.reuse().await; + } + } + } +} + +impl From> for Transport { + fn from(pooled: Pooled) -> Self { + Transport::Pooled(pooled) + } +} + +impl From for Transport { + fn from(t: T) -> Self { + Transport::UnPooled(t) + } +} + +impl AsRef for Transport { + fn as_ref(&self) -> &T { + match self { + Transport::Pooled(pooled) => pooled.t.as_ref().expect("not dropped"), + Transport::UnPooled(t) => t, + } + } +} + +impl AsMut for Transport { + fn as_mut(&mut self) -> &mut T { + match self { + Transport::Pooled(pooled) => pooled.t.as_mut().expect("not dropped"), + Transport::UnPooled(t) => t, + } + } +} + +impl Deref for Transport { + type Target = T; + fn deref(&self) -> &T { + self.as_ref() + } +} + +impl DerefMut for Transport { + fn deref_mut(&mut self) -> &mut T { + self.as_mut() + } +} + #[pin_project] pub struct Pooled { key: Option, @@ -449,31 +513,6 @@ impl Pooled { } } -impl AsRef for Pooled { - fn as_ref(&self) -> &T { - self.t.as_ref().expect("not dropped") - } -} - -impl AsMut for Pooled { - fn as_mut(&mut self) -> &mut T { - self.t.as_mut().expect("not dropped") - } -} - -impl Deref for Pooled { - type Target = T; - fn deref(&self) -> &T { - self.as_ref() - } -} - -impl DerefMut for Pooled { - fn deref_mut(&mut self) -> &mut T { - self.as_mut() - } -} - struct WaiterList { inner: LinkedHashMap>, counter: usize, diff --git a/volo/Cargo.toml b/volo/Cargo.toml index 54484593..47d09e5e 100644 --- a/volo/Cargo.toml +++ b/volo/Cargo.toml @@ -26,7 +26,9 @@ maintenance = { status = "actively-developed" } [dependencies] motore.workspace = true +anyhow.workspace = true async-broadcast.workspace = true +async-trait.workspace = true dashmap.workspace = true faststr.workspace = true futures.workspace = true diff --git a/volo/src/context.rs b/volo/src/context.rs index fcc0aafe..63ee6a2e 100644 --- a/volo/src/context.rs +++ b/volo/src/context.rs @@ -153,6 +153,7 @@ pub struct Endpoint { /// `service_name` is the most important information, which is used by the service discovering. pub service_name: FastStr, pub address: Option
, + pub shmipc_address: Option
, /// `faststr_tags` is a optimized typemap to store additional information of the endpoint. /// /// Use `FastStrMap` instead of `TypeMap` can reduce the Box allocation. @@ -173,6 +174,7 @@ impl Endpoint { Self { service_name, address: None, + shmipc_address: None, faststr_tags: FastStrMap::with_capacity(DEFAULT_MAP_CAPACITY), tags: Default::default(), } @@ -242,6 +244,18 @@ impl Endpoint { self.address.clone() } + /// Sets the shmipc address. + #[inline] + pub fn set_shmipc_address(&mut self, shmipc_address: Address) { + self.shmipc_address = Some(shmipc_address) + } + + /// Gets the shmipc address. + #[inline] + pub fn shmipc_address(&self) -> Option
{ + self.shmipc_address.clone() + } + /// Clear the information #[inline] pub fn clear(&mut self) { diff --git a/volo/src/net/conn.rs b/volo/src/net/conn.rs index a32cde98..f3391ce9 100644 --- a/volo/src/net/conn.rs +++ b/volo/src/net/conn.rs @@ -1,5 +1,6 @@ use std::{ io, + os::fd::{AsRawFd, RawFd}, pin::Pin, task::{Context, Poll}, }; @@ -12,7 +13,33 @@ use tokio::{ net::{tcp, TcpStream}, }; -use super::Address; +use super::{shm::ShmExt, Address}; + +pub trait ConnExt { + type ReadHalf: Unpin + Send + Sync + 'static; + type WriteHalf: Unpin + Send + Sync + 'static; + + fn peer_addr(&self) -> Option
; + fn inner(&self) -> Option> { + None + } + fn into_split(self) -> (Self::ReadHalf, Self::WriteHalf); +} + +impl ConnExt for Conn { + type ReadHalf = OwnedReadHalf; + type WriteHalf = OwnedWriteHalf; + + #[inline] + fn peer_addr(&self) -> Option
{ + self.stream.peer_addr() + } + + #[inline] + fn into_split(self) -> (Self::ReadHalf, Self::WriteHalf) { + self.stream.into_split() + } +} #[derive(Clone)] pub struct ConnInfo { @@ -333,7 +360,28 @@ impl ConnStream { .ok(), } } + + #[inline] + pub fn is_tcp(&self) -> bool { + matches!(self, Self::Tcp(_)) + } +} + +impl AsRawFd for ConnStream { + #[inline] + fn as_raw_fd(&self) -> RawFd { + match self { + Self::Tcp(s) => s.as_raw_fd(), + #[cfg(target_family = "unix")] + Self::Unix(s) => s.as_raw_fd(), + #[cfg(feature = "rustls")] + Self::Rustls(s) => s.as_raw_fd(), + #[cfg(feature = "native-tls")] + Self::NativeTls(s) => s.as_raw_fd(), + } + } } + pub struct Conn { pub stream: ConnStream, pub info: ConnInfo, diff --git a/volo/src/net/dial.rs b/volo/src/net/dial.rs index 746c23be..58e05ad8 100644 --- a/volo/src/net/dial.rs +++ b/volo/src/net/dial.rs @@ -5,25 +5,20 @@ use socket2::{Domain, Protocol, Socket, Type}; #[cfg(target_family = "unix")] use tokio::net::UnixStream; use tokio::{ - io::{AsyncRead, AsyncWrite}, net::{TcpSocket, TcpStream}, time::{timeout, Duration}, }; use super::{ - conn::{Conn, OwnedReadHalf, OwnedWriteHalf}, + conn::{Conn, ConnExt}, Address, }; -/// [`MakeTransport`] creates an [`AsyncRead`] and an [`AsyncWrite`] for the given [`Address`]. +/// [`MakeTransport`] creates a [`Conn`] for the given [`Address`]. pub trait MakeTransport: Clone + Send + Sync + 'static { - type ReadHalf: AsyncRead + Send + Sync + Unpin + 'static; - type WriteHalf: AsyncWrite + Send + Sync + Unpin + 'static; + type Conn: ConnExt; - fn make_transport( - &self, - addr: Address, - ) -> impl Future> + Send; + fn make_transport(&self, addr: Address) -> impl Future> + Send; fn set_connect_timeout(&mut self, timeout: Option); fn set_read_timeout(&mut self, timeout: Option); fn set_write_timeout(&mut self, timeout: Option); @@ -77,14 +72,10 @@ impl DefaultMakeTransport { } impl MakeTransport for DefaultMakeTransport { - type ReadHalf = OwnedReadHalf; + type Conn = Conn; - type WriteHalf = OwnedWriteHalf; - - async fn make_transport(&self, addr: Address) -> io::Result<(Self::ReadHalf, Self::WriteHalf)> { - let conn = self.make_connection(addr).await?; - let (read, write) = conn.stream.into_split(); - Ok((read, write)) + async fn make_transport(&self, addr: Address) -> io::Result { + self.make_connection(addr).await } fn set_connect_timeout(&mut self, timeout: Option) { diff --git a/volo/src/net/incoming.rs b/volo/src/net/incoming.rs index 086ecd20..02e9206f 100644 --- a/volo/src/net/incoming.rs +++ b/volo/src/net/incoming.rs @@ -14,7 +14,10 @@ use tokio::net::UnixListener; use tokio_stream::wrappers::UnixListenerStream; use tokio_stream::{wrappers::TcpListenerStream, StreamExt}; -use super::{conn::Conn, Address}; +use super::{ + conn::{Conn, ConnExt}, + Address, +}; #[pin_project(project = IncomingProj)] #[derive(Debug)] @@ -46,10 +49,14 @@ impl From for DefaultIncoming { } pub trait Incoming: fmt::Debug + Send + 'static { - fn accept(&mut self) -> impl Future>> + Send; + type Conn: ConnExt; + + fn accept(&mut self) -> impl Future>> + Send; } impl Incoming for DefaultIncoming { + type Conn = Conn; + async fn accept(&mut self) -> io::Result> { if let Some(conn) = self.try_next().await? { tracing::trace!("[VOLO] recv a connection from: {:?}", conn.info.peer_addr); diff --git a/volo/src/net/mod.rs b/volo/src/net/mod.rs index 93b85f6f..46af44ca 100644 --- a/volo/src/net/mod.rs +++ b/volo/src/net/mod.rs @@ -2,6 +2,7 @@ pub mod conn; pub mod dial; pub mod incoming; pub mod ready; +pub mod shm; #[cfg(feature = "__tls")] #[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))] pub mod tls; diff --git a/volo/src/net/shm.rs b/volo/src/net/shm.rs new file mode 100644 index 00000000..7083a3c2 --- /dev/null +++ b/volo/src/net/shm.rs @@ -0,0 +1,10 @@ +#[async_trait::async_trait] +pub trait ShmExt: Send + Sync { + fn release_previous_read(&self) {} + + async fn close(&mut self) -> Result<(), anyhow::Error> { + Ok(()) + } + + async fn reuse(&self) {} +} diff --git a/volo/src/net/tls/mod.rs b/volo/src/net/tls/mod.rs index 084fe5d4..1bf990b1 100644 --- a/volo/src/net/tls/mod.rs +++ b/volo/src/net/tls/mod.rs @@ -15,10 +15,7 @@ use super::{ conn::ConnStream, dial::{make_tcp_connection, Config, MakeTransport}, }; -use crate::net::{ - conn::{Conn, OwnedReadHalf, OwnedWriteHalf}, - Address, -}; +use crate::net::{conn::Conn, Address}; #[cfg(feature = "native-tls")] mod native_tls; @@ -294,13 +291,10 @@ impl UnaryService
for TlsMakeTransport { } impl MakeTransport for TlsMakeTransport { - type ReadHalf = OwnedReadHalf; - type WriteHalf = OwnedWriteHalf; + type Conn = Conn; - async fn make_transport(&self, addr: Address) -> Result<(Self::ReadHalf, Self::WriteHalf)> { - let conn = self.make_connection(addr).await?; - let (read, write) = conn.stream.into_split(); - Ok((read, write)) + async fn make_transport(&self, addr: Address) -> Result { + self.make_connection(addr).await } fn set_connect_timeout(&mut self, timeout: Option) { From 8b9897e352184af0467449cf899883889b435fb9 Mon Sep 17 00:00:00 2001 From: Millione Date: Sun, 28 Apr 2024 19:24:59 +0800 Subject: [PATCH 2/9] chore: change `release_previous_read` to `release_read_and_reuse` --- volo-thrift/src/transport/pingpong/server.rs | 2 +- volo/src/net/shm.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/volo-thrift/src/transport/pingpong/server.rs b/volo-thrift/src/transport/pingpong/server.rs index aa0bab6c..a99e15d4 100644 --- a/volo-thrift/src/transport/pingpong/server.rs +++ b/volo-thrift/src/transport/pingpong/server.rs @@ -74,7 +74,7 @@ pub async fn serve( ); if let Some(stream) = &stream { - stream.release_previous_read(); + stream.release_read_and_reuse(); } // it is promised safe here, because span only reads cx before handling polling diff --git a/volo/src/net/shm.rs b/volo/src/net/shm.rs index 7083a3c2..91e4ffe8 100644 --- a/volo/src/net/shm.rs +++ b/volo/src/net/shm.rs @@ -1,6 +1,6 @@ #[async_trait::async_trait] pub trait ShmExt: Send + Sync { - fn release_previous_read(&self) {} + fn release_read_and_reuse(&self) {} async fn close(&mut self) -> Result<(), anyhow::Error> { Ok(()) From 19a3313ed4a97ebe6d9cbfed280b6dab616e008a Mon Sep 17 00:00:00 2001 From: Millione Date: Mon, 29 Apr 2024 11:41:59 +0800 Subject: [PATCH 3/9] fix: escape oneway for shmipc --- volo-thrift/src/transport/pingpong/client.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index a5f7cb55..50b0dffb 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -123,10 +123,15 @@ where let shmipc_target = rpc_info.callee().shmipc_address(); let oneway = cx.message_type == TMessageType::OneWay; cx.stats.record_make_transport_start_at(); - let mut transport = self - .make_transport - .call((target, shmipc_target.clone(), Ver::PingPong)) - .await?; + let mut transport = if !oneway { + self.make_transport + .call((target, shmipc_target.clone(), Ver::PingPong)) + .await? + } else { + self.make_transport + .call((target, None, Ver::PingPong)) + .await? + }; cx.stats.record_make_transport_end_at(); let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { From dbaff670ee9c5505388316c62c0603b3628580a6 Mon Sep 17 00:00:00 2001 From: Millione Date: Tue, 30 Apr 2024 17:28:50 +0800 Subject: [PATCH 4/9] fix: support oneway for shmipc --- volo-thrift/src/transport/pingpong/client.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index 50b0dffb..a5f7cb55 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -123,15 +123,10 @@ where let shmipc_target = rpc_info.callee().shmipc_address(); let oneway = cx.message_type == TMessageType::OneWay; cx.stats.record_make_transport_start_at(); - let mut transport = if !oneway { - self.make_transport - .call((target, shmipc_target.clone(), Ver::PingPong)) - .await? - } else { - self.make_transport - .call((target, None, Ver::PingPong)) - .await? - }; + let mut transport = self + .make_transport + .call((target, shmipc_target.clone(), Ver::PingPong)) + .await?; cx.stats.record_make_transport_end_at(); let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { From b56b8d70f096dca80ed17bdb705e78cfcad25657 Mon Sep 17 00:00:00 2001 From: Millione Date: Tue, 30 Apr 2024 19:46:30 +0800 Subject: [PATCH 5/9] feat: add transport tag --- volo-thrift/src/transport/pingpong/client.rs | 12 ++++++-- volo-thrift/src/transport/pingpong/server.rs | 18 +++++++++-- volo/src/net/shm.rs | 32 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index a5f7cb55..78758fb7 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -2,7 +2,10 @@ use std::{io, marker::PhantomData}; use motore::service::{Service, UnaryService}; use pilota::thrift::TransportException; -use volo::net::{conn::ConnExt, dial::MakeTransport, Address}; +use volo::{ + net::{conn::ConnExt, dial::MakeTransport, shm::TransportEndpoint, Address}, + FastStr, +}; use crate::{ codec::MakeCodec, @@ -10,7 +13,7 @@ use crate::{ protocol::TMessageType, transport::{ pingpong::thrift_transport::ThriftTransport, - pool::{Config, PooledMakeTransport, Ver}, + pool::{Config, PooledMakeTransport, Transport, Ver}, }, EntryMessage, ThriftMessage, }; @@ -128,6 +131,11 @@ where .call((target, shmipc_target.clone(), Ver::PingPong)) .await?; cx.stats.record_make_transport_end_at(); + if let Transport::UnPooled(_) = transport { + cx.rpc_info + .caller_mut() + .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))) + } let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { if !oneway { diff --git a/volo-thrift/src/transport/pingpong/server.rs b/volo-thrift/src/transport/pingpong/server.rs index a99e15d4..ad8e8fc1 100644 --- a/volo-thrift/src/transport/pingpong/server.rs +++ b/volo-thrift/src/transport/pingpong/server.rs @@ -9,8 +9,11 @@ use pilota::thrift::ThriftException; use tokio::sync::futures::Notified; use tracing::*; use volo::{ - net::{shm::ShmExt, Address}, - volo_unreachable, + net::{ + shm::{ShmExt, TransportEndpoint}, + Address, + }, + volo_unreachable, FastStr, }; use crate::{ @@ -53,7 +56,16 @@ pub async fn serve( cache.pop().unwrap_or_default() }); if let Some(peer_addr) = &peer_addr { - cx.rpc_info.caller_mut().set_address(peer_addr.clone()); + if stream.is_none() { + cx.rpc_info.caller_mut().set_address(peer_addr.clone()); + } else { + cx.rpc_info + .caller_mut() + .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))); + cx.rpc_info + .caller_mut() + .set_shmipc_address(peer_addr.clone()); + } } let msg = tokio::select! { diff --git a/volo/src/net/shm.rs b/volo/src/net/shm.rs index 91e4ffe8..61c7b65f 100644 --- a/volo/src/net/shm.rs +++ b/volo/src/net/shm.rs @@ -1,3 +1,35 @@ +use crate::context::Endpoint; + +crate::new_type! { + #[derive(Debug, Hash, PartialEq, Eq, Clone)] + pub struct Transport(pub faststr::FastStr); +} + +pub trait TransportEndpoint { + fn get_transport(&self) -> Option; + fn has_transport(&self) -> bool; + fn set_transport(&mut self, transport: Transport); +} + +impl TransportEndpoint for Endpoint { + #[inline] + fn get_transport(&self) -> Option { + self.get_faststr::() + .cloned() + .map(Transport::from) + } + + #[inline] + fn has_transport(&self) -> bool { + self.contains_faststr::() + } + + #[inline] + fn set_transport(&mut self, transport: Transport) { + self.insert_faststr::(transport.0); + } +} + #[async_trait::async_trait] pub trait ShmExt: Send + Sync { fn release_read_and_reuse(&self) {} From d6c8003af56ac58589ff10eb1544624d29603c1d Mon Sep 17 00:00:00 2001 From: Millione Date: Thu, 8 Aug 2024 15:34:54 +0800 Subject: [PATCH 6/9] rebase main branch --- volo-thrift/src/transport/multiplex/client.rs | 2 +- volo-thrift/src/transport/pool/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index b8620a93..b949e55e 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -158,7 +158,7 @@ where } if cx.transport.should_reuse && resp.is_ok() { if let Transport::Pooled(pooled) = transport { - pooled.reuse(); + pooled.reuse().await; } } resp diff --git a/volo-thrift/src/transport/pool/mod.rs b/volo-thrift/src/transport/pool/mod.rs index 5e49bd30..ee6af47b 100644 --- a/volo-thrift/src/transport/pool/mod.rs +++ b/volo-thrift/src/transport/pool/mod.rs @@ -424,7 +424,7 @@ impl Transport> { pub async fn reuse(self) { match self { Transport::Pooled(pooled) => { - pooled.reuse(); + pooled.reuse().await; } Transport::UnPooled(t) => { t.reuse().await; From 9370f246d658289347dc6405634555e84a35f0e6 Mon Sep 17 00:00:00 2001 From: Millione Date: Fri, 9 Aug 2024 11:09:39 +0800 Subject: [PATCH 7/9] fix ci --- volo-thrift/src/codec/mod.rs | 7 +++---- volo/src/net/conn.rs | 5 +++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/volo-thrift/src/codec/mod.rs b/volo-thrift/src/codec/mod.rs index 7c8923c9..240d912e 100644 --- a/volo-thrift/src/codec/mod.rs +++ b/volo-thrift/src/codec/mod.rs @@ -8,7 +8,7 @@ pub mod default; pub use default::DefaultMakeCodec; -/// [`Decoder`] reads from an [`AsyncRead`] and decodes the data into a [`ThriftMessage`]. +/// [`Decoder`] decodes the data into a [`ThriftMessage`]. /// /// Returning an Ok(None) indicates the EOF has been reached. /// @@ -24,7 +24,7 @@ pub trait Decoder: Send + Sync + 'static { } } -/// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data. +/// [`Encoder`] writes a [`ThriftMessage`] and flushes the data. /// /// Note: [`Encoder`] should be designed to be ready for reuse. pub trait Encoder: Send + Sync + 'static { @@ -39,8 +39,7 @@ pub trait Encoder: Send + Sync + 'static { } } -/// [`MakeCodec`] receives an [`R`] and an [`W`] and returns a -/// [`Decoder`] and an [`Encoder`]. +/// [`MakeCodec`] returns a [`Decoder`] and an [`Encoder`]. /// /// The implementation of [`MakeCodec`] must make sure the [`Decoder`] and [`Encoder`] /// matches. diff --git a/volo/src/net/conn.rs b/volo/src/net/conn.rs index f3391ce9..531063f9 100644 --- a/volo/src/net/conn.rs +++ b/volo/src/net/conn.rs @@ -1,6 +1,7 @@ +#[cfg(target_family = "unix")] +use std::os::fd::{AsRawFd, RawFd}; use std::{ io, - os::fd::{AsRawFd, RawFd}, pin::Pin, task::{Context, Poll}, }; @@ -367,12 +368,12 @@ impl ConnStream { } } +#[cfg(target_family = "unix")] impl AsRawFd for ConnStream { #[inline] fn as_raw_fd(&self) -> RawFd { match self { Self::Tcp(s) => s.as_raw_fd(), - #[cfg(target_family = "unix")] Self::Unix(s) => s.as_raw_fd(), #[cfg(feature = "rustls")] Self::Rustls(s) => s.as_raw_fd(), From 088309979403073de9992e084b8e41a442548cc5 Mon Sep 17 00:00:00 2001 From: Millione Date: Tue, 3 Sep 2024 20:28:50 +0800 Subject: [PATCH 8/9] rename --- volo-thrift/src/transport/incoming.rs | 6 ------ volo-thrift/src/transport/multiplex/client.rs | 2 +- volo-thrift/src/transport/pingpong/client.rs | 2 +- volo-thrift/src/transport/pingpong/server.rs | 2 +- .../src/transport/pool/make_transport.rs | 2 +- volo-thrift/src/transport/pool/mod.rs | 20 +++++++++---------- 6 files changed, 14 insertions(+), 20 deletions(-) diff --git a/volo-thrift/src/transport/incoming.rs b/volo-thrift/src/transport/incoming.rs index 5a03246f..7a6d3e51 100644 --- a/volo-thrift/src/transport/incoming.rs +++ b/volo-thrift/src/transport/incoming.rs @@ -30,9 +30,3 @@ impl Stream for Incoming { } } } - -#[cfg(test)] -mod tests { - #[test] - fn test() {} -} diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index b949e55e..1fe3a02e 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -157,7 +157,7 @@ where } } if cx.transport.should_reuse && resp.is_ok() { - if let Transport::Pooled(pooled) = transport { + if let Transport::TcpOrUnix(pooled) = transport { pooled.reuse().await; } } diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index 78758fb7..b97b4bb7 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -131,7 +131,7 @@ where .call((target, shmipc_target.clone(), Ver::PingPong)) .await?; cx.stats.record_make_transport_end_at(); - if let Transport::UnPooled(_) = transport { + if let Transport::Shm(_) = transport { cx.rpc_info .caller_mut() .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))) diff --git a/volo-thrift/src/transport/pingpong/server.rs b/volo-thrift/src/transport/pingpong/server.rs index ad8e8fc1..f75870a3 100644 --- a/volo-thrift/src/transport/pingpong/server.rs +++ b/volo-thrift/src/transport/pingpong/server.rs @@ -60,7 +60,7 @@ pub async fn serve( cx.rpc_info.caller_mut().set_address(peer_addr.clone()); } else { cx.rpc_info - .caller_mut() + .callee_mut() .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))); cx.rpc_info .caller_mut() diff --git a/volo-thrift/src/transport/pool/make_transport.rs b/volo-thrift/src/transport/pool/make_transport.rs index d9fa1c09..9393eeac 100644 --- a/volo-thrift/src/transport/pool/make_transport.rs +++ b/volo-thrift/src/transport/pool/make_transport.rs @@ -56,7 +56,7 @@ where let mt = self.inner.clone(); if let Some(addr) = kv.1 { if let Ok(resp) = mt.call(addr.clone()).await { - return Ok(Transport::UnPooled(resp)); + return Ok(Transport::Shm(resp)); } } self.pool diff --git a/volo-thrift/src/transport/pool/mod.rs b/volo-thrift/src/transport/pool/mod.rs index ee6af47b..9334a8d7 100644 --- a/volo-thrift/src/transport/pool/mod.rs +++ b/volo-thrift/src/transport/pool/mod.rs @@ -416,17 +416,17 @@ struct Idle { } pub enum Transport { - Pooled(Pooled), - UnPooled(T), + TcpOrUnix(Pooled), + Shm(T), } impl Transport> { pub async fn reuse(self) { match self { - Transport::Pooled(pooled) => { + Transport::TcpOrUnix(pooled) => { pooled.reuse().await; } - Transport::UnPooled(t) => { + Transport::Shm(t) => { t.reuse().await; } } @@ -435,21 +435,21 @@ impl Transport> { impl From> for Transport { fn from(pooled: Pooled) -> Self { - Transport::Pooled(pooled) + Transport::TcpOrUnix(pooled) } } impl From for Transport { fn from(t: T) -> Self { - Transport::UnPooled(t) + Transport::Shm(t) } } impl AsRef for Transport { fn as_ref(&self) -> &T { match self { - Transport::Pooled(pooled) => pooled.t.as_ref().expect("not dropped"), - Transport::UnPooled(t) => t, + Transport::TcpOrUnix(pooled) => pooled.t.as_ref().expect("not dropped"), + Transport::Shm(t) => t, } } } @@ -457,8 +457,8 @@ impl AsRef for Transport { impl AsMut for Transport { fn as_mut(&mut self) -> &mut T { match self { - Transport::Pooled(pooled) => pooled.t.as_mut().expect("not dropped"), - Transport::UnPooled(t) => t, + Transport::TcpOrUnix(pooled) => pooled.t.as_mut().expect("not dropped"), + Transport::Shm(t) => t, } } } From 9f4d7d568b61b6d0f69cdb2d712db47c38f376be Mon Sep 17 00:00:00 2001 From: Millione Date: Tue, 15 Oct 2024 17:22:53 +0800 Subject: [PATCH 9/9] rebase --- volo-http/src/utils/test_helpers.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/volo-http/src/utils/test_helpers.rs b/volo-http/src/utils/test_helpers.rs index 81ce6d64..f9daf9b7 100644 --- a/volo-http/src/utils/test_helpers.rs +++ b/volo-http/src/utils/test_helpers.rs @@ -82,6 +82,7 @@ mod convert_service { Endpoint { service_name: ep.service_name.clone(), address: ep.address.clone(), + shmipc_address: ep.shmipc_address.clone(), faststr_tags: Default::default(), tags: Default::default(), }