feat(transport): Dynamic load balancing #341

Merged

dawid-nowak
Contributor

This PR attempts to add a feature to Tonic so that client-side load balancing can be done over a
group of endpoints that changes dynamically over time.

Motivation

An external service that is responsible for discovering new endpoints
and monitoring their health could decide to add an endpoint to, or remove it from,
the load balancing group in the Tonic client. At the moment, Tonic can only load balance across a fixed list of endpoints.

Solution

The solution is to expose a new method on the Channel interface called balance_with_manager, which takes an implementation of the EventManager trait defined in the transport module. The EventManager holds the lists of endpoints that need to be added to or removed from the balance service in tower.
The approach is very similar to the existing Channel::balance_list.

Comments
Comments on how to improve it are welcome :)

modified:   examples/Cargo.toml
new file:   examples/src/load_balance_with_discovery/client.rs
new file:   examples/src/load_balance_with_discovery/server.rs
modified:   tonic/src/transport/channel/endpoint.rs
modified:   tonic/src/transport/channel/mod.rs
modified:   tonic/src/transport/mod.rs
modified:   tonic/src/transport/service/discover.rs
modified:   tonic/src/transport/service/mod.rs
@@ -116,14 +124,14 @@ prost = "0.6"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
async-stream = "0.2"
tower = "0.3"
tower = { version="0.3"}
Member

Suggested change
tower = { version="0.3"}
tower = "0.3"

@LucioFranco
Member

This seems like an interesting start, but I am a bit cautious about adding more complex types instead of just exposing the ability to pass in your own discover. I'd rather expose some sort of discover or channel hook than an endpoint manager. What do you think?

@dawid-nowak
Contributor Author

I agree that passing discover would be cleaner/better.

For the time being, I just wanted to keep it as close as possible to the existing implementation. I looked into passing a discover directly, as there is already a method, balance, in channel::mod.rs that does exactly that.
The problem is that this method takes a Connection, which is not public. The code in balance_list and ServiceList, which implements Discover, converts the public Endpoint to the private Connection.

If we wanted to expose Discover directly we would need to make Connection public or seek some other solution which converts from Endpoint to Connection.

Ultimately, I am not entirely sure how to solve this conundrum :)

@LucioFranco
Member

@dawid-nowak could we have a (Key, Endpoint) pair that is sent over the channel via the enum https://docs.rs/tower-discover/0.3.0/tower_discover/enum.Change.html? We can then use Endpoint, which is public, and internally translate it to a connection. That should work, right?
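
To make the idea concrete, here is a minimal sketch of how a sequence of keyed change events could drive a set of endpoints. It uses only the standard library. The `Change` enum is a local stand-in with the same `Insert(K, V)` / `Remove(K)` shape as the tower enum linked above, while `Endpoint` and `apply_changes` are hypothetical names made up for this illustration, not tonic types.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Local stand-in mirroring the shape of tower's `Change` enum.
#[derive(Debug, Clone, PartialEq)]
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical placeholder for a public endpoint description.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub uri: String,
}

/// Applies change events in order and returns the resulting endpoint set.
pub fn apply_changes<K, I>(changes: I) -> HashMap<K, Endpoint>
where
    K: Hash + Eq,
    I: IntoIterator<Item = Change<K, Endpoint>>,
{
    let mut endpoints = HashMap::new();
    for change in changes {
        match change {
            // Inserting under an existing key replaces the previous endpoint.
            Change::Insert(key, endpoint) => {
                endpoints.insert(key, endpoint);
            }
            // Removing an unknown key is a no-op.
            Change::Remove(key) => {
                endpoints.remove(&key);
            }
        }
    }
    endpoints
}
```

The key is what lets a later `Remove` refer to an endpoint without resending it, which is why a bare list of endpoints is not enough for the dynamic case.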

@dawid-nowak
Contributor Author

I think we are in agreement that ideally that would be the case.

That is, expose a Discover that produces the stream of Change events.
I had a dig around, and I think the main obstacle is that the tower::Discover trait takes a Service type. At the moment Endpoint is not a service, so I am not sure how that would work.

Is that correct?

@LucioFranco
Member

@dawid-nowak yeah, so I think instead of exposing a D: Discover we just assume that the user always wants to configure the tonic clients via Endpoint. This way we can just have a channel::<Change<SomeKey, Endpoint>>, pass out the tx side, and let the user push change events.

This way we can implement discover ourselves by taking the stream of channel change events and mapping them to our internal connection type. So internally we would implement a new discover type similar to ServiceList, but one that wraps the rx end of a channel of the Change enum where the value is Endpoint.

Hopefully that makes a bit of sense. I am totally happy to sketch something out a bit more :)
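
A rough, standard-library-only sketch of that mapping step follows. It is synchronous and uses `std::sync::mpsc` in place of an async channel, so it only illustrates the translation from the public type to the private one. `ChannelDiscover`, `Connection` and `next_change` are hypothetical names for this illustration and are not the types in this PR.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in mirroring the shape of tower's `Change` enum.
#[derive(Debug, PartialEq)]
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical placeholder for the public endpoint type.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub uri: String,
}

/// Hypothetical placeholder for the crate-private connection type.
#[derive(Debug, PartialEq)]
pub struct Connection {
    pub target: String,
}

impl Connection {
    fn new(endpoint: &Endpoint) -> Self {
        Connection { target: endpoint.uri.clone() }
    }
}

/// Wraps the rx end of the channel and yields connection-level changes.
pub struct ChannelDiscover<K> {
    changes: Receiver<Change<K, Endpoint>>,
}

impl<K> ChannelDiscover<K> {
    /// Creates the discover side and hands the tx side back to the caller.
    pub fn new() -> (Sender<Change<K, Endpoint>>, Self) {
        let (tx, rx) = channel();
        (tx, ChannelDiscover { changes: rx })
    }

    /// Returns the next queued change translated to the internal type,
    /// or `None` when nothing is queued right now.
    pub fn next_change(&mut self) -> Option<Change<K, Connection>> {
        match self.changes.try_recv().ok()? {
            Change::Insert(key, endpoint) => {
                Some(Change::Insert(key, Connection::new(&endpoint)))
            }
            Change::Remove(key) => Some(Change::Remove(key)),
        }
    }
}
```

Only the `Insert` arm needs the conversion, since `Remove` carries just the key. That keeps the private type out of the public signature entirely.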

@dawid-nowak
Contributor Author

Yeah, that makes sense.
I will need to figure out how to map those streams :) so if you have examples handy, that would speed up the process.

@LucioFranco
Member

Basically, I think we can do something similar to https://docs.rs/tonic/0.2.1/tonic/transport/channel/struct.Channel.html#method.balance_list, where we have:

fn balance_channel<K>() -> (Sender<Change<K, Endpoint>>, Self)

This then would allow the user to:

let (tx, channel) = Channel::balance_channel();

tx.send(Change::Insert("some_key", some_endpoint)).await.unwrap();

channel.call(...).await;

We then implement a new discover type that is similar to ServiceList but uses a channel. We can then do something very similar to this https://github.com/hyperium/tonic/blob/master/tonic/src/transport/channel/mod.rs#L115
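
For readers following along, here is a toy, standard-library-only model of the user-facing shape described above: a constructor that returns the tx side plus a balancer that applies queued changes before picking an endpoint. Everything here (`Balancer`, `pick`, and this free-standing `balance_channel`) is a hypothetical stand-in and not tonic's API. Round-robin selection is swapped in purely to keep the sketch short, and it is not a claim about how the real balancer picks endpoints.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in mirroring the shape of tower's `Change` enum.
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical placeholder for the public endpoint type.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub uri: String,
}

/// Toy stand-in for the balanced channel.
pub struct Balancer<K> {
    changes: Receiver<Change<K, Endpoint>>,
    ready: Vec<(K, Endpoint)>,
    next: usize,
}

/// Mirrors the proposed constructor shape: the caller keeps the tx side.
pub fn balance_channel<K: PartialEq>() -> (Sender<Change<K, Endpoint>>, Balancer<K>) {
    let (tx, rx) = channel();
    (tx, Balancer { changes: rx, ready: Vec::new(), next: 0 })
}

impl<K: PartialEq> Balancer<K> {
    /// Drains queued change events, then returns the next endpoint in rotation.
    pub fn pick(&mut self) -> Option<&Endpoint> {
        while let Ok(change) = self.changes.try_recv() {
            match change {
                Change::Insert(key, endpoint) => {
                    // Replace any endpoint already registered under this key.
                    self.ready.retain(|(k, _)| *k != key);
                    self.ready.push((key, endpoint));
                }
                Change::Remove(key) => self.ready.retain(|(k, _)| *k != key),
            }
        }
        if self.ready.is_empty() {
            return None;
        }
        let index = self.next % self.ready.len();
        self.next = index + 1;
        Some(&self.ready[index].1)
    }
}
```

In this model a discovery task would own `tx` and push `Insert` or `Remove` events as endpoints come and go, while request code only ever calls `pick`.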

Take 2 after code review. Changing the main interface to take a stream of
tower Change<K, Endpoint> events, which get converted to Change<K, Connection>.

modified:   examples/src/load_balance_with_discovery/client.rs
modified:   tonic/src/transport/channel/endpoint.rs
modified:   tonic/src/transport/channel/mod.rs
modified:   tonic/src/transport/mod.rs
modified:   tonic/src/transport/service/discover.rs
modified:   tonic/src/transport/service/mod.rs
@dawid-nowak
Contributor Author

Take 2. I think we are getting closer.

* Converting balance_list to use balance_channel internally.

modified:   examples/Cargo.toml
modified:   tonic/src/transport/channel/mod.rs
modified:   tonic/src/transport/service/discover.rs
modified:   tonic/src/transport/service/mod.rs
…th_dynamic_discovery

Conflicts:
tonic/src/transport/service/discover.rs
* keeping fmt happy

Changes to be committed:
	modified:   tonic/src/transport/channel/mod.rs
	modified:   tonic/src/transport/service/discover.rs
Member

@LucioFranco left a comment

Looking great! I suggested a few changes, let me know if you have any questions :)

Changes to be committed:
	modified:   examples/src/load_balance_with_discovery/client.rs
	modified:   tonic/src/transport/channel/mod.rs
	modified:   tonic/src/transport/service/discover.rs
@dawid-nowak dawid-nowak requested a review from LucioFranco May 14, 2020 21:44
* Removing Unpin for K

Changes to be committed:
	modified:   tonic/src/transport/channel/mod.rs
	modified:   tonic/src/transport/service/discover.rs
Member

@LucioFranco left a comment

looks great! left a few comments, I think we are almost there!

@@ -42,6 +42,14 @@ path = "src/load_balance/client.rs"
name = "load-balance-server"
path = "src/load_balance/server.rs"

[[bin]]
name = "load-balance-client-discovery"
Member

What do you think about naming this dynamic-load-balance? Might be a bit clearer.

Contributor Author

done

use crate::{body::BoxBody, client::GrpcService};
use bytes::Bytes;
use http::{
uri::{InvalidUri, Uri},
Request, Response,
};
use hyper::client::connect::Connection as HyperConnection;
use std::hash::Hash;
Member

minor nit: can we move this import into the one below?

Contributor Author

done

.unwrap_or(DEFAULT_BUFFER_SIZE);
let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
let _res = tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint));
Member

We should probably just unwrap here. If there is an error we want to know, because that would mean there is a bug in this code!

Contributor Author

done
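
As a small illustration of why the result of `try_send` matters when preloading a fixed list, here is a standard-library-only sketch. It uses `std::sync::mpsc::sync_channel` as a stand-in for the bounded async channel in the diff, and `preload` and `Endpoint` are hypothetical names for this example. Instead of unwrapping, the sketch returns the endpoint that did not fit, so the failure mode is visible in a test.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Local stand-in mirroring the shape of tower's `Change` enum.
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical placeholder for the public endpoint type.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub uri: String,
}

/// Queues one `Insert` per endpoint, keyed by URI, into a bounded channel.
/// Returns the receiver on success, or the first endpoint that did not fit.
pub fn preload(
    list: Vec<Endpoint>,
    capacity: usize,
) -> Result<Receiver<Change<String, Endpoint>>, Endpoint> {
    let (tx, rx) = sync_channel(capacity);
    for endpoint in list {
        match tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) {
            Ok(()) => {}
            // The buffer is full: ignoring this would silently drop an endpoint.
            Err(TrySendError::Full(Change::Insert(_, endpoint))) => return Err(endpoint),
            Err(_) => unreachable!("receiver is alive and only inserts are sent"),
        }
    }
    Ok(rx)
}
```

With a discarded result, a list longer than the buffer would lose endpoints without any signal, which is the kind of bug an `unwrap` would surface immediately.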

/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
pub fn balance_channel<K>(
capacity: usize,
) -> (Self, tokio::sync::mpsc::Sender<Change<K, Endpoint>>)
Member

Might want to just import this type, it's making this signature a bit long 😄

Contributor Author

done

@@ -1,34 +1,34 @@
use super::super::service;
use super::connection::Connection;
use crate::transport::Endpoint;
use std::hash::Hash;
Member

same here

Contributor Author

done

Option<Pin<Box<dyn Future<Output = Result<Connection, crate::Error>> + Send + 'static>>>,
i: usize,
pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
changes: tokio::sync::mpsc::Receiver<Change<K, Endpoint>>,
Member

Same here, can we import this type?

Contributor Author

done


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1));
Member

why the timeout here?

Contributor Author

just me playing around, removed

	modified:   examples/Cargo.toml
	modified:   examples/README.md
	renamed:    examples/src/load_balance_with_discovery/client.rs -> examples/src/dynamic_load_balance/client.rs
	renamed:    examples/src/load_balance_with_discovery/server.rs -> examples/src/dynamic_load_balance/server.rs
	modified:   tonic/src/transport/channel/mod.rs
	modified:   tonic/src/transport/service/discover.rs
Contributor Author

@dawid-nowak left a comment

Updated

Member

@LucioFranco left a comment

Looks fantastic! Thanks!

@LucioFranco changed the title from "Load balancing with dynamic discovery" to "feat(transport): Dynamic load balancing" on May 15, 2020
@LucioFranco LucioFranco merged commit 85ae0a4 into hyperium:master May 15, 2020
@dawid-nowak
Contributor Author

🤘

@dawid-nowak dawid-nowak deleted the load_balancing_with_dynamic_discovery branch May 15, 2020 19:14
hawkw pushed a commit to hawkw/tonic that referenced this pull request Jul 10, 2020
LucioFranco pushed a commit that referenced this pull request Jul 10, 2020
* examples: update to `tracing` 0.1.14, use `#[instrument]`

Now that `tracing-attributes`'s `#[instrument]` macro plays nicely with
`async-trait`, we can update the tracing example to use `instrument`.
This lets us simplify the events emitted in the example.

Signed-off-by: Eliza Weisman <[email protected]>

* feat(transport): Dynamic load balancing (#341)

* Fix typo (#356)

Co-authored-by: Dawid Nowak <[email protected]>
Co-authored-by: Paulo Duarte <[email protected]>