Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Various networking fixes for v0.2 #789

Merged
merged 15 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 101 additions & 101 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions substrate/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub mod error;
pub mod informant;
mod panic_hook;

use network_libp2p::AddrComponent;
use network_libp2p::Protocol;
use runtime_primitives::traits::As;
use service::{
ServiceFactory, FactoryFullConfiguration, RuntimeGenesis,
Expand Down Expand Up @@ -298,8 +298,8 @@ where
};

config.network.listen_addresses = vec![
iter::once(AddrComponent::IP4(Ipv4Addr::new(0, 0, 0, 0)))
.chain(iter::once(AddrComponent::TCP(port)))
iter::once(Protocol::Ip4(Ipv4Addr::new(0, 0, 0, 0)))
.chain(iter::once(Protocol::Tcp(port)))
.collect()
];
config.network.public_addresses = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev = "f2a5eee5e8363b5cf567206ab2c01c564943d2ef", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev = "ee9ff643a547f45113793315eaad7a4ed7fcb9b2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethereum-types = "0.3"
parking_lot = "0.5"
libc = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions substrate/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use libp2p::PeerId;

pub use connection_filter::{ConnectionFilter, ConnectionDirection};
pub use error::{Error, ErrorKind, DisconnectReason};
pub use libp2p::{Multiaddr, multiaddr::AddrComponent};
pub use libp2p::{Multiaddr, multiaddr::Protocol};
pub use traits::*;

pub type TimerToken = usize;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub fn validate_node_url(url: &str) -> Result<(), Error> {
pub(crate) fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> {
let mut addr: Multiaddr = addr_str.parse().map_err(|_| ErrorKind::AddressParse)?;
let who = match addr.pop() {
Some(AddrComponent::P2P(key)) =>
Some(Protocol::P2p(key)) =>
PeerId::from_multihash(key).map_err(|_| ErrorKind::AddressParse)?,
_ => return Err(ErrorKind::AddressParse.into()),
};
Expand Down
140 changes: 83 additions & 57 deletions substrate/network-libp2p/src/node_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHa
use libp2p::kad::{KadConnecConfig, KadFindNodeRespond, KadIncomingRequest, KadConnecController};
use libp2p::{identify, ping};
use parking_lot::Mutex;
use std::error::Error;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, Interval, Timeout, timeout::Error as TimeoutError};
use tokio_timer::{Delay, Interval};
use {Multiaddr, PacketId, ProtocolId};

/// Duration after which we consider that a ping failed.
Expand Down Expand Up @@ -60,11 +59,11 @@ pub struct SubstrateNodeHandler<TSubstream, TUserData> {
need_report_kad_open: bool,

/// Substream open for sending pings, if any.
ping_out_substream: Option<(ping::Pinger, Box<Future<Item = (), Error = IoError> + Send>)>,
/// Active pinging attempt. Includes the moment when we started the ping.
active_ping_out: Option<(Instant, Box<Future<Item = (), Error = TimeoutError<Box<Error + Send + Sync>>> + Send>)>,
ping_out_substream: Option<ping::PingDialer<TSubstream, Instant>>,
/// Active pinging attempt with the moment it expires.
active_ping_out: Option<Delay>,
/// Substreams open for receiving pings.
ping_in_substreams: Vec<Box<Future<Item = (), Error = IoError> + Send>>,
ping_in_substreams: Vec<ping::PingListener<TSubstream>>,
/// Future that fires when we need to ping the node again.
///
/// Every time we receive a pong, we reset the timer to the next time.
Expand Down Expand Up @@ -255,7 +254,7 @@ macro_rules! listener_upgrade {
upgrade::or(upgrade::or(upgrade::or(
upgrade::map((*$self.registered_custom).clone(), move |c| FinalUpgrade::Custom(c)),
upgrade::map(KadConnecConfig::new(), move |(c, s)| FinalUpgrade::Kad(c, s))),
upgrade::map(ping::Ping, move |p| FinalUpgrade::from(p))),
upgrade::map(ping::Ping::default(), move |p| FinalUpgrade::from(p))),
upgrade::map(identify::IdentifyProtocolConfig, move |i| FinalUpgrade::from(i)))
// TODO: meh for cloning a Vec here
)
Expand Down Expand Up @@ -363,7 +362,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Ping => {
let wanted = upgrade::map(ping::Ping, move |p| FinalUpgrade::from(p));
let wanted = upgrade::map(ping::Ping::default(), move |p| FinalUpgrade::from(p));
// TODO: shouldn't be future::empty() ; requires a change in libp2p
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer, future::empty::<Multiaddr, IoError>())
.map(|(out, _): (FinalUpgrade<TSubstream, TUserData>, _)| out);
Expand Down Expand Up @@ -423,12 +422,6 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event))));
}

// Request new outbound substreams from the user if necessary.
if self.num_out_user_must_open >= 1 {
self.num_out_user_must_open -= 1;
return Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(()))));
}

match self.poll_upgrades_in_progress()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
Expand All @@ -454,6 +447,12 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
Async::NotReady => (),
};

// Request new outbound substreams from the user if necessary.
if self.num_out_user_must_open >= 1 {
self.num_out_user_must_open -= 1;
return Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(()))));
}

// Nothing happened. Register our task to be notified and return.
self.to_notify = Some(task::current());
Ok(Async::NotReady)
Expand All @@ -476,7 +475,8 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
let proto = match self.custom_protocols_substreams.iter().find(|p| p.protocol_id == protocol) {
Some(proto) => proto,
None => {
error!(target: "sub-libp2p", "Protocol {:?} isn't open", protocol);
// We are processing a message event before we could report to the outside that
// we disconnected from the protocol. This is not an error.
return
},
};
Expand Down Expand Up @@ -504,6 +504,9 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
// Opening a new substream for Kademlia.
self.queued_dial_upgrades.push(UpgradePurpose::Kad);
self.num_out_user_must_open += 1;
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
}

Expand Down Expand Up @@ -547,10 +550,11 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
self.cancel_dial_upgrade(&UpgradePurpose::Identify);
Some(SubstrateOutEvent::Identified { info, observed_addr })
},
FinalUpgrade::PingDialer(pinger, ping_process) => {
FinalUpgrade::PingDialer(ping_dialer) => {
self.cancel_dial_upgrade(&UpgradePurpose::Ping);
// We always open the ping substream for a reason, which is to immediately ping.
self.ping_out_substream = Some((pinger, ping_process));
self.ping_out_substream = Some(ping_dialer);
self.active_ping_out = None;
if self.ping_remote() {
Some(SubstrateOutEvent::PingStart)
} else {
Expand Down Expand Up @@ -595,6 +599,9 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
if !self.has_upgrade_purpose(&UpgradePurpose::Identify) {
self.queued_dial_upgrades.push(UpgradePurpose::Identify);
self.num_out_user_must_open += 1;
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
}

Expand All @@ -611,16 +618,28 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
}

// If we have a ping open, ping it!
if let Some((ref mut pinger, _)) = self.ping_out_substream {
let future = Timeout::new(pinger.ping(), PING_TIMEOUT);
self.active_ping_out = Some((Instant::now(), Box::new(future) as Box<_>));
if let Some(ref mut pinger) = self.ping_out_substream {
let now = Instant::now();
pinger.ping(now);
let future = Delay::new(now + PING_TIMEOUT);
self.active_ping_out = Some(future);
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
return true;
}

// Otherwise, ensure we have an upgrade for a ping substream in queue.
if !self.has_upgrade_purpose(&UpgradePurpose::Ping) {
self.queued_dial_upgrades.push(UpgradePurpose::Ping);
self.num_out_user_must_open += 1;
// We also start the unresponsiveness counter when opening the substream, as a
// peer may not respond to our opening request.
let future = Delay::new(Instant::now() + PING_TIMEOUT);
self.active_ping_out = Some(future);
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}

false
Expand Down Expand Up @@ -747,6 +766,23 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,

/// Polls the ping substreams.
fn poll_ping(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Poll the future that fires when we need to ping the node again.
match self.next_ping.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// We reset `next_ping` to a very long time in the future so that we can poll
// it again without having an accident.
self.next_ping.reset(Instant::now() + Duration::from_secs(5 * 60));
if self.ping_remote() {
return Ok(Async::Ready(Some(SubstrateOutEvent::PingStart)));
}
},
Err(err) => {
warn!(target: "sub-libp2p", "Ping timer errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
}
}

// Poll for answering pings.
for n in (0 .. self.ping_in_substreams.len()).rev() {
let mut ping = self.ping_in_substreams.swap_remove(n);
Expand All @@ -758,41 +794,33 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
}

// Poll the ping substream.
// TODO: the pinging API would benefit from some improvements on the side of libp2p.
if let Some((pinger, mut future)) = self.ping_out_substream.take() {
match future.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => self.ping_out_substream = Some((pinger, future)),
Err(_) => {},
}
}

// Poll the active ping attempt.
if let Some((started, mut ping_attempt)) = self.active_ping_out.take() {
match ping_attempt.poll() {
Ok(Async::Ready(())) => {
if let Some(mut ping_dialer) = self.ping_out_substream.take() {
match ping_dialer.poll() {
Ok(Async::Ready(Some(started))) => {
self.active_ping_out = None;
self.next_ping.reset(Instant::now() + DELAY_TO_NEXT_PING);
return Ok(Async::Ready(Some(SubstrateOutEvent::PingSuccess(started.elapsed()))));
},
Ok(Async::NotReady) => self.active_ping_out = Some((started, ping_attempt)),
Err(_) => return Ok(Async::Ready(Some(SubstrateOutEvent::Unresponsive))),
Ok(Async::Ready(None)) => {
// Try re-open ping if it got closed.
self.queued_dial_upgrades.push(UpgradePurpose::Ping);
self.num_out_user_must_open += 1;
},
Ok(Async::NotReady) => self.ping_out_substream = Some(ping_dialer),
Err(_) => {},
}
}

// Poll the future that fires when we need to ping the node again.
match self.next_ping.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// We reset `next_ping` to a very long time in the future so that we can poll
// it again without having an accident.
self.next_ping.reset(Instant::now() + Duration::from_secs(5 * 60));
if self.ping_remote() {
return Ok(Async::Ready(Some(SubstrateOutEvent::PingStart)));
}
},
Err(err) => {
warn!(target: "sub-libp2p", "Ping timer errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
// Poll the active ping attempt.
if let Some(mut deadline) = self.active_ping_out.take() {
match deadline.poll() {
Ok(Async::Ready(())) =>
return Ok(Async::Ready(Some(SubstrateOutEvent::Unresponsive))),
Ok(Async::NotReady) => self.active_ping_out = Some(deadline),
Err(err) => {
warn!(target: "sub-libp2p", "Active ping deadline errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
},
}
}

Expand Down Expand Up @@ -837,18 +865,16 @@ enum FinalUpgrade<TSubstream, TUserData> {
Kad(KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send>),
IdentifyListener(identify::IdentifySender<TSubstream>),
IdentifyDialer(identify::IdentifyInfo, Multiaddr),
PingDialer(ping::Pinger, Box<Future<Item = (), Error = IoError> + Send>),
PingListener(Box<Future<Item = (), Error = IoError> + Send>),
PingDialer(ping::PingDialer<TSubstream, Instant>),
PingListener(ping::PingListener<TSubstream>),
Custom(RegisteredProtocolOutput<TUserData>),
}

impl<TSubstream, TUserData> From<ping::PingOutput> for FinalUpgrade<TSubstream, TUserData> {
fn from(out: ping::PingOutput) -> Self {
impl<TSubstream, TUserData> From<ping::PingOutput<TSubstream, Instant>> for FinalUpgrade<TSubstream, TUserData> {
fn from(out: ping::PingOutput<TSubstream, Instant>) -> Self {
match out {
ping::PingOutput::Ponger(processing) =>
FinalUpgrade::PingListener(processing),
ping::PingOutput::Pinger { pinger, processing } =>
FinalUpgrade::PingDialer(pinger, processing),
ping::PingOutput::Ponger(ponger) => FinalUpgrade::PingListener(ponger),
ping::PingOutput::Pinger(pinger) => FinalUpgrade::PingDialer(pinger),
}
}
}
Expand Down
Loading