From 58b1032c498e3303feb00c69ca060314d9cdf218 Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Tue, 5 May 2020 15:21:01 +0100 Subject: [PATCH 1/9] Load balancing with dynamic discovery Attempt to add a feature to Tonic so client side load balancing can be done over a group of endpoints that change dynamically over time. An external service which is responsible for discovering new endpoints and monitoring their health could make a decision to add/remove an endpoint from the load balancing group in Tonic client. modified: examples/Cargo.toml new file: examples/src/load_balance_with_discovery/client.rs new file: examples/src/load_balance_with_discovery/server.rs modified: tonic/src/transport/channel/endpoint.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs --- examples/Cargo.toml | 12 +- .../src/load_balance_with_discovery/client.rs | 118 ++++++++++++++++++ .../src/load_balance_with_discovery/server.rs | 82 ++++++++++++ tonic/src/transport/channel/endpoint.rs | 5 + tonic/src/transport/channel/mod.rs | 9 +- tonic/src/transport/mod.rs | 2 +- tonic/src/transport/service/discover.rs | 58 +++++++++ tonic/src/transport/service/mod.rs | 1 + 8 files changed, 283 insertions(+), 4 deletions(-) create mode 100644 examples/src/load_balance_with_discovery/client.rs create mode 100644 examples/src/load_balance_with_discovery/server.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7f29b990a..00f648acc 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -42,6 +42,14 @@ path = "src/load_balance/client.rs" name = "load-balance-server" path = "src/load_balance/server.rs" +[[bin]] +name = "load-balance-client-discovery" +path = "src/load_balance_with_discovery/client.rs" + +[[bin]] +name = "load-balance-server-discovery" +path = "src/load_balance_with_discovery/server.rs" + [[bin]] name = "tls-client" path = "src/tls/client.rs" @@ -116,14 +124,14 @@ prost = "0.6" tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } async-stream = "0.2" -tower = "0.3" +tower = { version="0.3"} # Required for routeguide serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" rand = "0.7" # Tracing tracing = "0.1" -tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] } +tracing-subscriber = { version = "0.2", features = ["tracing-log"] } tracing-attributes = "0.1" tracing-futures = "0.2" # Required for wellknown types diff --git a/examples/src/load_balance_with_discovery/client.rs b/examples/src/load_balance_with_discovery/client.rs new file mode 100644 index 000000000..2d6a8fbea --- /dev/null +++ b/examples/src/load_balance_with_discovery/client.rs @@ -0,0 +1,118 @@ +pub mod pb { + tonic::include_proto!("grpc.examples.echo"); +} + +use pb::{echo_client::EchoClient, EchoRequest}; +use tonic::transport::Channel; +use std::collections::VecDeque; +use tonic::{transport::Endpoint}; +use tonic::{transport::EndpointManager}; +use std::sync::Arc; +use tokio::sync::Mutex; +use std::sync::atomic::{AtomicBool,Ordering::SeqCst}; +use tokio::time::timeout; + + + +#[derive(Clone)] +struct SimpleEndpointManager{ + to_add: Arc>>, + to_remove: Arc>>, + +} + + +impl SimpleEndpointManager{ + fn new()->Self{ + SimpleEndpointManager{ + to_add: Arc::new(Mutex::new(VecDeque::new())), + to_remove: Arc::new(Mutex::new(VecDeque::new())), + } + } + async fn add(&self,key:usize, endpoint:Endpoint)->usize{ + let mut to_add = self.to_add.lock().await; + to_add.push_back((key,endpoint)); + key + } + + async fn remove(&self, key:usize){ + let mut to_remove = self.to_remove.lock().await; + to_remove.push_back(key); + } +} + + +impl EndpointManager for SimpleEndpointManager{ + fn to_add(&self)->Option<(usize,Endpoint)>{ + match self.to_add.try_lock(){ + Ok(mut to_add) => to_add.pop_front(), + Err(e) => { + println!("error {:?}",e); + None + } + } + + } + + fn to_remove(& self)->Option{ + match self.to_remove.try_lock(){ + Ok(mut to_remove) => to_remove.pop_front(), + Err(_) => None + } + + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); + let e2 = Endpoint::from_static("http://[::1]:50052").timeout(std::time::Duration::from_secs(1)); + let em = Box::new(SimpleEndpointManager::new()); + let channel = Channel::balance_with_manager(em.clone()); + let mut client = EchoClient::new(channel); + + let done= Arc::new(AtomicBool::new(false)); + let demo_done = done.clone(); + tokio::spawn(async move{ + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added first endpoint"); + let e1_id = em.add(1,e1).await; + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added second endpoint"); + let e2_id = em.add(2,e2).await; + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed first endpoint"); + em.remove(e1_id).await; + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed second endpoint"); + em.remove(e2_id).await; + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added third endpoint"); + let e3 = Endpoint::from_static("http://[::1]:50051"); + let e3_id = em.add(3,e3).await; + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed third endpoint"); + em.remove(e3_id).await; + demo_done.swap(true,SeqCst); + }); + + while !done.load(SeqCst){ + tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await; + let request = tonic::Request::new(EchoRequest { + message: "hello".into(), + }); + + let rx = client.unary_echo(request); + if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await { + println!("RESPONSE={:?}", resp); + }else{ + println!("did not receive value within 10 secs"); + } + } + + println!("... Bye"); + + Ok(()) +} + + diff --git a/examples/src/load_balance_with_discovery/server.rs b/examples/src/load_balance_with_discovery/server.rs new file mode 100644 index 000000000..8623d8f7a --- /dev/null +++ b/examples/src/load_balance_with_discovery/server.rs @@ -0,0 +1,82 @@ +pub mod pb { + tonic::include_proto!("grpc.examples.echo"); +} + +use futures::Stream; +use std::net::SocketAddr; +use std::pin::Pin; +use tokio::sync::mpsc; +use tonic::{transport::Server, Request, Response, Status, Streaming}; + +use pb::{EchoRequest, EchoResponse}; + +type EchoResult = Result, Status>; +type ResponseStream = Pin> + Send + Sync>>; + +#[derive(Debug)] +pub struct EchoServer { + addr: SocketAddr, +} + +#[tonic::async_trait] +impl pb::echo_server::Echo for EchoServer { + async fn unary_echo(&self, request: Request) -> EchoResult { + let message = format!("{} (from {})", request.into_inner().message, self.addr); + + Ok(Response::new(EchoResponse { message })) + } + + type ServerStreamingEchoStream = ResponseStream; + + async fn server_streaming_echo( + &self, + _: Request, + ) -> EchoResult { + Err(Status::unimplemented("not implemented")) + } + + async fn client_streaming_echo( + &self, + _: Request>, + ) -> EchoResult { + Err(Status::unimplemented("not implemented")) + } + + type BidirectionalStreamingEchoStream = ResponseStream; + + async fn bidirectional_streaming_echo( + &self, + _: Request>, + ) -> EchoResult { + Err(Status::unimplemented("not implemented")) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addrs = ["[::1]:50051", "[::1]:50052"]; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + for addr in &addrs { + let addr = addr.parse()?; + let tx = tx.clone(); + + let server = EchoServer { addr }; + let serve = Server::builder() + .add_service(pb::echo_server::EchoServer::new(server)) + .serve(addr); + + tokio::spawn(async move { + if let Err(e) = serve.await { + eprintln!("Error = {:?}", e); + } + + tx.send(()).unwrap(); + }); + } + + rx.recv().await; + + Ok(()) +} diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index f9c6e7631..73d0af706 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -14,6 +14,11 @@ use std::{ }; use tower_make::MakeConnection; +pub trait EndpointManager:Send+'static{ + fn to_add(&self)->Option<(usize, Endpoint)>; + fn to_remove(&self)->Option; +} + /// Channel builder. /// /// This struct is used to build and configure HTTP/2 channels. diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index acbafb2cf..0bc819c4e 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -6,10 +6,11 @@ mod endpoint; mod tls; pub use endpoint::Endpoint; +pub use endpoint::EndpointManager; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; -use super::service::{Connection, ServiceList}; +use super::service::{Connection, ServiceList,DynamicServiceList}; use crate::{body::BoxBody, client::GrpcService}; use bytes::Bytes; use http::{ @@ -117,6 +118,12 @@ impl Channel { Self::balance(discover, buffer_size) } + pub fn balance_with_manager(manager: Box) -> Self { + let list = DynamicServiceList::new(manager); + Self::balance(list, DEFAULT_BUFFER_SIZE) + } + + pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index c970ee314..6a660ab34 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -94,7 +94,7 @@ mod service; mod tls; #[doc(inline)] -pub use self::channel::{Channel, Endpoint}; +pub use self::channel::{Channel, Endpoint,EndpointManager}; pub use self::error::Error; #[doc(inline)] pub use self::server::{NamedService, Server}; diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 534e26c45..902c961b1 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -1,5 +1,6 @@ use super::connection::Connection; use crate::transport::Endpoint; +use crate::transport::EndpointManager; use std::{ collections::VecDeque, fmt, @@ -16,6 +17,12 @@ pub(crate) struct ServiceList { i: usize, } +pub(crate) struct DynamicServiceList { + manager: Box, + connecting: + Option<(usize, Pin> + Send + 'static>>)>, +} + impl ServiceList { pub(crate) fn new(list: Vec) -> Self { Self { @@ -69,3 +76,54 @@ impl fmt::Debug for ServiceList { .finish() } } + + +impl DynamicServiceList { + pub(crate) fn new(manager:Box) -> Self { + Self { + manager, + connecting: None, + } + } +} + + +impl Discover for DynamicServiceList { + type Key = usize; + type Service = Connection; + type Error = crate::Error; + + fn poll_discover( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + loop { + let waker = cx.waker().clone(); + if let Some((key,connecting)) = &mut self.connecting { + let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; + let key = key.clone(); + self.connecting = None; + let change = Ok(Change::Insert(key, svc)); + return Poll::Ready(change); + }; + + if let Some(key) = self.manager.to_remove(){ + let change = Ok(Change::Remove(key)); + return Poll::Ready(change); + }; + + if let Some((key,endpoint)) = self.manager.to_add() { + let mut http = hyper::client::connect::HttpConnector::new(); + http.set_nodelay(endpoint.tcp_nodelay); + http.set_keepalive(endpoint.tcp_keepalive); + + let fut = Connection::new(http, endpoint); + self.connecting = Some((key,Box::pin(fut))); + } else { + waker.wake(); + return Poll::Pending; + } + + } + } +} diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index 3bc691871..b43b45cbe 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -13,6 +13,7 @@ pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::connector; pub(crate) use self::discover::ServiceList; +pub(crate) use self::discover::DynamicServiceList; pub(crate) use self::io::ServerIo; pub(crate) use self::layer::ServiceBuilderExt; pub(crate) use self::router::{Or, Routes}; From 654bcda89f0f1bb41db71b1cf988fbae620ceb79 Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Mon, 11 May 2020 10:35:30 +0100 Subject: [PATCH 2/9] Load balancing with dynamic discovery Take 2 after code review. Changing main interface to take a stream of tower:Change(K,Endpoint) which will get converted to Change:(K,Connection) modified: examples/src/load_balance_with_discovery/client.rs modified: tonic/src/transport/channel/endpoint.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs --- .../src/load_balance_with_discovery/client.rs | 147 +++++++----------- tonic/src/transport/channel/endpoint.rs | 5 - tonic/src/transport/channel/mod.rs | 17 +- tonic/src/transport/mod.rs | 2 +- tonic/src/transport/service/discover.rs | 70 +++++---- tonic/src/transport/service/mod.rs | 2 +- 6 files changed, 107 insertions(+), 136 deletions(-) diff --git a/examples/src/load_balance_with_discovery/client.rs b/examples/src/load_balance_with_discovery/client.rs index 2d6a8fbea..9b9a6fb1c 100644 --- a/examples/src/load_balance_with_discovery/client.rs +++ b/examples/src/load_balance_with_discovery/client.rs @@ -4,115 +4,80 @@ pub mod pb { use pb::{echo_client::EchoClient, EchoRequest}; use tonic::transport::Channel; -use std::collections::VecDeque; -use tonic::{transport::Endpoint}; -use tonic::{transport::EndpointManager}; -use std::sync::Arc; -use tokio::sync::Mutex; -use std::sync::atomic::{AtomicBool,Ordering::SeqCst}; -use tokio::time::timeout; - - - -#[derive(Clone)] -struct SimpleEndpointManager{ - to_add: Arc>>, - to_remove: Arc>>, - -} - - -impl SimpleEndpointManager{ - fn new()->Self{ - SimpleEndpointManager{ - to_add: Arc::new(Mutex::new(VecDeque::new())), - to_remove: Arc::new(Mutex::new(VecDeque::new())), - } - } - async fn add(&self,key:usize, endpoint:Endpoint)->usize{ - let mut to_add = self.to_add.lock().await; - to_add.push_back((key,endpoint)); - key - } - - async fn remove(&self, key:usize){ - let mut to_remove = self.to_remove.lock().await; - to_remove.push_back(key); - } -} - -impl EndpointManager for SimpleEndpointManager{ - fn to_add(&self)->Option<(usize,Endpoint)>{ - match self.to_add.try_lock(){ - Ok(mut to_add) => to_add.pop_front(), - Err(e) => { - println!("error {:?}",e); - None - } - } - - } +use tonic::transport::Endpoint; - fn to_remove(& self)->Option{ - match self.to_remove.try_lock(){ - Ok(mut to_remove) => to_remove.pop_front(), - Err(_) => None - } +use std::sync::Arc; - } -} +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use tokio::time::timeout; +use tower::discover::Change; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); let e2 = Endpoint::from_static("http://[::1]:50052").timeout(std::time::Duration::from_secs(1)); - let em = Box::new(SimpleEndpointManager::new()); - let channel = Channel::balance_with_manager(em.clone()); + + let (mut rx, tx) = tokio::sync::mpsc::channel(10); + + let channel = Channel::balance_channel(tx); let mut client = EchoClient::new(channel); - let done= Arc::new(AtomicBool::new(false)); + let done = Arc::new(AtomicBool::new(false)); let demo_done = done.clone(); - tokio::spawn(async move{ - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Added first endpoint"); - let e1_id = em.add(1,e1).await; - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Added second endpoint"); - let e2_id = em.add(2,e2).await; - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Removed first endpoint"); - em.remove(e1_id).await; - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Removed second endpoint"); - em.remove(e2_id).await; - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Added third endpoint"); - let e3 = Endpoint::from_static("http://[::1]:50051"); - let e3_id = em.add(3,e3).await; - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; - println!("Removed third endpoint"); - em.remove(e3_id).await; - demo_done.swap(true,SeqCst); + tokio::spawn(async move { + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added first endpoint"); + let change = Change::Insert("1", e1); + let res = rx.send(change).await; + println!("{:?}", res); + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added second endpoint"); + let change = Change::Insert("2", e2); + let res = rx.send(change).await; + println!("{:?}", res); + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed first endpoint"); + let change = Change::Remove("1"); + let res = rx.send(change).await; + println!("{:?}", res); + + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed second endpoint"); + let change = Change::Remove("2"); + let res = rx.send(change).await; + println!("{:?}", res); + + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Added third endpoint"); + let e3 = Endpoint::from_static("http://[::1]:50051"); + let change = Change::Insert("3", e3); + let res = rx.send(change).await; + println!("{:?}", res); + + tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + println!("Removed third endpoint"); + let change = Change::Remove("3"); + let res = rx.send(change).await; + println!("{:?}", res); + demo_done.swap(true, SeqCst); }); - - while !done.load(SeqCst){ - tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await; + + while !done.load(SeqCst) { + tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await; let request = tonic::Request::new(EchoRequest { message: "hello".into(), }); - let rx = client.unary_echo(request); - if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await { - println!("RESPONSE={:?}", resp); - }else{ - println!("did not receive value within 10 secs"); - } + let rx = client.unary_echo(request); + if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await { + println!("RESPONSE={:?}", resp); + } else { + println!("did not receive value within 10 secs"); + } } println!("... Bye"); Ok(()) } - - diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 73d0af706..f9c6e7631 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -14,11 +14,6 @@ use std::{ }; use tower_make::MakeConnection; -pub trait EndpointManager:Send+'static{ - fn to_add(&self)->Option<(usize, Endpoint)>; - fn to_remove(&self)->Option; -} - /// Channel builder. /// /// This struct is used to build and configure HTTP/2 channels. diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 0bc819c4e..ff0836630 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -6,11 +6,10 @@ mod endpoint; mod tls; pub use endpoint::Endpoint; -pub use endpoint::EndpointManager; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; -use super::service::{Connection, ServiceList,DynamicServiceList}; +use super::service::{Connection, DynamicServiceStream, ServiceList}; use crate::{body::BoxBody, client::GrpcService}; use bytes::Bytes; use http::{ @@ -18,6 +17,7 @@ use http::{ Request, Response, }; use hyper::client::connect::Connection as HyperConnection; +use std::hash::Hash; use std::{ fmt, future::Future, @@ -25,9 +25,10 @@ use std::{ task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::stream::Stream; use tower::{ buffer::{self, Buffer}, - discover::Discover, + discover::{Change, Discover}, util::{BoxService, Either}, Service, }; @@ -118,12 +119,16 @@ impl Channel { Self::balance(discover, buffer_size) } - pub fn balance_with_manager(manager: Box) -> Self { - let list = DynamicServiceList::new(manager); + pub fn balance_channel( + changes: impl Stream> + Unpin + Send + 'static, + ) -> Self + where + K: Hash + Eq + Send + Clone + Unpin + 'static, + { + let list = DynamicServiceStream::new(changes); Self::balance(list, DEFAULT_BUFFER_SIZE) } - pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index 6a660ab34..c970ee314 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -94,7 +94,7 @@ mod service; mod tls; #[doc(inline)] -pub use self::channel::{Channel, Endpoint,EndpointManager}; +pub use self::channel::{Channel, Endpoint}; pub use self::error::Error; #[doc(inline)] pub use self::server::{NamedService, Server}; diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 902c961b1..cc8a78845 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -1,6 +1,6 @@ use super::connection::Connection; use crate::transport::Endpoint; -use crate::transport::EndpointManager; +use std::hash::Hash; use std::{ collections::VecDeque, fmt, @@ -8,6 +8,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::stream::Stream; use tower::discover::{Change, Discover}; pub(crate) struct ServiceList { @@ -17,10 +18,12 @@ pub(crate) struct ServiceList { i: usize, } -pub(crate) struct DynamicServiceList { - manager: Box, - connecting: - Option<(usize, Pin> + Send + 'static>>)>, +pub(crate) struct DynamicServiceStream { + changes: Box> + Unpin + Send + 'static>, + connecting: Option<( + K, + Pin> + Send + 'static>>, + )>, } impl ServiceList { @@ -77,19 +80,19 @@ impl fmt::Debug for ServiceList { } } - -impl DynamicServiceList { - pub(crate) fn new(manager:Box) -> Self { +impl DynamicServiceStream { + pub(crate) fn new( + changes: impl Stream> + Unpin + Send + 'static, + ) -> Self { Self { - manager, + changes: Box::new(changes), connecting: None, } } } - -impl Discover for DynamicServiceList { - type Key = usize; +impl Discover for DynamicServiceStream { + type Key = K; type Service = Connection; type Error = crate::Error; @@ -98,32 +101,35 @@ impl Discover for DynamicServiceList { cx: &mut Context<'_>, ) -> Poll, Self::Error>> { loop { - let waker = cx.waker().clone(); - if let Some((key,connecting)) = &mut self.connecting { + if let Some((key, connecting)) = &mut self.connecting { let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; - let key = key.clone(); + let key = key.to_owned(); self.connecting = None; let change = Ok(Change::Insert(key, svc)); return Poll::Ready(change); }; - - if let Some(key) = self.manager.to_remove(){ - let change = Ok(Change::Remove(key)); - return Poll::Ready(change); - }; - - if let Some((key,endpoint)) = self.manager.to_add() { - let mut http = hyper::client::connect::HttpConnector::new(); - http.set_nodelay(endpoint.tcp_nodelay); - http.set_keepalive(endpoint.tcp_keepalive); - - let fut = Connection::new(http, endpoint); - self.connecting = Some((key,Box::pin(fut))); - } else { - waker.wake(); - return Poll::Pending; - } + let c = &mut self.changes; + + match Pin::new(&mut *c).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + let transport_type = crate::transport::error::Kind::Transport; + let err = Box::new(crate::transport::Error::new(transport_type)); + return Poll::Ready(Err(err)); + } + Poll::Ready(Some(change)) => match change { + Change::Insert(k, endpoint) => { + let mut http = hyper::client::connect::HttpConnector::new(); + http.set_nodelay(endpoint.tcp_nodelay); + http.set_keepalive(endpoint.tcp_keepalive); + let fut = Connection::new(http, endpoint); + self.connecting = Some((k, Box::pin(fut))); + continue; + } + Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))), + }, + } } } } diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index b43b45cbe..050789a78 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -12,8 +12,8 @@ mod tls; pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::connector; +pub(crate) use self::discover::DynamicServiceStream; pub(crate) use self::discover::ServiceList; -pub(crate) use self::discover::DynamicServiceList; pub(crate) use self::io::ServerIo; pub(crate) use self::layer::ServiceBuilderExt; pub(crate) use self::router::{Or, Routes}; From 8f8bf4f3c9a06e7241287904aa00e47cff0200d0 Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Mon, 11 May 2020 22:13:42 +0100 Subject: [PATCH 3/9] Load balancing with dynamic discovery * Converting balance_list to use balance_channel internally. modified: examples/Cargo.toml modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs --- examples/Cargo.toml | 2 +- tonic/src/transport/channel/mod.rs | 22 ++++---- tonic/src/transport/service/discover.rs | 71 +++---------------------- tonic/src/transport/service/mod.rs | 1 - 4 files changed, 18 insertions(+), 78 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 00f648acc..3e5a9f4e3 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -124,7 +124,7 @@ prost = "0.6" tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } async-stream = "0.2" -tower = { version="0.3"} +tower = "0.3" # Required for routeguide serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index ff0836630..990352339 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -9,7 +9,7 @@ pub use endpoint::Endpoint; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; -use super::service::{Connection, DynamicServiceStream, ServiceList}; +use super::service::{Connection, DynamicServiceStream}; use crate::{body::BoxBody, client::GrpcService}; use bytes::Bytes; use http::{ @@ -26,6 +26,8 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::stream::Stream; + + use tower::{ buffer::{self, Buffer}, discover::{Change, Discover}, @@ -105,20 +107,16 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list(list: impl Iterator) -> Self { - let list = list.collect::>(); - - let buffer_size = list - .iter() - .next() - .and_then(|e| e.buffer_size) - .unwrap_or(DEFAULT_BUFFER_SIZE); - - let discover = ServiceList::new(list); + pub fn balance_list(list: impl Iterator + Send+'static) -> Self { + let list = list.map(|endpoint| Change::Insert(endpoint.uri.clone(),endpoint)); - Self::balance(discover, buffer_size) + Self::balance_channel(tokio::stream::iter(list)) } + + /// Balance a list of [`Endpoint`]'s. + /// + /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. pub fn balance_channel( changes: impl Stream> + Unpin + Send + 'static, ) -> Self diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index cc8a78845..d0f1f3d2a 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -2,8 +2,6 @@ use super::connection::Connection; use crate::transport::Endpoint; use std::hash::Hash; use std::{ - collections::VecDeque, - fmt, future::Future, pin::Pin, task::{Context, Poll}, @@ -11,12 +9,6 @@ use std::{ use tokio::stream::Stream; use tower::discover::{Change, Discover}; -pub(crate) struct ServiceList { - list: VecDeque, - connecting: - Option> + Send + 'static>>>, - i: usize, -} pub(crate) struct DynamicServiceStream { changes: Box> + Unpin + Send + 'static>, @@ -26,59 +18,7 @@ pub(crate) struct DynamicServiceStream { )>, } -impl ServiceList { - pub(crate) fn new(list: Vec) -> Self { - Self { - list: list.into(), - connecting: None, - i: 0, - } - } -} - -impl Discover for ServiceList { - type Key = usize; - type Service = Connection; - type Error = crate::Error; - - fn poll_discover( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - loop { - if let Some(connecting) = &mut self.connecting { - let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; - self.connecting = None; - - let i = self.i; - self.i += 1; - - let change = Ok(Change::Insert(i, svc)); - - return Poll::Ready(change); - } - - if let Some(endpoint) = self.list.pop_front() { - let mut http = hyper::client::connect::HttpConnector::new(); - http.set_nodelay(endpoint.tcp_nodelay); - http.set_keepalive(endpoint.tcp_keepalive); - let fut = Connection::new(http, endpoint); - self.connecting = Some(Box::pin(fut)); - } else { - return Poll::Pending; - } - } - } -} - -impl fmt::Debug for ServiceList { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ServiceList") - .field("list", &self.list) - .finish() - } -} impl DynamicServiceStream { pub(crate) fn new( @@ -112,11 +52,14 @@ impl Discover for DynamicServiceStream { let c = &mut self.changes; match Pin::new(&mut *c).poll_next(cx) { - Poll::Pending => return Poll::Pending, + Poll::Pending => { + return Poll::Pending + }, Poll::Ready(None) => { - let transport_type = crate::transport::error::Kind::Transport; - let err = Box::new(crate::transport::Error::new(transport_type)); - return Poll::Ready(Err(err)); + // let transport_type = crate::transport::error::Kind::Transport; + // let err = Box::new(crate::transport::Error::new(transport_type)); + // return Poll::Ready(Err(err)); + return Poll::Pending } Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index 050789a78..92453cdbf 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -13,7 +13,6 @@ pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::connector; pub(crate) use self::discover::DynamicServiceStream; -pub(crate) use self::discover::ServiceList; pub(crate) use self::io::ServerIo; pub(crate) use self::layer::ServiceBuilderExt; pub(crate) use self::router::{Or, Routes}; From 523ec8f44421882e2a50f2c70377e10ea0a59f49 Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Mon, 11 May 2020 22:38:41 +0100 Subject: [PATCH 4/9] Load balancing with dynamic discovery * keeping fmt happy Changes to be committed: modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs --- tonic/src/transport/channel/mod.rs | 8 +++----- tonic/src/transport/service/discover.rs | 27 ++++++++++--------------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 990352339..fa98cdcd4 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -27,7 +27,6 @@ use std::{ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::stream::Stream; - use tower::{ buffer::{self, Buffer}, discover::{Change, Discover}, @@ -107,16 +106,15 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list(list: impl Iterator + Send+'static) -> Self { - let list = list.map(|endpoint| Change::Insert(endpoint.uri.clone(),endpoint)); + pub fn balance_list(list: impl Iterator + Send + 'static) -> Self { + let list = list.map(|endpoint| Change::Insert(endpoint.uri.clone(), endpoint)); Self::balance_channel(tokio::stream::iter(list)) } - /// Balance a list of [`Endpoint`]'s. /// - /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. + /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. pub fn balance_channel( changes: impl Stream> + Unpin + Send + 'static, ) -> Self diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 165017540..7ceef2070 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -10,7 +10,6 @@ use std::{ use tokio::stream::Stream; use tower::discover::{Change, Discover}; - pub(crate) struct DynamicServiceStream { changes: Box> + Unpin + Send + 'static>, connecting: Option<( @@ -19,8 +18,6 @@ pub(crate) struct DynamicServiceStream { )>, } - - impl DynamicServiceStream { pub(crate) fn new( changes: impl Stream> + Unpin + Send + 'static, @@ -52,32 +49,30 @@ impl Discover for DynamicServiceStream { let c = &mut self.changes; match Pin::new(&mut *c).poll_next(cx) { - Poll::Pending => { - return Poll::Pending - }, + Poll::Pending => return Poll::Pending, Poll::Ready(None) => { // let transport_type = crate::transport::error::Kind::Transport; // let err = Box::new(crate::transport::Error::new(transport_type)); - // return Poll::Ready(Err(err)); - return Poll::Pending + // return Poll::Ready(Err(err)); + return Poll::Pending; } Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { let mut http = hyper::client::connect::HttpConnector::new(); http.set_nodelay(endpoint.tcp_nodelay); http.set_keepalive(endpoint.tcp_keepalive); - http.enforce_http(false); - #[cfg(feature = "tls")] - let connector = service::connector(http, endpoint.tls.clone()); - - #[cfg(not(feature = "tls"))] - let connector = service::connector(http); - let fut = Connection::new(connector, endpoint); + http.enforce_http(false); + #[cfg(feature = "tls")] + let connector = service::connector(http, endpoint.tls.clone()); + + #[cfg(not(feature = "tls"))] + let connector = service::connector(http); + let fut = Connection::new(connector, endpoint); self.connecting = Some((k, Box::pin(fut))); continue; } Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))), - } + }, } } } From f69870e0b24e72a3dae201206c21580bb8fe859a Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Thu, 14 May 2020 21:02:09 +0100 Subject: [PATCH 5/9] Load balancing with dynamic discovery - core review comments Changes to be committed: modified: examples/src/load_balance_with_discovery/client.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs --- .../src/load_balance_with_discovery/client.rs | 4 +--- tonic/src/transport/channel/mod.rs | 22 ++++++++++--------- tonic/src/transport/service/discover.rs | 9 +++----- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/examples/src/load_balance_with_discovery/client.rs b/examples/src/load_balance_with_discovery/client.rs index 9b9a6fb1c..571eae9dc 100644 --- a/examples/src/load_balance_with_discovery/client.rs +++ b/examples/src/load_balance_with_discovery/client.rs @@ -18,9 +18,7 @@ async fn main() -> Result<(), Box> { let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); let e2 = Endpoint::from_static("http://[::1]:50052").timeout(std::time::Duration::from_secs(1)); - let (mut rx, tx) = tokio::sync::mpsc::channel(10); - - let channel = Channel::balance_channel(tx); + let (channel,mut rx) = Channel::balance_channel(10); let mut client = EchoClient::new(channel); let done = Arc::new(AtomicBool::new(false)); diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index fa98cdcd4..0b5653522 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -25,7 +25,6 @@ use std::{ task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::stream::Stream; use tower::{ buffer::{self, Buffer}, @@ -106,23 +105,26 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list(list: impl Iterator + Send + 'static) -> Self { - let list = list.map(|endpoint| Change::Insert(endpoint.uri.clone(), endpoint)); - - Self::balance_channel(tokio::stream::iter(list)) + pub fn balance_list(list: impl Iterator) -> Self { + + let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); + list.for_each(|endpoint|{ + let _res = tx.try_send(Change::Insert(endpoint.uri.clone(),endpoint)); + }); + + channel } /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. - pub fn balance_channel( - changes: impl Stream> + Unpin + Send + 'static, - ) -> Self + pub fn balance_channel(capacity:usize) -> (Self, tokio::sync::mpsc::Sender>) where K: Hash + Eq + Send + Clone + Unpin + 'static, { - let list = DynamicServiceStream::new(changes); - Self::balance(list, DEFAULT_BUFFER_SIZE) + let (tx,rx) = tokio::sync::mpsc::channel(capacity); + let list = DynamicServiceStream::new(rx); + (Self::balance(list, DEFAULT_BUFFER_SIZE),tx) } pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 7ceef2070..7807ecee9 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -11,7 +11,7 @@ use tokio::stream::Stream; use tower::discover::{Change, Discover}; pub(crate) struct DynamicServiceStream { - changes: Box> + Unpin + Send + 'static>, + changes: tokio::sync::mpsc::Receiver>, connecting: Option<( K, Pin> + Send + 'static>>, @@ -20,10 +20,10 @@ pub(crate) struct DynamicServiceStream { impl DynamicServiceStream { pub(crate) fn new( - changes: impl Stream> + Unpin + Send + 'static, + changes: tokio::sync::mpsc::Receiver>, ) -> Self { Self { - changes: Box::new(changes), + changes, connecting: None, } } @@ -51,9 +51,6 @@ impl Discover for DynamicServiceStream { match Pin::new(&mut *c).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { - // let transport_type = crate::transport::error::Kind::Transport; - // let err = Box::new(crate::transport::Error::new(transport_type)); - // return Poll::Ready(Err(err)); return Poll::Pending; } Poll::Ready(Some(change)) => match change { From 86187057c8a49ea637ad5e2cb9734f24677017cd Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Thu, 14 May 2020 21:04:15 +0100 Subject: [PATCH 6/9] Load balancing with dynamic discovery - making fmt happy --- .../src/load_balance_with_discovery/client.rs | 2 +- tonic/src/transport/channel/mod.rs | 21 ++++++++++--------- tonic/src/transport/service/discover.rs | 4 +--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/examples/src/load_balance_with_discovery/client.rs b/examples/src/load_balance_with_discovery/client.rs index 571eae9dc..ae3171bba 100644 --- a/examples/src/load_balance_with_discovery/client.rs +++ b/examples/src/load_balance_with_discovery/client.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box> { let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); let e2 = Endpoint::from_static("http://[::1]:50052").timeout(std::time::Duration::from_secs(1)); - let (channel,mut rx) = Channel::balance_channel(10); + let (channel, mut rx) = Channel::balance_channel(10); let mut client = EchoClient::new(channel); let done = Arc::new(AtomicBool::new(false)); diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 0b5653522..6602a5dbe 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -105,26 +105,27 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list(list: impl Iterator) -> Self { - + pub fn balance_list(list: impl Iterator) -> Self { let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); - list.for_each(|endpoint|{ - let _res = tx.try_send(Change::Insert(endpoint.uri.clone(),endpoint)); - }); - - channel + list.for_each(|endpoint| { + let _res = tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)); + }); + + channel } /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. - pub fn balance_channel(capacity:usize) -> (Self, tokio::sync::mpsc::Sender>) + pub fn balance_channel( + capacity: usize, + ) -> (Self, tokio::sync::mpsc::Sender>) where K: Hash + Eq + Send + Clone + Unpin + 'static, { - let (tx,rx) = tokio::sync::mpsc::channel(capacity); + let (tx, rx) = tokio::sync::mpsc::channel(capacity); let list = DynamicServiceStream::new(rx); - (Self::balance(list, DEFAULT_BUFFER_SIZE),tx) + (Self::balance(list, DEFAULT_BUFFER_SIZE), tx) } pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 7807ecee9..4b59825ae 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -19,9 +19,7 @@ pub(crate) struct DynamicServiceStream { } impl DynamicServiceStream { - pub(crate) fn new( - changes: tokio::sync::mpsc::Receiver>, - ) -> Self { + pub(crate) fn new(changes: tokio::sync::mpsc::Receiver>) -> Self { Self { changes, connecting: None, From 078412bf9d667c5a9be506c07da515e03ecb9a0c Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Thu, 14 May 2020 22:49:09 +0100 Subject: [PATCH 7/9] Load balancing with dynamic discovery - code review comments * Removing Unpin for K Changes to be committed: modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs --- tonic/src/transport/channel/mod.rs | 2 +- tonic/src/transport/service/discover.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 6602a5dbe..bd2a819b5 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -121,7 +121,7 @@ impl Channel { capacity: usize, ) -> (Self, tokio::sync::mpsc::Sender>) where - K: Hash + Eq + Send + Clone + Unpin + 'static, + K: Hash + Eq + Send + Clone + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(capacity); let list = DynamicServiceStream::new(rx); diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 4b59825ae..c55839e4e 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -10,7 +10,7 @@ use std::{ use tokio::stream::Stream; use tower::discover::{Change, Discover}; -pub(crate) struct DynamicServiceStream { +pub(crate) struct DynamicServiceStream { changes: tokio::sync::mpsc::Receiver>, connecting: Option<( K, @@ -18,7 +18,7 @@ pub(crate) struct DynamicServiceStream { )>, } -impl DynamicServiceStream { +impl DynamicServiceStream { pub(crate) fn new(changes: tokio::sync::mpsc::Receiver>) -> Self { Self { changes, @@ -27,7 +27,7 @@ impl DynamicServiceStream { } } -impl Discover for DynamicServiceStream { +impl Discover for DynamicServiceStream { type Key = K; type Service = Connection; type Error = crate::Error; @@ -72,3 +72,5 @@ impl Discover for DynamicServiceStream { } } } + +impl Unpin for DynamicServiceStream {} From 821abd97f44584952990ef833788ec23779c6a4d Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Thu, 14 May 2020 22:55:57 +0100 Subject: [PATCH 8/9] Load balancing with dynamic discovery - making fmt happy again --- tonic/src/transport/service/discover.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index c55839e4e..f38be3904 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -73,4 +73,4 @@ impl Discover for DynamicServiceStream { } } -impl Unpin for DynamicServiceStream {} +impl Unpin for DynamicServiceStream {} From 7e63868f304127a47adb05d1bef0cd8ea93a3224 Mon Sep 17 00:00:00 2001 From: Dawid Nowak Date: Fri, 15 May 2020 19:43:58 +0100 Subject: [PATCH 9/9] Load balancing with dynamic discovery - code review comments 2 modified: examples/Cargo.toml modified: examples/README.md renamed: examples/src/load_balance_with_discovery/client.rs -> examples/src/dynamic_load_balance/client.rs renamed: examples/src/load_balance_with_discovery/server.rs -> examples/src/dynamic_load_balance/server.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs --- examples/Cargo.toml | 8 ++++---- examples/README.md | 16 +++++++++++++++- .../client.rs | 4 ++-- .../server.rs | 0 tonic/src/transport/channel/mod.rs | 16 +++++++++------- tonic/src/transport/service/discover.rs | 10 ++++++---- 6 files changed, 36 insertions(+), 18 deletions(-) rename examples/src/{load_balance_with_discovery => dynamic_load_balance}/client.rs (92%) rename examples/src/{load_balance_with_discovery => dynamic_load_balance}/server.rs (100%) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3e5a9f4e3..0cd42cbae 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -43,12 +43,12 @@ name = "load-balance-server" path = "src/load_balance/server.rs" [[bin]] -name = "load-balance-client-discovery" -path = "src/load_balance_with_discovery/client.rs" +name = "dynamic-load-balance-client" +path = "src/dynamic_load_balance/client.rs" [[bin]] -name = "load-balance-server-discovery" -path = "src/load_balance_with_discovery/server.rs" +name = "dynamic-load-balance-server" +path = "src/dynamic_load_balance/server.rs" [[bin]] name = "tls-client" diff --git a/examples/README.md b/examples/README.md index 1783b4bd3..784af6279 100644 --- a/examples/README.md +++ b/examples/README.md @@ -44,7 +44,7 @@ $ cargo run --bin authentication-client $ cargo run --bin authentication-server ``` -## Load balance +## Load Balance ### Client @@ -58,6 +58,20 @@ $ cargo run --bin load-balance-client $ cargo run --bin load-balance-server ``` +## Dyanmic Load Balance + +### Client + +```bash +$ cargo run --bin dynamic-load-balance-client +``` + +### Server + +```bash +$ cargo run --bin dynamic-load-balance-server +``` + ## TLS (rustls) ### Client diff --git a/examples/src/load_balance_with_discovery/client.rs b/examples/src/dynamic_load_balance/client.rs similarity index 92% rename from examples/src/load_balance_with_discovery/client.rs rename to examples/src/dynamic_load_balance/client.rs index ae3171bba..4ffa6a66d 100644 --- a/examples/src/load_balance_with_discovery/client.rs +++ b/examples/src/dynamic_load_balance/client.rs @@ -15,8 +15,8 @@ use tower::discover::Change; #[tokio::main] async fn main() -> Result<(), Box> { - let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); - let e2 = Endpoint::from_static("http://[::1]:50052").timeout(std::time::Duration::from_secs(1)); + let e1 = Endpoint::from_static("http://[::1]:50051"); + let e2 = Endpoint::from_static("http://[::1]:50052"); let (channel, mut rx) = Channel::balance_channel(10); let mut client = EchoClient::new(channel); diff --git a/examples/src/load_balance_with_discovery/server.rs b/examples/src/dynamic_load_balance/server.rs similarity index 100% rename from examples/src/load_balance_with_discovery/server.rs rename to examples/src/dynamic_load_balance/server.rs diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index bd2a819b5..773f8f20f 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -17,14 +17,17 @@ use http::{ Request, Response, }; use hyper::client::connect::Connection as HyperConnection; -use std::hash::Hash; use std::{ fmt, future::Future, + hash::Hash, pin::Pin, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::mpsc::{channel, Sender}, +}; use tower::{ buffer::{self, Buffer}, @@ -108,7 +111,8 @@ impl Channel { pub fn balance_list(list: impl Iterator) -> Self { let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); list.for_each(|endpoint| { - let _res = tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)); + tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) + .unwrap(); }); channel @@ -117,13 +121,11 @@ impl Channel { /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. - pub fn balance_channel( - capacity: usize, - ) -> (Self, tokio::sync::mpsc::Sender>) + pub fn balance_channel(capacity: usize) -> (Self, Sender>) where K: Hash + Eq + Send + Clone + 'static, { - let (tx, rx) = tokio::sync::mpsc::channel(capacity); + let (tx, rx) = channel(capacity); let list = DynamicServiceStream::new(rx); (Self::balance(list, DEFAULT_BUFFER_SIZE), tx) } diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index f38be3904..bed7f1310 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -1,17 +1,19 @@ use super::super::service; use super::connection::Connection; use crate::transport::Endpoint; -use std::hash::Hash; + use std::{ future::Future, + hash::Hash, pin::Pin, task::{Context, Poll}, }; -use tokio::stream::Stream; +use tokio::{stream::Stream, sync::mpsc::Receiver}; + use tower::discover::{Change, Discover}; pub(crate) struct DynamicServiceStream { - changes: tokio::sync::mpsc::Receiver>, + changes: Receiver>, connecting: Option<( K, Pin> + Send + 'static>>, @@ -19,7 +21,7 @@ pub(crate) struct DynamicServiceStream { } impl DynamicServiceStream { - pub(crate) fn new(changes: tokio::sync::mpsc::Receiver>) -> Self { + pub(crate) fn new(changes: Receiver>) -> Self { Self { changes, connecting: None,