feat(transport): Dynamic load balancing #341
Attempt to add a feature to Tonic so client side load balancing can be done over a group of endpoints that change dynamically over time. An external service which is responsible for discovering new endpoints and monitoring their health could make a decision to add/remove an endpoint from the load balancing group in Tonic client.
modified: examples/Cargo.toml
new file: examples/src/load_balance_with_discovery/client.rs
new file: examples/src/load_balance_with_discovery/server.rs
modified: tonic/src/transport/channel/endpoint.rs
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/mod.rs
modified: tonic/src/transport/service/discover.rs
modified: tonic/src/transport/service/mod.rs
examples/Cargo.toml
Outdated
@@ -116,14 +124,14 @@ prost = "0.6"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
async-stream = "0.2"
tower = "0.3"
tower = { version="0.3"}
Suggested change:
- tower = { version="0.3"}
+ tower = "0.3"
This seems like an interesting start. I am a bit cautious about adding more complex types instead of just exposing the ability to pass in your own discover. I'd rather expose some sort of discover or channel hook than an endpoint manager. What do you think?
I agree that passing discover would be cleaner/better. For the time being I just wanted to keep it as close to the existing implementation as possible. I looked into passing discover directly, as there already is a method balance in channel::mod.rs that does exactly that. If we wanted to expose Discover directly, we would need to make Connection public or find some other solution that converts from Endpoint to Connection. Ultimately, I am not entirely sure how to solve this conundrum :)
@dawid-nowak could we have a
I think we are in agreement that ideally that would be the case: expose a Discover that produces the stream of Change events. Is that correct?
@dawid-nowak yeah, so I think instead of exposing a This way we can implement discover ourselves by taking the stream of the channel change events and mapping them to our internal connection type. So internally we would implement a new discover type similar to Hopefully that makes a bit of sense, I am totally happy to sketch something out a bit more :)
yeah, that makes sense,
basically I think we can do something similar to this https://docs.rs/tonic/0.2.1/tonic/transport/channel/struct.Channel.html#method.balance_list. Where we have:

    fn balance_channel::<K>() -> (Sender<Change<K, Endpoint>>, Self)

This then would allow the user to:

    let (tx, channel) = Channel::balance_channel();
    tx.send(("some_key", some_endpoint)).await.unwrap();
    channel.call(...).await;

We then implement a new discover type that is similar to
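To make the proposed shape concrete, here is a minimal sketch of the same idea using only the standard library. It is not tonic's code: `Change` is a local stand-in with the same two variants as `tower::discover::Change`, `Endpoint` and `Balancer` are hypothetical types invented for illustration, and `std::sync::mpsc` stands in for the async tokio channel. The caller keeps the sender, and the balancer applies `Insert`/`Remove` events to its endpoint set.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in for `tower::discover::Change` (same two variants).
#[derive(Debug, Clone, PartialEq)]
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical stand-in for an endpoint: just a URI string.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint(pub String);

/// Hypothetical balancer: owns the receiving half and the live endpoint set.
pub struct Balancer<K> {
    changes: Receiver<Change<K, Endpoint>>,
    endpoints: HashMap<K, Endpoint>,
}

impl<K: Hash + Eq> Balancer<K> {
    /// Hand the caller a sender and keep the receiver, as in the proposal.
    pub fn balance_channel() -> (Sender<Change<K, Endpoint>>, Self) {
        let (tx, rx) = channel();
        let balancer = Balancer {
            changes: rx,
            endpoints: HashMap::new(),
        };
        (tx, balancer)
    }

    /// Apply every change queued so far, without blocking.
    pub fn drain_changes(&mut self) {
        while let Ok(change) = self.changes.try_recv() {
            match change {
                Change::Insert(key, endpoint) => {
                    self.endpoints.insert(key, endpoint);
                }
                Change::Remove(key) => {
                    self.endpoints.remove(&key);
                }
            }
        }
    }

    /// Number of endpoints currently in the set.
    pub fn len(&self) -> usize {
        self.endpoints.len()
    }

    /// Look up an endpoint by key.
    pub fn get(&self, key: &K) -> Option<&Endpoint> {
        self.endpoints.get(key)
    }
}
```

In this sketch, sending `Insert("a", ..)`, `Insert("b", ..)` and then `Remove("a")` before calling `drain_changes` leaves only `"b"` in the set. A real discover implementation would additionally have to turn each endpoint into a connection, which this model leaves out.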
Take 2 after code review. Changing the main interface to take a stream of tower Change<K, Endpoint> events, which get converted to Change<K, Connection>.
modified: examples/src/load_balance_with_discovery/client.rs
modified: tonic/src/transport/channel/endpoint.rs
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/mod.rs
modified: tonic/src/transport/service/discover.rs
modified: tonic/src/transport/service/mod.rs
Take 2. I think we are getting closer.
* Converting balance_list to use balance_channel internally.
modified: examples/Cargo.toml
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/service/discover.rs
modified: tonic/src/transport/service/mod.rs
…th_dynamic_discovery
Conflicts: tonic/src/transport/service/discover.rs
* keeping fmt happy
Changes to be committed:
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/service/discover.rs
Looking great! I suggested a few changes, let me know if you have any questions :)
Changes to be committed:
modified: examples/src/load_balance_with_discovery/client.rs
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/service/discover.rs
* Removing Unpin for K
Changes to be committed:
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/service/discover.rs
looks great! left a few comments, I think we are almost there!
examples/Cargo.toml
Outdated
@@ -42,6 +42,14 @@ path = "src/load_balance/client.rs"
name = "load-balance-server"
path = "src/load_balance/server.rs"

[[bin]]
name = "load-balance-client-discovery"
What do you think about naming this dynamic-load-balance? Might be a bit more clear
done
tonic/src/transport/channel/mod.rs
Outdated
use crate::{body::BoxBody, client::GrpcService};
use bytes::Bytes;
use http::{
uri::{InvalidUri, Uri},
Request, Response,
};
use hyper::client::connect::Connection as HyperConnection;
use std::hash::Hash;
minor nit: can we move this import into the one below?
done
tonic/src/transport/channel/mod.rs
Outdated
.unwrap_or(DEFAULT_BUFFER_SIZE);
let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
let _res = tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint));
probably should just unwrap here: if there is an error we want to know, because that would mean there is a bug in this code!
done
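One way to see why the ignored `try_send` result matters: on a bounded channel, `try_send` returns an error instead of waiting when the buffer is full, so discarding the result can silently drop items. The sketch below illustrates this with the standard library's `sync_channel` as a stand-in for the bounded tokio channel in the diff. The helper `queue_all` is hypothetical, and whether the full-buffer case can actually occur in the PR depends on the capacity chosen there.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Try to queue every item on a bounded channel with the given capacity.
/// Returns (accepted, rejected), where rejected counts sends that failed
/// because the buffer was already full.
pub fn queue_all(capacity: usize, items: &[&str]) -> (usize, usize) {
    // Keep the receiver alive (but never read from it) so that the only
    // possible failure is a full buffer.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for item in items {
        match tx.try_send(item.to_string()) {
            Ok(()) => accepted += 1,
            // With `let _res = ...` this case would go unnoticed.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is still alive")
            }
        }
    }
    (accepted, rejected)
}
```

With a capacity of 2 and three items, the third send is rejected. Unwrapping the result would turn that silent drop into a visible panic, which is the behaviour the review comment asks for.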
tonic/src/transport/channel/mod.rs
Outdated
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
pub fn balance_channel<K>(
capacity: usize,
) -> (Self, tokio::sync::mpsc::Sender<Change<K, Endpoint>>)
might want to just import this type, it's making this signature a bit long 😄
done
@@ -1,34 +1,34 @@
use super::super::service;
use super::connection::Connection;
use crate::transport::Endpoint;
use std::hash::Hash;
same here
done
Option<Pin<Box<dyn Future<Output = Result<Connection, crate::Error>> + Send + 'static>>>,
i: usize,
pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
changes: tokio::sync::mpsc::Receiver<Change<K, Endpoint>>,
same here, can we import this type?
done
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1));
why the timeout here?
just me playing around, removed
modified: examples/Cargo.toml
modified: examples/README.md
renamed: examples/src/load_balance_with_discovery/client.rs -> examples/src/dynamic_load_balance/client.rs
renamed: examples/src/load_balance_with_discovery/server.rs -> examples/src/dynamic_load_balance/server.rs
modified: tonic/src/transport/channel/mod.rs
modified: tonic/src/transport/service/discover.rs
Updated
Looks fantastic! Thanks!
🤘
* examples: update to `tracing` 0.1.14, use `#[instrument]`
Now that `tracing-attributes`'s `#[instrument]` macro plays nicely with `async-trait`, we can update the tracing example to use `instrument`. This lets us simplify the events emitted in the example.
Signed-off-by: Eliza Weisman <[email protected]>
* feat(transport): Dynamic load balancing (#341)
* Fix typo (#356)
Co-authored-by: Dawid Nowak <[email protected]>
Co-authored-by: Paulo Duarte <[email protected]>
Attempt to add a feature to Tonic so that client-side load balancing can be done over a
group of endpoints that change dynamically over time.
Motivation
An external service that is responsible for discovering new endpoints
and monitoring their health could decide to add an endpoint to, or remove it from,
the load balancing group in the Tonic client. At the moment, Tonic only allows load balancing across endpoints from a list.
Solution
The solution is to expose a new method on the Channel interface called balance_with_manager, which takes a trait EventManager defined in Transport. EventManager holds lists of endpoints that need to be added to or removed from the balance service in tower.
The approach is very similar to the existing Channel::balance_list.
Comments
Comments on how to improve it are welcome :)