Skip to content

Commit

Permalink
Use PeerSet in the connect stub.
Browse files Browse the repository at this point in the history
  • Loading branch information
hdevalence committed Oct 10, 2019
1 parent 7684cb5 commit 837f48e
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 6 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ members = [
"zebra-client",
"zebra-reactor",
"zebrad",
]
]

[patch.crates-io]
# Required because we pull tower-load from git
tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
7 changes: 7 additions & 0 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ impl From<failure::Error> for PeerError {
PeerError::Inner(std::sync::Arc::new(e))
}
}

// XXX hack
impl Into<crate::BoxedStdError> for PeerError {
fn into(self) -> crate::BoxedStdError {
Box::new(format_err!("dropped error info").compat())
}
}
3 changes: 2 additions & 1 deletion zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub(super) struct ClientRequest(
impl Service<Request> for PeerClient {
type Response = Response;
type Error = PeerError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Err(_) = ready!(self.server_tx.poll_ready(cx)) {
Expand Down
11 changes: 9 additions & 2 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,15 @@ where
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
fn new() -> Self {
unimplemented!();
/// Construct a peerset which uses `discover` internally.
pub fn new(discover: D) -> Self {
Self {
discover,
ready_services: IndexMap::new(),
cancel_handles: HashMap::new(),
unready_services: FuturesUnordered::new(),
next_idx: None,
}
}

fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedStdError>> {
Expand Down
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing-log = "0.1"
hyper = "=0.13.0-alpha.4"

tower = "=0.3.0-alpha.2"
tower-load = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x"}

zebra-chain = { path = "../zebra-chain" }
zebra-network = { path = "../zebra-network" }
Expand Down
66 changes: 64 additions & 2 deletions zebrad/src/commands/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl ConnectCmd {
async fn connect(&self) -> Result<(), failure::Error> {
use zebra_network::{
peer::PeerConnector,
peer_set::PeerSet,
protocol::internal::{Request, Response},
timestamp_collector::TimestampCollector,
Network,
Expand All @@ -79,8 +80,69 @@ impl ConnectCmd {
let mut client = pc.call(self.addr.clone()).await?;

client.ready().await?;
let rsp = client.call(Request::GetPeers).await?;
info!(?rsp);

let addrs = match client.call(Request::GetPeers).await? {
Response::Peers(addrs) => addrs,
_ => bail!("Got wrong response type"),
};
info!(
addrs.len = addrs.len(),
"got addresses from first connected peer"
);

use failure::Error;
use futures::{
future,
stream::{FuturesUnordered, StreamExt},
};
use std::time::Duration;
use tower::discover::{Change, ServiceStream};
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};

// construct a stream of services
let client_stream = PeakEwmaDiscover::new(
ServiceStream::new(
addrs
.into_iter()
.map(|meta| {
let svc_fut = pc.call(meta.addr);
async move { Ok::<_, Error>(Change::Insert(meta.addr, svc_fut.await?)) }
})
.collect::<FuturesUnordered<_>>()
// Discard any errored connections...
.filter(|result| future::ready(result.is_ok())),
),
Duration::from_secs(1), // default rtt estimate
Duration::from_secs(60), // decay time
NoInstrument,
);

info!("finished constructing discover");

let mut peer_set = PeerSet::new(client_stream);

info!("waiting for peer_set ready");
peer_set.ready().await.map_err(Error::from_boxed_compat)?;

info!("peer_set became ready, constructing addr requests");

let mut addr_reqs = FuturesUnordered::new();
for i in 0..10usize {
info!(i, "awaiting peer_set ready");
peer_set.ready().await.map_err(Error::from_boxed_compat)?;
info!(i, "calling peer_set");
addr_reqs.push(peer_set.call(Request::GetPeers));
}

let mut all_addrs = Vec::new();
while let Some(Ok(Response::Peers(addrs))) = addr_reqs.next().await {
info!(
all_addrs.len = all_addrs.len(),
addrs.len = addrs.len(),
"got address response"
);
all_addrs.extend(addrs);
}

loop {
// empty loop ensures we don't exit the application,
Expand Down

0 comments on commit 837f48e

Please sign in to comment.