Skip to content

Commit

Permalink
feat(transport): Change channel connect to be async (#107)
Browse files Browse the repository at this point in the history
This makes it so you can check if the initial connection
is established. Before this we used reconnect which would
lazily attempt to connect. So if you were trying to connect
to a non existant Server you wouldn't find out until after
you attempted your first RPC. This simplifies everything
by allowing you connect before creating the RPC client.

BREAKING CHANGE: `Endpoint::channel` was removed in favor of
an async `Endpoint::connect`.
  • Loading branch information
LucioFranco authored Oct 31, 2019
1 parent 108bad0 commit 5c2f4db
Show file tree
Hide file tree
Showing 18 changed files with 243 additions and 38 deletions.
5 changes: 3 additions & 2 deletions tonic-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ fn generate_connect(service_ident: &syn::Ident) -> TokenStream {
quote! {
impl #service_ident<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
tonic::transport::Endpoint::new(dst).map(|c| Self::new(c.channel()))
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion tonic-examples/src/authentication/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
HeaderValue::from_static("Bearer some-secret-token"),
);
})
.channel();
.connect()
.await?;

let mut client = EchoClient::new(channel);

Expand Down
3 changes: 2 additions & 1 deletion tonic-examples/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
headers.insert("authorization", header_value.clone());
})
.tls_config(&tls_config)
.channel();
.connect()
.await?;

let mut service = PublisherClient::new(channel);

Expand Down
2 changes: 1 addition & 1 deletion tonic-examples/src/helloworld/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hello_world::{client::GreeterClient, HelloRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051")?;
let mut client = GreeterClient::connect("http://[::1]:50051").await?;

let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
Expand Down
4 changes: 3 additions & 1 deletion tonic-examples/src/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use tonic::transport::Endpoint;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let channel = Endpoint::from_static("http://[::1]:50051").channel();
let channel = Endpoint::from_static("http://[::1]:50051")
.connect()
.await?;

let mut greeter_client = GreeterClient::new(channel.clone());
let mut echo_client = EchoClient::new(channel);
Expand Down
2 changes: 1 addition & 1 deletion tonic-examples/src/routeguide/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Bo

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RouteGuideClient::connect("http://[::1]:10000")?;
let mut client = RouteGuideClient::connect("http://[::1]:10000").await?;

println!("*** SIMPLE RPC ***");
let response = client
Expand Down
3 changes: 2 additions & 1 deletion tonic-examples/src/tls/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let channel = Channel::from_static("http://[::1]:50051")
.tls_config(&tls)
.channel();
.connect()
.await?;

let mut client = EchoClient::new(channel);
let request = tonic::Request::new(EchoRequest {
Expand Down
4 changes: 2 additions & 2 deletions tonic-examples/src/tls_client_auth/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let channel = Channel::from_static("http://[::1]:50051")
.tls_config(&tls)
.clone()
.channel();
.connect()
.await?;

let mut client = EchoClient::new(channel);

Expand Down
2 changes: 1 addition & 1 deletion tonic-interop/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}

let channel = endpoint.channel();
let channel = endpoint.connect().await?;

let mut client = client::TestClient::new(channel.clone());
let mut unimplemented_client = client::UnimplementedClient::new(channel);
Expand Down
2 changes: 0 additions & 2 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ transport = [
"hyper",
"tokio",
"tower",
"tower-reconnect",
"tower-balance",
"tower-load",
]
Expand Down Expand Up @@ -68,7 +67,6 @@ hyper = { version = "=0.13.0-alpha.4", features = ["unstable-stream"], optional
tokio = { version = "=0.2.0-alpha.6", default-features = false, features = ["tcp"], optional = true }
tower = { version = "=0.3.0-alpha.2", optional = true}
tower-make = "=0.3.0-alpha.2a"
tower-reconnect = { version = "=0.3.0-alpha.2", optional = true }
tower-balance = { version = "=0.3.0-alpha.2", optional = true }
tower-load = { version = "=0.3.0-alpha.2", optional = true }

Expand Down
5 changes: 3 additions & 2 deletions tonic/benches/benchmarks/compiled_protos/helloworld.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ pub mod client {
}
impl GreeterClient<tonic::transport::Channel> {
#[doc = r" Attempt to create a new client by connecting to a given endpoint."]
pub fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
tonic::transport::Endpoint::new(dst).map(|c| Self::new(c.channel()))
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> GreeterClient<T>
Expand Down
10 changes: 6 additions & 4 deletions tonic/src/transport/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,20 @@ impl Channel {
Self::balance(discover, buffer_size, interceptor_headers)
}

pub(crate) fn connect(endpoint: Endpoint) -> Self {
pub(crate) async fn connect(endpoint: Endpoint) -> Result<Self, super::Error> {
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);
let interceptor_headers = endpoint.interceptor_headers.clone();

let svc = Connection::new(endpoint);
let svc = Connection::new(endpoint)
.await
.map_err(|e| super::Error::from_source(super::ErrorKind::Client, e))?;

let svc = Buffer::new(Either::A(svc), buffer_size);

Channel {
Ok(Channel {
svc,
interceptor_headers,
}
})
}

pub(crate) fn balance<D>(
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/transport/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl Endpoint {
}

/// Create a channel from this config.
pub fn channel(&self) -> Channel {
Channel::connect(self.clone())
pub async fn connect(&self) -> Result<Channel, super::Error> {
Channel::connect(self.clone()).await
}
}

Expand Down
3 changes: 2 additions & 1 deletion tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
//! .timeout(Duration::from_secs(5))
//! .rate_limit(5, Duration::from_secs(1))
//! .concurrency_limit(256)
//! .channel();
//! .connect()
//! .await?;
//!
//! channel.call(Request::new(BoxBody::empty())).await?;
//! # Ok(())
Expand Down
13 changes: 7 additions & 6 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{connector, layer::ServiceBuilderExt, AddOrigin};
use super::{connector, layer::ServiceBuilderExt, reconnect::Reconnect, AddOrigin};
use crate::{body::BoxBody, transport::Endpoint};
use hyper::client::conn::Builder;
use hyper::client::service::Connect as HyperConnect;
Expand All @@ -16,7 +16,6 @@ use tower::{
ServiceBuilder,
};
use tower_load::Load;
use tower_reconnect::Reconnect;
use tower_service::Service;

pub(crate) type Request = http::Request<BoxBody>;
Expand All @@ -27,7 +26,7 @@ pub(crate) struct Connection {
}

impl Connection {
pub(crate) fn new(endpoint: Endpoint) -> Self {
pub(crate) async fn new(endpoint: Endpoint) -> Result<Self, crate::Error> {
#[cfg(feature = "tls")]
let connector = connector(endpoint.tls.clone());

Expand All @@ -47,13 +46,15 @@ impl Connection {
.optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
.into_inner();

let conn = Reconnect::new(HyperConnect::new(connector, settings), endpoint.uri.clone());
let mut connector = HyperConnect::new(connector, settings);
let initial_conn = connector.call(endpoint.uri.clone()).await?;
let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone());

let inner = stack.layer(conn);

Self {
Ok(Self {
inner: BoxService::new(inner),
}
})
}
}

Expand Down
41 changes: 31 additions & 10 deletions tonic/src/transport/service/discover.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
use super::connection::Connection;
use crate::transport::Endpoint;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
collections::VecDeque,
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::discover::{Change, Discover};

#[derive(Debug)]
pub(crate) struct ServiceList {
list: VecDeque<Endpoint>,
connecting:
Option<Pin<Box<dyn Future<Output = Result<Connection, crate::Error>> + Send + 'static>>>,
i: usize,
}

impl ServiceList {
pub(crate) fn new(list: Vec<Endpoint>) -> Self {
Self {
list: list.into(),
connecting: None,
i: 0,
}
}
Expand All @@ -27,19 +33,34 @@ impl Discover for ServiceList {

fn poll_discover(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
match self.list.pop_front() {
Some(endpoint) => {
loop {
if let Some(connecting) = &mut self.connecting {
let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?;

let i = self.i;
self.i += 1;

let svc = Connection::new(endpoint);
let change = Ok(Change::Insert(i, svc));

Poll::Ready(change)
return Poll::Ready(change);
}

if let Some(endpoint) = self.list.pop_front() {
let fut = Connection::new(endpoint);
self.connecting = Some(Box::pin(fut));
} else {
return Poll::Pending;
}
None => Poll::Pending,
}
}
}

impl fmt::Debug for ServiceList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ServiceList")
.field("list", &self.list)
.finish()
}
}
1 change: 1 addition & 0 deletions tonic/src/transport/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod discover;
mod either;
mod io;
mod layer;
mod reconnect;
mod router;
#[cfg(feature = "tls")]
mod tls;
Expand Down
Loading

0 comments on commit 5c2f4db

Please sign in to comment.