Skip to content

Commit

Permalink
feat(transport): Connect lazily in the load balanced channel (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeMathWalker authored Nov 18, 2020
1 parent f1275b6 commit 2e964c7
Showing 1 changed file with 21 additions and 39 deletions.
60 changes: 21 additions & 39 deletions tonic/src/transport/service/discover.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::super::{service, BoxFuture};
use super::super::service;
use super::connection::Connection;
use crate::transport::Endpoint;

use std::{
future::Future,
hash::Hash,
pin::Pin,
task::{Context, Poll},
Expand All @@ -16,15 +15,11 @@ type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;

pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
changes: Receiver<Change<K, Endpoint>>,
connecting: Option<(K, BoxFuture<Connection, crate::Error>)>,
}

impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
Self {
changes,
connecting: None,
}
Self { changes }
}
}

Expand All @@ -37,39 +32,26 @@ impl<K: Hash + Eq + Clone> Discover for DynamicServiceStream<K> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<DiscoverResult<Self::Key, Self::Service, Self::Error>> {
loop {
if let Some((key, connecting)) = &mut self.connecting {
let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?;
let key = key.to_owned();
self.connecting = None;
let change = Ok(Change::Insert(key, svc));
return Poll::Ready(change);
};

let c = &mut self.changes;
match Pin::new(&mut *c).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
return Poll::Pending;
let c = &mut self.changes;
match Pin::new(&mut *c).poll_next(cx) {
Poll::Pending | Poll::Ready(None) => Poll::Pending,
Poll::Ready(Some(change)) => match change {
Change::Insert(k, endpoint) => {
let mut http = hyper::client::connect::HttpConnector::new();
http.set_nodelay(endpoint.tcp_nodelay);
http.set_keepalive(endpoint.tcp_keepalive);
http.enforce_http(false);
#[cfg(feature = "tls")]
let connector = service::connector(http, endpoint.tls.clone());

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);
let connection = Connection::lazy(connector, endpoint);
let change = Ok(Change::Insert(k, connection));
Poll::Ready(change)
}
Poll::Ready(Some(change)) => match change {
Change::Insert(k, endpoint) => {
let mut http = hyper::client::connect::HttpConnector::new();
http.set_nodelay(endpoint.tcp_nodelay);
http.set_keepalive(endpoint.tcp_keepalive);
http.enforce_http(false);
#[cfg(feature = "tls")]
let connector = service::connector(http, endpoint.tls.clone());

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);
let fut = Connection::connect(connector, endpoint);
self.connecting = Some((k, Box::pin(fut)));
continue;
}
Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))),
},
}
Change::Remove(k) => Poll::Ready(Ok(Change::Remove(k))),
},
}
}
}
Expand Down

0 comments on commit 2e964c7

Please sign in to comment.