From 94c9510a66a7ba59abadfdd170f9e05a51d1a853 Mon Sep 17 00:00:00 2001 From: Anton Date: Fri, 12 May 2023 11:12:51 +0400 Subject: [PATCH] Upgrade to libp2p 0.51.3 (#13587) * client/network: upgrade to libp2p 0.51.0 * make discovery.rs compile * make peer_info.rs compile * changes to notifications and request-response proto * make service.rs compile * towards making request_responses.rs compile * make request_responses.rs compile * make request_responses.rs compile * fix notifications/behaviour.rs tests * fix warnings * remove old code * allow deprecated code (temporary) * upgrade to libp2p 0.51.1 * add TODO for behaviour tests * return empty vec if peer_id is absent https://github.com/paritytech/substrate/pull/13587#discussion_r1141695167 fyi: I don't really know what the old behaviour was. * update comment to reflect new defaults Closes #13338 * Revert "update comment to reflect new defaults" This reverts commit 7a981abd69308e9d522ec94905f181439a1b1dba. * remove config.rs (from wrong merge) * upgrade to libp2p 0.51.2 * fix formatting * use handle_pending_outbound_connection in networt_state RPC * update deps * use re-exports when we use other libp2p packages * Apply suggestions from code review Co-authored-by: Dmitry Markin * format code * handle potential errors in network_state RPC * only update libp2p crate * update libp2p-core * fix docs * use libp2p-identity instead of libp2p where it's possible. libp2p-identity is much smaller, hence makes sense to use it instead of larger libp2p crate. * Update client/network/src/discovery.rs Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com> * update Cargo.lock * add comment for per_connection_event_buffer_size current value is somewhat arbitrary and needs to be tweaked depending on memory usage and network worker sleep stats. * fix link format * update Cargo.lock * upgrade to libp2p 0.51.3 * deprecate mplex * Revert "deprecate mplex" This reverts commit 9e25820e706e464a0e962a8604861fcb2a7641eb. * Revert "upgrade to libp2p 0.51.3" This reverts commit 6544dd4138e2f89517bd7c7281fc78a638ec7040. * use new libp2p version in `statement` crate * pin version temporarily * libp2p 0.51.3 * deprecate mplex * deprecate legacy noise handshake * fix build error * update libp2p-identity * enable libp2p-identity:ed25519 feature in sc-consensus * enable ed25519 for peerset as well --------- Co-authored-by: Dmitry Markin Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com> Co-authored-by: parity-processbot <> --- bin/node/runtime/Cargo.toml | 1 - client/authority-discovery/Cargo.toml | 3 +- client/authority-discovery/src/error.rs | 2 +- client/authority-discovery/src/tests.rs | 17 +- client/authority-discovery/src/worker.rs | 18 +- .../src/worker/schema/tests.rs | 6 +- .../authority-discovery/src/worker/tests.rs | 2 +- client/cli/Cargo.toml | 2 +- client/cli/src/commands/generate_node_key.rs | 6 +- client/cli/src/commands/inspect_node_key.rs | 10 +- client/cli/src/params/node_key_params.rs | 13 +- client/consensus/common/Cargo.toml | 2 +- client/consensus/common/src/import_queue.rs | 2 +- .../common/src/import_queue/basic_queue.rs | 2 +- client/network-gossip/Cargo.toml | 2 +- client/network/Cargo.toml | 2 +- client/network/README.md | 2 - client/network/bitswap/Cargo.toml | 2 +- client/network/bitswap/src/lib.rs | 2 +- client/network/common/Cargo.toml | 2 +- client/network/common/src/sync.rs | 2 +- client/network/light/Cargo.toml | 2 +- .../src/light_client_requests/handler.rs | 2 +- client/network/src/behaviour.rs | 10 +- client/network/src/config.rs | 28 +- client/network/src/discovery.rs | 204 +++++----- client/network/src/event.rs | 2 +- client/network/src/lib.rs | 4 +- client/network/src/peer_info.rs | 167 +++++--- client/network/src/protocol.rs | 78 ++-- .../src/protocol/notifications/behaviour.rs | 357 ++++++++++-------- .../src/protocol/notifications/handler.rs | 96 ++--- .../src/protocol/notifications/tests.rs | 85 +++-- client/network/src/request_responses.rs | 292 +++++++------- client/network/src/service.rs | 112 +++--- client/network/src/service/signature.rs | 2 +- client/network/src/service/traits.rs | 2 +- client/network/src/transport.rs | 50 +-- client/network/statement/Cargo.toml | 2 +- client/network/sync/Cargo.toml | 2 +- client/network/test/Cargo.toml | 2 +- client/network/transactions/Cargo.toml | 2 +- client/offchain/Cargo.toml | 2 +- client/peerset/Cargo.toml | 2 +- client/peerset/src/lib.rs | 4 +- client/peerset/src/peersstate.rs | 4 +- client/peerset/tests/fuzz.rs | 2 +- client/telemetry/Cargo.toml | 2 +- 48 files changed, 867 insertions(+), 750 deletions(-) diff --git a/bin/node/runtime/Cargo.toml b/bin/node/runtime/Cargo.toml index 50944ee6de917..347e5272ab7a8 100644 --- a/bin/node/runtime/Cargo.toml +++ b/bin/node/runtime/Cargo.toml @@ -186,7 +186,6 @@ std = [ "pallet-staking-runtime-api/std", "pallet-state-trie-migration/std", "pallet-statement/std", - "pallet-salary/std", "sp-session/std", "pallet-sudo/std", "frame-support/std", diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index 900d9c59dfdae..8b9fb743b4ade 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -21,7 +21,8 @@ codec = { package = "parity-scale-codec", version = "3.2.2", default-features = futures = "0.3.21" futures-timer = "3.0.1" ip_network = "0.4.1" -libp2p = { version = "0.50.0", features = ["kad"] } +libp2p = { version = "0.51.3", features = ["kad", "ed25519"] } +multihash = { version = "0.17.0", default-features = false, features = ["std", "sha2"] } log = "0.4.17" prost = "0.11" rand = "0.8.5" diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs index 13148bb338c2f..ca685115d4975 100644 --- a/client/authority-discovery/src/error.rs +++ b/client/authority-discovery/src/error.rs @@ -56,7 +56,7 @@ pub enum Error { ParsingMultiaddress(#[from] libp2p::core::multiaddr::Error), #[error("Failed to parse a libp2p key.")] - ParsingLibp2pIdentity(#[from] libp2p::identity::error::DecodingError), + ParsingLibp2pIdentity(#[from] libp2p::identity::DecodingError), #[error("Failed to sign: {0}.")] CannotSign(String), diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs index d354f6a963e73..4fbc196c5ecd1 100644 --- a/client/authority-discovery/src/tests.rs +++ b/client/authority-discovery/src/tests.rs @@ -25,8 +25,9 @@ use crate::{ }; use futures::{channel::mpsc::channel, executor::LocalPool, task::LocalSpawn}; -use libp2p::core::{ - multiaddr::{Multiaddr, Protocol}, +use libp2p::{ + core::multiaddr::{Multiaddr, Protocol}, + identity::ed25519, PeerId, }; use std::{collections::HashSet, sync::Arc}; @@ -86,18 +87,16 @@ fn get_addresses_and_authority_id() { fn cryptos_are_compatible() { use sp_core::crypto::Pair; - let libp2p_secret = libp2p::identity::Keypair::generate_ed25519(); - let libp2p_public = libp2p_secret.public(); + let libp2p_keypair = ed25519::Keypair::generate(); + let libp2p_public = libp2p_keypair.public(); - let sp_core_secret = { - let libp2p::identity::Keypair::Ed25519(libp2p_ed_secret) = libp2p_secret.clone(); - sp_core::ed25519::Pair::from_seed_slice(&libp2p_ed_secret.secret().as_ref()).unwrap() - }; + let sp_core_secret = + { sp_core::ed25519::Pair::from_seed_slice(&libp2p_keypair.secret().as_ref()).unwrap() }; let sp_core_public = sp_core_secret.public(); let message = b"we are more powerful than not to be better"; - let libp2p_signature = libp2p_secret.sign(message).unwrap(); + let libp2p_signature = libp2p_keypair.sign(message); let sp_core_signature = sp_core_secret.sign(message); // no error expected... assert!(sp_core::ed25519::Pair::verify( diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 590d1dd19dc12..a29e74df9accc 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -34,11 +34,9 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt} use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; -use libp2p::{ - core::multiaddr, - multihash::{Multihash, MultihashDigest}, - Multiaddr, PeerId, -}; +use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId}; +use multihash::{Code, MultihashDigest}; + use log::{debug, error, log_enabled}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; @@ -551,10 +549,8 @@ where // properly signed by the owner of the PeerId if let Some(peer_signature) = peer_signature { - let public_key = libp2p::identity::PublicKey::from_protobuf_encoding( - &peer_signature.public_key, - ) - .map_err(Error::ParsingLibp2pIdentity)?; + let public_key = PublicKey::try_decode_protobuf(&peer_signature.public_key) + .map_err(Error::ParsingLibp2pIdentity)?; let signature = Signature { public_key, bytes: peer_signature.signature }; if !signature.verify(record, &remote_peer_id) { @@ -625,7 +621,7 @@ pub trait NetworkProvider: NetworkDHTProvider + NetworkStateInfo + NetworkSigner impl NetworkProvider for T where T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner {} fn hash_authority_id(id: &[u8]) -> KademliaKey { - KademliaKey::new(&libp2p::multihash::Code::Sha2_256.digest(id).digest()) + KademliaKey::new(&Code::Sha2_256.digest(id).digest()) } // Makes sure all values are the same and returns it @@ -662,7 +658,7 @@ fn sign_record_with_peer_id( let signature = network .sign_with_local_identity(serialized_record) .map_err(|e| Error::CannotSign(format!("{} (network packet)", e)))?; - let public_key = signature.public_key.to_protobuf_encoding(); + let public_key = signature.public_key.encode_protobuf(); let signature = signature.bytes; Ok(schema::PeerSignature { signature, public_key }) } diff --git a/client/authority-discovery/src/worker/schema/tests.rs b/client/authority-discovery/src/worker/schema/tests.rs index 89c921e0c9fda..c765e4e5384db 100644 --- a/client/authority-discovery/src/worker/schema/tests.rs +++ b/client/authority-discovery/src/worker/schema/tests.rs @@ -21,7 +21,7 @@ mod schema_v1 { } use super::*; -use libp2p::{multiaddr::Multiaddr, PeerId}; +use libp2p::{identity::Keypair, multiaddr::Multiaddr, PeerId}; use prost::Message; #[test] @@ -55,7 +55,7 @@ fn v2_decodes_v1() { #[test] fn v1_decodes_v2() { - let peer_secret = libp2p::identity::Keypair::generate_ed25519(); + let peer_secret = Keypair::generate_ed25519(); let peer_public = peer_secret.public(); let peer_id = peer_public.to_peer_id(); let multiaddress: Multiaddr = @@ -67,7 +67,7 @@ fn v1_decodes_v2() { let record_v2 = AuthorityRecord { addresses: vec_addresses.clone() }; let mut vec_record_v2 = vec![]; record_v2.encode(&mut vec_record_v2).unwrap(); - let vec_peer_public = peer_public.to_protobuf_encoding(); + let vec_peer_public = peer_public.encode_protobuf(); let peer_signature_v2 = PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature }; let signed_record_v2 = SignedAuthorityRecord { diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index 49055fec51611..c29120881940c 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -31,7 +31,7 @@ use futures::{ }; use libp2p::{ core::multiaddr, - identity::{error::SigningError, Keypair}, + identity::{Keypair, SigningError}, kad::record::Key as KademliaKey, PeerId, }; diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index d6d438f33c06f..bfb2924e9366c 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -18,7 +18,7 @@ chrono = { version = "0.4", default-features = false } clap = { version = "4.0.9", features = ["derive", "string"] } fdlimit = "0.2.1" futures = "0.3.21" -libp2p = "0.50.0" +libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"]} log = "0.4.17" names = { version = "0.13.0", default-features = false } parity-scale-codec = "3.2.2" diff --git a/client/cli/src/commands/generate_node_key.rs b/client/cli/src/commands/generate_node_key.rs index 2288cd4037773..6ba84ad9d634a 100644 --- a/client/cli/src/commands/generate_node_key.rs +++ b/client/cli/src/commands/generate_node_key.rs @@ -19,7 +19,7 @@ use crate::Error; use clap::Parser; -use libp2p::identity::{ed25519 as libp2p_ed25519, PublicKey}; +use libp2p_identity::{ed25519, Keypair}; use std::{ fs, io::{self, Write}, @@ -50,7 +50,7 @@ pub struct GenerateNodeKeyCmd { impl GenerateNodeKeyCmd { /// Run the command pub fn run(&self) -> Result<(), Error> { - let keypair = libp2p_ed25519::Keypair::generate(); + let keypair = ed25519::Keypair::generate(); let secret = keypair.secret(); @@ -65,7 +65,7 @@ impl GenerateNodeKeyCmd { None => io::stdout().lock().write_all(&file_data)?, } - eprintln!("{}", PublicKey::Ed25519(keypair.public()).to_peer_id()); + eprintln!("{}", Keypair::from(keypair).public().to_peer_id()); Ok(()) } diff --git a/client/cli/src/commands/inspect_node_key.rs b/client/cli/src/commands/inspect_node_key.rs index 2370f4a0989ba..4ff12f4e2255b 100644 --- a/client/cli/src/commands/inspect_node_key.rs +++ b/client/cli/src/commands/inspect_node_key.rs @@ -19,7 +19,7 @@ use crate::Error; use clap::Parser; -use libp2p::identity::{ed25519, PublicKey}; +use libp2p_identity::Keypair; use std::{ fs, io::{self, Read}, @@ -70,12 +70,10 @@ impl InspectNodeKeyCmd { .map_err(|_| "failed to decode secret as hex")?; } - let secret = - ed25519::SecretKey::from_bytes(&mut file_data).map_err(|_| "Bad node key file")?; + let keypair = + Keypair::ed25519_from_bytes(&mut file_data).map_err(|_| "Bad node key file")?; - let keypair = ed25519::Keypair::from(secret); - - println!("{}", PublicKey::Ed25519(keypair.public()).to_peer_id()); + println!("{}", keypair.public().to_peer_id()); Ok(()) } diff --git a/client/cli/src/params/node_key_params.rs b/client/cli/src/params/node_key_params.rs index 074b95bea0f3a..5366423aa37f3 100644 --- a/client/cli/src/params/node_key_params.rs +++ b/client/cli/src/params/node_key_params.rs @@ -113,7 +113,7 @@ fn invalid_node_key(e: impl std::fmt::Display) -> error::Error { /// Parse a Ed25519 secret key from a hex string into a `sc_network::Secret`. fn parse_ed25519_secret(hex: &str) -> error::Result { H256::from_str(hex).map_err(invalid_node_key).and_then(|bytes| { - ed25519::SecretKey::from_bytes(bytes) + ed25519::SecretKey::try_from_bytes(bytes) .map(sc_network::config::Secret::Input) .map_err(invalid_node_key) }) @@ -123,7 +123,7 @@ fn parse_ed25519_secret(hex: &str) -> error::Result {}, - _ => panic!("Invalid key"), + if let Ok(pair) = node_key.try_into_ed25519() { + if pair.secret().as_ref() != key.as_ref() { + panic!("Invalid key") + } + } else { + panic!("Invalid key") } } diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index d9e80e1e5ce99..e953d67965627 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] async-trait = "0.1.57" futures = { version = "0.3.21", features = ["thread-pool"] } futures-timer = "3.0.1" -libp2p = "0.50.0" +libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"] } log = "0.4.17" mockall = "0.11.3" parking_lot = "0.12.1" diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs index cec9aca47e29f..11ebbd4036a20 100644 --- a/client/consensus/common/src/import_queue.rs +++ b/client/consensus/common/src/import_queue.rs @@ -66,7 +66,7 @@ pub type BoxJustificationImport = Box + Send + Sync>; /// Maps to the RuntimeOrigin used by the network. -pub type RuntimeOrigin = libp2p::PeerId; +pub type RuntimeOrigin = libp2p_identity::PeerId; /// Block data used by the queue. #[derive(Debug, PartialEq, Eq, Clone)] diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 653c88321554e..b93913703d39f 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -635,7 +635,7 @@ mod tests { let hash = Hash::random(); finality_sender .unbounded_send(worker_messages::ImportJustification( - libp2p::PeerId::random(), + libp2p_identity::PeerId::random(), hash, 1, (*b"TEST", Vec::new()), diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 5c1bc91f105c8..40277c946a1d7 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -17,7 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] ahash = "0.8.2" futures = "0.3.21" futures-timer = "3.0.1" -libp2p = "0.50.0" +libp2p = "0.51.3" log = "0.4.17" lru = "0.8.1" tracing = "0.1.29" diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 6fc4131f74e10..de4c4c14a2587 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -25,7 +25,7 @@ fnv = "1.0.6" futures = "0.3.21" futures-timer = "3.0.2" ip_network = "0.4.1" -libp2p = { version = "0.50.0", features = ["dns", "identify", "kad", "macros", "mdns", "mplex", "noise", "ping", "tcp", "tokio", "yamux", "websocket"] } +libp2p = { version = "0.51.3", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux", "websocket", "request-response"] } linked_hash_set = "0.1.3" log = "0.4.17" lru = "0.8.1" diff --git a/client/network/README.md b/client/network/README.md index d396747241ab1..be084816b441d 100644 --- a/client/network/README.md +++ b/client/network/README.md @@ -66,8 +66,6 @@ negotiated and applied. The exact handshake protocol is experimental and is subj The following multiplexing protocols are supported: -- [Mplex](https://github.com/libp2p/specs/tree/master/mplex). Support for mplex will likely -be deprecated in the future. - [Yamux](https://github.com/hashicorp/yamux/blob/master/spec.md). ## Substreams diff --git a/client/network/bitswap/Cargo.toml b/client/network/bitswap/Cargo.toml index ee2e0cfc79ff7..a953676ec160e 100644 --- a/client/network/bitswap/Cargo.toml +++ b/client/network/bitswap/Cargo.toml @@ -18,7 +18,7 @@ prost-build = "0.11" [dependencies] cid = "0.8.6" futures = "0.3.21" -libp2p = "0.50.0" +libp2p-identity = { version = "0.1.2", features = ["peerid"] } log = "0.4.17" prost = "0.11" thiserror = "1.0" diff --git a/client/network/bitswap/src/lib.rs b/client/network/bitswap/src/lib.rs index 5a7a7b51355c6..6ea7bc7e7a38a 100644 --- a/client/network/bitswap/src/lib.rs +++ b/client/network/bitswap/src/lib.rs @@ -22,7 +22,7 @@ use cid::{self, Version}; use futures::{channel::mpsc, StreamExt}; -use libp2p::core::PeerId; +use libp2p_identity::PeerId; use log::{debug, error, trace}; use prost::Message; use sc_client_api::BlockBackend; diff --git a/client/network/common/Cargo.toml b/client/network/common/Cargo.toml index 983342b014b82..d9769413b857f 100644 --- a/client/network/common/Cargo.toml +++ b/client/network/common/Cargo.toml @@ -25,7 +25,7 @@ codec = { package = "parity-scale-codec", version = "3.2.2", features = [ ] } futures = "0.3.21" futures-timer = "3.0.2" -libp2p = { version = "0.50.0", features = ["request-response", "kad"] } +libp2p-identity = { version = "0.1.2", features = ["peerid"] } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } smallvec = "1.8.0" sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index b01091ae01641..404a1aff91153 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -25,7 +25,7 @@ pub mod warp; use crate::role::Roles; use futures::Stream; -use libp2p::PeerId; +use libp2p_identity::PeerId; use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}; use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock}; diff --git a/client/network/light/Cargo.toml b/client/network/light/Cargo.toml index ed2a5d6cb4ec6..cd0dfbca50d2a 100644 --- a/client/network/light/Cargo.toml +++ b/client/network/light/Cargo.toml @@ -21,7 +21,7 @@ codec = { package = "parity-scale-codec", version = "3.2.2", features = [ "derive", ] } futures = "0.3.21" -libp2p = "0.50.0" +libp2p-identity = { version = "0.1.2", features = ["peerid"] } log = "0.4.16" prost = "0.11" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } diff --git a/client/network/light/src/light_client_requests/handler.rs b/client/network/light/src/light_client_requests/handler.rs index db2630b79f498..2a68ebe9c2b23 100644 --- a/client/network/light/src/light_client_requests/handler.rs +++ b/client/network/light/src/light_client_requests/handler.rs @@ -25,7 +25,7 @@ use crate::schema; use codec::{self, Decode, Encode}; use futures::{channel::mpsc, prelude::*}; -use libp2p::PeerId; +use libp2p_identity::PeerId; use log::{debug, trace}; use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index f068099928efc..ef967eee92686 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -28,10 +28,8 @@ use crate::{ use bytes::Bytes; use futures::channel::oneshot; use libp2p::{ - core::{Multiaddr, PeerId, PublicKey}, - identify::Info as IdentifyInfo, - kad::record, - swarm::NetworkBehaviour, + core::Multiaddr, identify::Info as IdentifyInfo, identity::PublicKey, kad::RecordKey, + swarm::NetworkBehaviour, PeerId, }; use sc_network_common::role::{ObservedRole, Roles}; @@ -256,13 +254,13 @@ impl Behaviour { /// Start querying a record from the DHT. Will later produce either a `ValueFound` or a /// `ValueNotFound` event. - pub fn get_value(&mut self, key: record::Key) { + pub fn get_value(&mut self, key: RecordKey) { self.discovery.get_value(key); } /// Starts putting a record into DHT. Will later produce either a `ValuePut` or a /// `ValuePutFailed` event. - pub fn put_value(&mut self, key: record::Key, value: Vec) { + pub fn put_value(&mut self, key: RecordKey, value: Vec) { self.discovery.put_value(key, value); } } diff --git a/client/network/src/config.rs b/client/network/src/config.rs index e80de13829152..17ca8335653de 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -29,8 +29,9 @@ pub use crate::{ types::ProtocolName, }; +pub use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; + use codec::Encode; -use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; use zeroize::Zeroize; @@ -367,7 +368,7 @@ impl NodeKeyConfig { match self { Ed25519(Secret::New) => Ok(Keypair::generate_ed25519()), - Ed25519(Secret::Input(k)) => Ok(Keypair::Ed25519(k.into())), + Ed25519(Secret::Input(k)) => Ok(ed25519::Keypair::from(k).into()), Ed25519(Secret::File(f)) => get_secret( f, @@ -378,14 +379,14 @@ impl NodeKeyConfig { None } }) { - Some(s) => ed25519::SecretKey::from_bytes(s), - _ => ed25519::SecretKey::from_bytes(&mut b), + Some(s) => ed25519::SecretKey::try_from_bytes(s), + _ => ed25519::SecretKey::try_from_bytes(&mut b), }, ed25519::SecretKey::generate, |b| b.as_ref().to_vec(), ) .map(ed25519::Keypair::from) - .map(Keypair::Ed25519), + .map(Keypair::from), } } } @@ -769,9 +770,14 @@ mod tests { tempfile::Builder::new().prefix(prefix).tempdir().unwrap() } - fn secret_bytes(kp: &Keypair) -> Vec { - let Keypair::Ed25519(p) = kp; - p.secret().as_ref().iter().cloned().collect() + fn secret_bytes(kp: Keypair) -> Vec { + kp.try_into_ed25519() + .expect("ed25519 keypair") + .secret() + .as_ref() + .iter() + .cloned() + .collect() } #[test] @@ -781,7 +787,7 @@ mod tests { let file = tmp.path().join("x").to_path_buf(); let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); - assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2)) + assert!(file.is_file() && secret_bytes(kp1) == secret_bytes(kp2)) } #[test] @@ -789,13 +795,13 @@ mod tests { let sk = ed25519::SecretKey::generate(); let kp1 = NodeKeyConfig::Ed25519(Secret::Input(sk.clone())).into_keypair().unwrap(); let kp2 = NodeKeyConfig::Ed25519(Secret::Input(sk)).into_keypair().unwrap(); - assert!(secret_bytes(&kp1) == secret_bytes(&kp2)); + assert!(secret_bytes(kp1) == secret_bytes(kp2)); } #[test] fn test_secret_new() { let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); - assert!(secret_bytes(&kp1) != secret_bytes(&kp2)); + assert!(secret_bytes(kp1) != secret_bytes(kp2)); } } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 7100c0c70d525..708406bd15340 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -53,26 +53,24 @@ use futures::prelude::*; use futures_timer::Delay; use ip_network::IpNetwork; use libp2p::{ - core::{connection::ConnectionId, Multiaddr, PeerId, PublicKey}, + core::{Endpoint, Multiaddr}, kad::{ - handler::KademliaHandlerProto, - record::{ - self, - store::{MemoryStore, RecordStore}, - }, + handler::KademliaHandler, + record::store::{MemoryStore, RecordStore}, GetClosestPeersError, GetRecordOk, Kademlia, KademliaBucketInserts, KademliaConfig, - KademliaEvent, QueryId, QueryResult, Quorum, Record, + KademliaEvent, QueryId, QueryResult, Quorum, Record, RecordKey, }, mdns::{self, tokio::Behaviour as TokioMdns}, multiaddr::Protocol, swarm::{ behaviour::{ - toggle::{Toggle, ToggleIntoConnectionHandler}, + toggle::{Toggle, ToggleConnectionHandler}, DialFailure, FromSwarm, NewExternalAddr, }, - ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, + ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }, + PeerId, }; use log::{debug, info, trace, warn}; use sp_core::hexdisplay::HexDisplay; @@ -107,9 +105,9 @@ pub struct DiscoveryConfig { impl DiscoveryConfig { /// Create a default configuration with the given public key. - pub fn new(local_public_key: PublicKey) -> Self { + pub fn new(local_peer_id: PeerId) -> Self { Self { - local_peer_id: local_public_key.to_peer_id(), + local_peer_id, permanent_addresses: Vec::new(), dht_random_walk: true, allow_private_ip: true, @@ -235,7 +233,7 @@ impl DiscoveryConfig { allow_private_ip, discovery_only_if_under_num, mdns: if enable_mdns { - match TokioMdns::new(mdns::Config::default()) { + match TokioMdns::new(mdns::Config::default(), local_peer_id) { Ok(mdns) => Some(mdns), Err(err) => { warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); @@ -374,7 +372,7 @@ impl DiscoveryBehaviour { /// Start fetching a record from the DHT. /// /// A corresponding `ValueFound` or `ValueNotFound` event will later be generated. - pub fn get_value(&mut self, key: record::Key) { + pub fn get_value(&mut self, key: RecordKey) { if let Some(k) = self.kademlia.as_mut() { k.get_record(key.clone()); } @@ -384,7 +382,7 @@ impl DiscoveryBehaviour { /// `get_value`. /// /// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated. - pub fn put_value(&mut self, key: record::Key, value: Vec) { + pub fn put_value(&mut self, key: RecordKey, value: Vec) { if let Some(k) = self.kademlia.as_mut() { if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) { warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e); @@ -460,22 +458,22 @@ pub enum DiscoveryOut { /// The DHT yielded results for the record request. /// /// Returning the result grouped in (key, value) pairs as well as the request duration. - ValueFound(Vec<(record::Key, Vec)>, Duration), + ValueFound(Vec<(RecordKey, Vec)>, Duration), /// The record requested was not found in the DHT. /// /// Returning the corresponding key as well as the request duration. - ValueNotFound(record::Key, Duration), + ValueNotFound(RecordKey, Duration), /// The record with a given key was successfully inserted into the DHT. /// /// Returning the corresponding key as well as the request duration. - ValuePut(record::Key, Duration), + ValuePut(RecordKey, Duration), /// Inserting a value into the DHT failed. /// /// Returning the corresponding key as well as the request duration. - ValuePutFailed(record::Key, Duration), + ValuePutFailed(RecordKey, Duration), /// Started a random Kademlia query. /// @@ -484,29 +482,83 @@ pub enum DiscoveryOut { } impl NetworkBehaviour for DiscoveryBehaviour { - type ConnectionHandler = ToggleIntoConnectionHandler>; + type ConnectionHandler = ToggleConnectionHandler>; type OutEvent = DiscoveryOut; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.kademlia.new_handler() + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.kademlia.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.kademlia.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + ) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.kademlia + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let Some(peer_id) = maybe_peer else { return Ok(Vec::new()); }; + let mut list = self .permanent_addresses .iter() - .filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None }) + .filter_map(|(p, a)| (*p == peer_id).then_some(a.clone())) .collect::>(); - if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(peer_id) { + if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) { list.extend(ephemeral_addresses.clone()); } { - let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id); + let mut list_to_filter = self.kademlia.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + )?; if let Some(ref mut mdns) = self.mdns { - list_to_filter.extend(mdns.addresses_of_peer(peer_id)); + list_to_filter.extend(mdns.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + )?); } if !self.allow_private_ip { @@ -522,7 +574,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list); - list + Ok(list) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -603,8 +655,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { self.kademlia.on_connection_handler_event(peer_id, connection_id, event); } @@ -613,10 +664,10 @@ impl NetworkBehaviour for DiscoveryBehaviour { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { // Immediately process the content of `discovered`. if let Some(ev) = self.pending_events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) } // Poll the stream that fires when we need to start a random Kademlia query. @@ -650,7 +701,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { if actually_started { let ev = DiscoveryOut::RandomKademliaStarted; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) } } } @@ -658,18 +709,18 @@ impl NetworkBehaviour for DiscoveryBehaviour { while let Poll::Ready(ev) = self.kademlia.poll(cx, params) { match ev { - NetworkBehaviourAction::GenerateEvent(ev) => match ev { + ToSwarm::GenerateEvent(ev) => match ev { KademliaEvent::RoutingUpdated { peer, .. } => { let ev = DiscoveryOut::Discovered(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, KademliaEvent::UnroutablePeer { peer, .. } => { let ev = DiscoveryOut::UnroutablePeer(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, KademliaEvent::RoutablePeer { peer, .. } => { let ev = DiscoveryOut::Discovered(peer); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, KademliaEvent::PendingRoutablePeer { .. } | KademliaEvent::InboundRequest { .. } => { @@ -777,7 +828,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { ) }, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, KademliaEvent::OutboundQueryProgressed { result: QueryResult::PutRecord(res), @@ -799,7 +850,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { ) }, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, KademliaEvent::OutboundQueryProgressed { result: QueryResult::RepublishRecord(res), @@ -821,24 +872,13 @@ impl NetworkBehaviour for DiscoveryBehaviour { warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e) }, }, - NetworkBehaviourAction::Dial { opts, handler } => - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }), - NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }), - NetworkBehaviourAction::ReportObservedAddr { address, score } => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - NetworkBehaviourAction::CloseConnection { peer_id, connection } => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }), + ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }), + ToSwarm::NotifyHandler { peer_id, handler, event } => + return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), + ToSwarm::ReportObservedAddr { address, score } => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + ToSwarm::CloseConnection { peer_id, connection } => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), } } @@ -846,7 +886,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { if let Some(ref mut mdns) = self.mdns { while let Poll::Ready(ev) = mdns.poll(cx, params) { match ev { - NetworkBehaviourAction::GenerateEvent(event) => match event { + ToSwarm::GenerateEvent(event) => match event { mdns::Event::Discovered(list) => { if self.num_connections >= self.discovery_only_if_under_num { continue @@ -855,25 +895,21 @@ impl NetworkBehaviour for DiscoveryBehaviour { self.pending_events .extend(list.map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id))); if let Some(ev) = self.pending_events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + return Poll::Ready(ToSwarm::GenerateEvent(ev)) } }, mdns::Event::Expired(_) => {}, }, - NetworkBehaviourAction::Dial { .. } => { + ToSwarm::Dial { .. } => { unreachable!("mDNS never dials!"); }, - NetworkBehaviourAction::NotifyHandler { event, .. } => match event {}, /* `event` is an enum with no variant */ - NetworkBehaviourAction::ReportObservedAddr { address, score } => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - NetworkBehaviourAction::CloseConnection { peer_id, connection } => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }), + ToSwarm::NotifyHandler { event, .. } => match event {}, /* `event` is an */ + // enum with no + // variant + ToSwarm::ReportObservedAddr { address, score } => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + ToSwarm::CloseConnection { peer_id, connection } => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), } } } @@ -912,9 +948,9 @@ mod tests { transport::{MemoryTransport, Transport}, upgrade, }, - identity::{ed25519, Keypair}, + identity::Keypair, noise, - swarm::{Executor, Swarm, SwarmEvent}, + swarm::{Executor, Swarm, SwarmBuilder, SwarmEvent}, yamux, Multiaddr, }; use sp_core::hash::H256; @@ -941,17 +977,14 @@ mod tests { .map(|i| { let keypair = Keypair::generate_ed25519(); - let noise_keys = - noise::Keypair::::new().into_authentic(&keypair).unwrap(); - let transport = MemoryTransport::new() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(yamux::YamuxConfig::default()) + .authenticate(noise::Config::new(&keypair).unwrap()) + .multiplex(yamux::Config::default()) .boxed(); let behaviour = { - let mut config = DiscoveryConfig::new(keypair.public()); + let mut config = DiscoveryConfig::new(keypair.public().to_peer_id()); config .with_permanent_addresses(first_swarm_peer_id_and_addr.clone()) .allow_private_ip(true) @@ -963,12 +996,13 @@ mod tests { }; let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::with_executor( + let mut swarm = SwarmBuilder::with_executor( transport, behaviour, keypair.public().to_peer_id(), TokioExecutor(runtime), - ); + ) + .build(); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); @@ -1070,7 +1104,7 @@ mod tests { let mut discovery = { let keypair = Keypair::generate_ed25519(); - let mut config = DiscoveryConfig::new(keypair.public()); + let mut config = DiscoveryConfig::new(keypair.public().to_peer_id()); config .allow_private_ip(true) .allow_non_globals_in_dht(true) @@ -1080,11 +1114,7 @@ mod tests { }; let predictable_peer_id = |bytes: &[u8; 32]| { - Keypair::Ed25519(ed25519::Keypair::from( - ed25519::SecretKey::from_bytes(bytes.to_owned()).unwrap(), - )) - .public() - .to_peer_id() + Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id() }; let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001"); diff --git a/client/network/src/event.rs b/client/network/src/event.rs index 975fde0e40a28..9c1034ea3dc61 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -23,7 +23,7 @@ use crate::{types::ProtocolName, NotificationsSink}; use bytes::Bytes; use futures::channel::oneshot; -use libp2p::{core::PeerId, kad::record::Key}; +use libp2p::{kad::record::Key, PeerId}; use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake}; use sp_runtime::traits::Block as BlockT; diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 5374ac13435be..79023923e8b05 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -87,8 +87,6 @@ //! //! The following multiplexing protocols are supported: //! -//! - [Mplex](https://github.com/libp2p/specs/tree/master/mplex). Support for mplex will likely -//! be deprecated in the future. //! - [Yamux](https://github.com/hashicorp/yamux/blob/master/spec.md). //! //! ## Substreams @@ -262,7 +260,7 @@ pub mod utils; pub use event::{DhtEvent, Event, SyncEvent}; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; -pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig}; +pub use request_responses::{Config, IfDisconnected, RequestFailure}; pub use sc_network_common::{ role::ObservedRole, sync::{ diff --git a/client/network/src/peer_info.rs b/client/network/src/peer_info.rs index 3f769736ff10e..e4a5c5753a00c 100644 --- a/client/network/src/peer_info.rs +++ b/client/network/src/peer_info.rs @@ -17,25 +17,27 @@ // along with this program. If not, see . use crate::utils::interval; +use either::Either; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::{ - core::{connection::ConnectionId, either::EitherOutput, ConnectedPoint, PeerId, PublicKey}, + core::{ConnectedPoint, Endpoint}, identify::{ Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent, Info as IdentifyInfo, }, + identity::PublicKey, ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent, Success as PingSuccess}, swarm::{ behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, ListenFailure, }, - ConnectionHandler, IntoConnectionHandler, IntoConnectionHandlerSelect, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, + ConnectionDenied, ConnectionHandler, ConnectionId, IntoConnectionHandlerSelect, + NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, - Multiaddr, + Multiaddr, PeerId, }; use log::{debug, error, trace}; use smallvec::SmallVec; @@ -182,14 +184,72 @@ impl NetworkBehaviour for PeerInfoBehaviour { >; type OutEvent = PeerInfoEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - IntoConnectionHandler::select(self.ping.new_handler(), self.identify.new_handler()) + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.ping + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?; + self.identify + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - // Only `Discovery::addresses_of_peer` must be returning addresses to ensure that we - // don't return unwanted addresses. - Vec::new() + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + // Only `Discovery::handle_pending_outbound_connection` must be returning addresses to + // ensure that we don't return unwanted addresses. + Ok(Vec::new()) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + let ping_handler = self.ping.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + )?; + let identify_handler = self.identify.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + )?; + Ok(ping_handler.select(identify_handler)) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + let ping_handler = self.ping.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + )?; + let identify_handler = self.identify.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + )?; + Ok(ping_handler.select(identify_handler)) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -249,34 +309,39 @@ impl NetworkBehaviour for PeerInfoBehaviour { "Unknown connection to {:?} closed: {:?}", peer_id, endpoint); } }, - FromSwarm::DialFailure(DialFailure { peer_id, handler, error }) => { - let (ping_handler, identity_handler) = handler.into_inner(); + FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => { self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler: ping_handler, error, + connection_id, })); self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, - handler: identity_handler, error, + connection_id, })); }, FromSwarm::ListenerClosed(e) => { self.ping.on_swarm_event(FromSwarm::ListenerClosed(e)); self.identify.on_swarm_event(FromSwarm::ListenerClosed(e)); }, - FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, handler }) => { - let (ping_handler, identity_handler) = handler.into_inner(); + FromSwarm::ListenFailure(ListenFailure { + local_addr, + send_back_addr, + error, + connection_id, + }) => { self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler: ping_handler, + error, + connection_id, })); self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler: identity_handler, + error, + connection_id, })); }, FromSwarm::ListenerError(e) => { @@ -326,13 +391,12 @@ impl NetworkBehaviour for PeerInfoBehaviour { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { match event { - EitherOutput::First(event) => + Either::Left(event) => self.ping.on_connection_handler_event(peer_id, connection_id, event), - EitherOutput::Second(event) => + Either::Right(event) => self.identify.on_connection_handler_event(peer_id, connection_id, event), } } @@ -341,47 +405,37 @@ impl NetworkBehaviour for PeerInfoBehaviour { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { loop { match self.ping.poll(cx, params) { Poll::Pending => break, - Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => { + Poll::Ready(ToSwarm::GenerateEvent(ev)) => { if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev { self.handle_ping_report(&peer, rtt) } }, - Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => { - let handler = - IntoConnectionHandler::select(handler, self.identify.new_handler()); - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) - }, - Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), + Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, - event: EitherOutput::First(event), - }), - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }) => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, + event: Either::Left(event), }), + Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), } } loop { match self.identify.poll(cx, params) { Poll::Pending => break, - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => match event { + Poll::Ready(ToSwarm::GenerateEvent(event)) => match event { IdentifyEvent::Received { peer_id, info, .. } => { self.handle_identify_report(&peer_id, &info); let event = PeerInfoEvent::Identified { peer_id, info }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) + return Poll::Ready(ToSwarm::GenerateEvent(event)) }, IdentifyEvent::Error { peer_id, error } => { debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error) @@ -389,26 +443,17 @@ impl NetworkBehaviour for PeerInfoBehaviour { IdentifyEvent::Pushed { .. } => {}, IdentifyEvent::Sent { .. } => {}, }, - Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => { - let handler = IntoConnectionHandler::select(self.ping.new_handler(), handler); - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) - }, - Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), + Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, - event: EitherOutput::Second(event), - }), - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }) => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, + event: Either::Right(event), }), + Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), } } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 29a90c0bccff1..e7214d814dda8 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -26,10 +26,10 @@ use bytes::Bytes; use codec::{DecodeAll, Encode}; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use libp2p::{ - core::connection::ConnectionId, + core::Endpoint, swarm::{ - behaviour::FromSwarm, ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, }; @@ -378,14 +378,46 @@ impl NetworkBehaviour for Protocol { type ConnectionHandler = ::ConnectionHandler; type OutEvent = CustomMessageOutcome; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.behaviour.new_handler() + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.behaviour.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - // Only `Discovery::addresses_of_peer` must be returning addresses to ensure that we - // don't return unwanted addresses. - Vec::new() + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.behaviour.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + // Only `Discovery::handle_pending_outbound_connection` must be returning addresses to + // ensure that we don't return unwanted addresses. + Ok(Vec::new()) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -396,8 +428,7 @@ impl NetworkBehaviour for Protocol { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { self.behaviour.on_connection_handler_event(peer_id, connection_id, event); } @@ -406,7 +437,7 @@ impl NetworkBehaviour for Protocol { &mut self, cx: &mut std::task::Context, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { while let Poll::Ready(Some(validation_result)) = self.sync_substream_validations.poll_next_unpin(cx) { @@ -426,19 +457,14 @@ impl NetworkBehaviour for Protocol { let event = match self.behaviour.poll(cx, params) { Poll::Pending => return Poll::Pending, - Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, - Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }), - Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }), - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }), - Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }) => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), + Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev, + Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), + Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), + Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), }; let outcome = match event { @@ -634,7 +660,7 @@ impl NetworkBehaviour for Protocol { }; if !matches!(outcome, CustomMessageOutcome::None) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) + return Poll::Ready(ToSwarm::GenerateEvent(outcome)) } // This block can only be reached if an event was pulled from the behaviour and that diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 9e93389389d29..7e56793939b55 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -18,7 +18,7 @@ use crate::{ protocol::notifications::handler::{ - self, NotificationsSink, NotifsHandlerIn, NotifsHandlerOut, NotifsHandlerProto, + self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut, }, types::ProtocolName, }; @@ -27,13 +27,13 @@ use bytes::BytesMut; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::{ - core::{connection::ConnectionId, Multiaddr, PeerId}, + core::{ConnectedPoint, Endpoint, Multiaddr}, swarm::{ behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, - handler::ConnectionHandler, - DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, + ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, + PeerId, }; use log::{error, trace, warn}; use parking_lot::RwLock; @@ -136,7 +136,7 @@ pub struct Notifications { next_incoming_index: sc_peerset::IncomingIndex, /// Events to produce from `poll()`. - events: VecDeque>, + events: VecDeque>, } /// Configuration for a notifications protocol. @@ -454,14 +454,14 @@ impl Notifications { trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id); let event = NotificationsOut::CustomProtocolClosed { peer_id: *peer_id, set_id }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } for (connec_id, connec_state) in connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Open(_))) { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -473,7 +473,7 @@ impl Notifications { connections.iter_mut().filter(|(_, s)| matches!(s, ConnectionState::Opening)) { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -515,7 +515,7 @@ impl Notifications { .filter(|(_, s)| matches!(s, ConnectionState::OpenDesiredByRemote)) { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", peer_id, *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -548,7 +548,6 @@ impl Notifications { /// Function that is called when the peerset wants us to connect to a peer. fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) { // If `PeerId` is unknown to us, insert an entry, start dialing, and return early. - let handler = self.new_handler(); let mut occ_entry = match self.peers.entry((peer_id, set_id)) { Entry::Occupied(entry) => entry, Entry::Vacant(entry) => { @@ -560,10 +559,7 @@ impl Notifications { set_id, ); trace!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0); - self.events.push_back(NetworkBehaviourAction::Dial { - opts: entry.key().0.into(), - handler, - }); + self.events.push_back(ToSwarm::Dial { opts: entry.key().0.into() }); entry.insert(PeerState::Requested); return }, @@ -595,10 +591,7 @@ impl Notifications { set_id, ); trace!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key()); - self.events.push_back(NetworkBehaviourAction::Dial { - opts: occ_entry.key().0.into(), - handler, - }); + self.events.push_back(ToSwarm::Dial { opts: occ_entry.key().0.into() }); *occ_entry.into_mut() = PeerState::Requested; }, @@ -646,7 +639,7 @@ impl Notifications { trace!(target: "sub-libp2p", "PSM => Connect({}, {:?}): Enabling connections.", occ_entry.key().0, set_id); trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -720,7 +713,7 @@ impl Notifications { { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", occ_entry.key(), *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: occ_entry.key().0, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -803,7 +796,7 @@ impl Notifications { trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", entry.key().0, set_id); let event = NotificationsOut::CustomProtocolClosed { peer_id: entry.key().0, set_id }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } for (connec_id, connec_state) in @@ -811,7 +804,7 @@ impl Notifications { { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", entry.key(), *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: entry.key().0, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -824,7 +817,7 @@ impl Notifications { { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", entry.key(), *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: entry.key().0, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -915,7 +908,7 @@ impl Notifications { { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", incoming.peer_id, *connec_id, incoming.set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() }, @@ -975,7 +968,7 @@ impl Notifications { { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Close({:?})", incoming.peer_id, connec_id, incoming.set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Close { protocol_index: incoming.set_id.into() }, @@ -993,15 +986,57 @@ impl Notifications { } impl NetworkBehaviour for Notifications { - type ConnectionHandler = NotifsHandlerProto; + type ConnectionHandler = NotifsHandler; type OutEvent = NotificationsOut; - fn new_handler(&mut self) -> Self::ConnectionHandler { - NotifsHandlerProto::new(self.notif_protocols.clone()) + fn handle_pending_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - Vec::new() + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Vec::new()) + } + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(NotifsHandler::new( + peer, + ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }, + self.notif_protocols.clone(), + )) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(NotifsHandler::new( + peer, + ConnectedPoint::Dialer { address: addr.clone(), role_override }, + self.notif_protocols.clone(), + )) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -1022,7 +1057,7 @@ impl NetworkBehaviour for Notifications { peer_id, set_id, endpoint ); trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -1276,9 +1311,7 @@ impl NetworkBehaviour for Notifications { set_id, notifications_sink: replacement_sink, }; - self.events.push_back( - NetworkBehaviourAction::GenerateEvent(event), - ); + self.events.push_back(ToSwarm::GenerateEvent(event)); } } else { trace!( @@ -1289,9 +1322,7 @@ impl NetworkBehaviour for Notifications { peer_id, set_id, }; - self.events.push_back( - NetworkBehaviourAction::GenerateEvent(event), - ); + self.events.push_back(ToSwarm::GenerateEvent(event)); } } } else { @@ -1434,8 +1465,7 @@ impl NetworkBehaviour for Notifications { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { match event { NotifsHandlerOut::OpenDesiredByRemote { protocol_index } => { @@ -1502,7 +1532,7 @@ impl NetworkBehaviour for Notifications { if let ConnectionState::Closed = *connec_state { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -1583,7 +1613,7 @@ impl NetworkBehaviour for Notifications { if let ConnectionState::Closed = *connec_state { trace!(target: "sub-libp2p", "Handler({:?}, {:?}) <= Open({:?})", peer_id, connection_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -1668,7 +1698,7 @@ impl NetworkBehaviour for Notifications { connections[pos].1 = ConnectionState::Closing; trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Close({:?})", peer_id, connection_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: NotifsHandlerIn::Close { protocol_index: set_id.into() }, @@ -1686,7 +1716,7 @@ impl NetworkBehaviour for Notifications { set_id, notifications_sink: replacement_sink, }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } *entry.into_mut() = PeerState::Enabled { connections }; @@ -1706,7 +1736,7 @@ impl NetworkBehaviour for Notifications { trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id); let event = NotificationsOut::CustomProtocolClosed { peer_id, set_id }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } }, @@ -1790,7 +1820,7 @@ impl NetworkBehaviour for Notifications { received_handshake, notifications_sink: notifications_sink.clone(), }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } *connec_state = ConnectionState::Open(notifications_sink); } else if let Some((_, connec_state)) = @@ -1937,7 +1967,7 @@ impl NetworkBehaviour for Notifications { ); let event = NotificationsOut::Notification { peer_id, set_id, message }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } else { trace!( target: "sub-libp2p", @@ -1956,7 +1986,7 @@ impl NetworkBehaviour for Notifications { &mut self, cx: &mut Context, _params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event) } @@ -1988,8 +2018,6 @@ impl NetworkBehaviour for Notifications { while let Poll::Ready(Some((delay_id, peer_id, set_id))) = Pin::new(&mut self.delays).poll_next(cx) { - let handler = self.new_handler(); - let peer_state = match self.peers.get_mut(&(peer_id, set_id)) { Some(s) => s, // We intentionally never remove elements from `delays`, and it may @@ -2005,8 +2033,7 @@ impl NetworkBehaviour for Notifications { PeerState::PendingRequest { timer, .. } if *timer == delay_id => { trace!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id); - self.events - .push_back(NetworkBehaviourAction::Dial { opts: peer_id.into(), handler }); + self.events.push_back(ToSwarm::Dial { opts: peer_id.into() }); *peer_state = PeerState::Requested; }, @@ -2019,7 +2046,7 @@ impl NetworkBehaviour for Notifications { { trace!(target: "sub-libp2p", "Handler({}, {:?}) <= Open({:?}) (ban expired)", peer_id, *connec_id, set_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(*connec_id), event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, @@ -2055,13 +2082,11 @@ impl NetworkBehaviour for Notifications { } #[cfg(test)] +#[allow(deprecated)] mod tests { use super::*; use crate::protocol::notifications::handler::tests::*; - use libp2p::{ - core::ConnectedPoint, - swarm::{behaviour::FromSwarm, AddressRecord}, - }; + use libp2p::swarm::AddressRecord; use std::{collections::HashSet, iter}; impl PartialEq for ConnectionState { @@ -2223,7 +2248,7 @@ mod tests { fn remote_opens_connection_and_substream() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2273,7 +2298,7 @@ mod tests { async fn disconnect_remote_substream_before_handled_by_peerset() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2310,7 +2335,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2344,8 +2369,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2374,7 +2399,7 @@ mod tests { fn peerset_connect_incoming() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2409,7 +2434,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2455,7 +2480,7 @@ mod tests { fn peerset_disconnect_enabled() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2505,7 +2530,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2534,8 +2559,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2557,7 +2582,7 @@ mod tests { fn peerset_accept_peer_not_alive() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2604,8 +2629,8 @@ mod tests { fn secondary_connection_peer_state_incoming() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2659,7 +2684,7 @@ mod tests { fn close_connection_for_disabled_peer() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2681,8 +2706,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2693,7 +2718,7 @@ mod tests { fn close_connection_for_incoming_peer_one_connection() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2722,8 +2747,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2738,8 +2763,8 @@ mod tests { fn close_connection_for_incoming_peer_two_connections() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); - let conn1 = ConnectionId::new(1usize); + let conn = ConnectionId::new_unchecked(0); + let conn1 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2789,8 +2814,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2807,7 +2832,7 @@ mod tests { fn connection_and_substream_open() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2851,7 +2876,7 @@ mod tests { assert!(std::matches!( notif.events[notif.events.len() - 1], - NetworkBehaviourAction::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. }) + ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. }) )); } @@ -2859,8 +2884,8 @@ mod tests { fn connection_closed_sink_replaced() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -2931,8 +2956,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn1, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -2947,7 +2972,7 @@ mod tests { assert!(std::matches!( notif.events[notif.events.len() - 1], - NetworkBehaviourAction::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. }) + ToSwarm::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. }) )); } @@ -2963,8 +2988,8 @@ mod tests { notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure { peer_id: Some(peer), - handler: NotifsHandlerProto::new(vec![]), error: &libp2p::swarm::DialError::Banned, + connection_id: ConnectionId::new_unchecked(1337), })); if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) { @@ -2978,7 +3003,7 @@ mod tests { async fn write_notification() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3028,7 +3053,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3057,8 +3082,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3076,7 +3101,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); let set_id = sc_peerset::SetId::from(0); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3102,7 +3127,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3131,8 +3156,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3148,8 +3173,8 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(0usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3194,8 +3219,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn1, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected.clone(), vec![]), remaining_established: 0usize, }, )); @@ -3208,8 +3233,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn2, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3221,7 +3246,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); let set_id = sc_peerset::SetId::from(0); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3259,8 +3284,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3271,8 +3296,8 @@ mod tests { fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3314,8 +3339,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn2, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3326,8 +3351,8 @@ mod tests { fn two_connections_active_connection_gets_closed_peer_state_is_disabled() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3372,8 +3397,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn1, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3384,8 +3409,8 @@ mod tests { fn inject_connection_closed_for_active_connection() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3441,8 +3466,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn1, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3453,7 +3478,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3482,8 +3507,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3500,8 +3525,8 @@ mod tests { let now = Instant::now(); notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure { peer_id: Some(peer), - handler: NotifsHandlerProto::new(vec![]), error: &libp2p::swarm::DialError::Banned, + connection_id: ConnectionId::new_unchecked(0), })); if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) = @@ -3516,8 +3541,8 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); let set_id = sc_peerset::SetId::from(0); - let conn1 = ConnectionId::new(0usize); - let conn2 = ConnectionId::new(1usize); + let conn1 = ConnectionId::new_unchecked(0); + let conn2 = ConnectionId::new_unchecked(1); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3570,7 +3595,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); let set_id = sc_peerset::SetId::from(0); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3599,8 +3624,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3647,7 +3672,7 @@ mod tests { async fn reschedule_disabled_pending_enable_when_connection_not_closed() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3763,7 +3788,7 @@ mod tests { fn peerset_report_connect_with_enabled_peer() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3815,7 +3840,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3872,7 +3897,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3901,8 +3926,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -3925,7 +3950,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); let set_id = sc_peerset::SetId::from(0); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3960,7 +3985,7 @@ mod tests { fn peerset_report_accept_incoming_peer() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4001,7 +4026,7 @@ mod tests { fn peerset_report_accept_not_incoming_peer() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4058,9 +4083,9 @@ mod tests { notif.on_swarm_event(FromSwarm::ConnectionClosed( libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, - connection_id: ConnectionId::new(0usize), - endpoint: &endpoint, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &endpoint), + connection_id: ConnectionId::new_unchecked(0), + endpoint: &endpoint.clone(), + handler: NotifsHandler::new(peer, endpoint, vec![]), remaining_established: 0usize, }, )); @@ -4102,7 +4127,7 @@ mod tests { fn reject_non_active_connection() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4140,7 +4165,7 @@ mod tests { fn reject_non_existent_peer_but_alive_connection() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4180,7 +4205,7 @@ mod tests { fn inject_non_existent_connection_closed_for_incoming_peer() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4209,9 +4234,9 @@ mod tests { notif.on_swarm_event(FromSwarm::ConnectionClosed( libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, - connection_id: ConnectionId::new(1337usize), - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + connection_id: ConnectionId::new_unchecked(1337), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4224,7 +4249,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4244,9 +4269,9 @@ mod tests { notif.on_swarm_event(FromSwarm::ConnectionClosed( libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, - connection_id: ConnectionId::new(1337usize), - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + connection_id: ConnectionId::new_unchecked(1337), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4259,7 +4284,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4295,9 +4320,9 @@ mod tests { notif.on_swarm_event(FromSwarm::ConnectionClosed( libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, - connection_id: ConnectionId::new(1337usize), - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + connection_id: ConnectionId::new_unchecked(1337), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4309,7 +4334,7 @@ mod tests { fn inject_connection_closed_for_incoming_peer_state_mismatch() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4340,8 +4365,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4353,7 +4378,7 @@ mod tests { fn inject_connection_closed_for_enabled_state_mismatch() { let (mut notif, _peerset) = development_notifs(); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let set_id = sc_peerset::SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4386,9 +4411,9 @@ mod tests { notif.on_swarm_event(FromSwarm::ConnectionClosed( libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, - connection_id: ConnectionId::new(1337usize), - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + connection_id: ConnectionId::new_unchecked(1337), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4401,7 +4426,7 @@ mod tests { let (mut notif, _peerset) = development_notifs(); let set_id = sc_peerset::SetId::from(0); let peer = PeerId::random(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4430,8 +4455,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected.clone(), vec![]), remaining_established: 0usize, }, )); @@ -4441,8 +4466,8 @@ mod tests { libp2p::swarm::behaviour::ConnectionClosed { peer_id: peer, connection_id: conn, - endpoint: &connected, - handler: NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + endpoint: &connected.clone(), + handler: NotifsHandler::new(peer, connected, vec![]), remaining_established: 0usize, }, )); @@ -4453,7 +4478,7 @@ mod tests { #[cfg(debug_assertions)] fn open_result_ok_non_existent_peer() { let (mut notif, _peerset) = development_notifs(); - let conn = ConnectionId::new(0usize); + let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), diff --git a/client/network/src/protocol/notifications/handler.rs b/client/network/src/protocol/notifications/handler.rs index 9d8d98fd8cf27..665b646ecdcfa 100644 --- a/client/network/src/protocol/notifications/handler.rs +++ b/client/network/src/protocol/notifications/handler.rs @@ -72,11 +72,12 @@ use futures::{ prelude::*, }; use libp2p::{ - core::{ConnectedPoint, PeerId}, + core::ConnectedPoint, swarm::{ - handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }, + PeerId, }; use log::error; use parking_lot::{Mutex, RwLock}; @@ -105,19 +106,6 @@ const OPEN_TIMEOUT: Duration = Duration::from_secs(10); /// open substreams. const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); -/// Implements the `IntoConnectionHandler` trait of libp2p. -/// -/// Every time a connection with a remote starts, an instance of this struct is created and -/// sent to a background task dedicated to this connection. Once the connection is established, -/// it is turned into a [`NotifsHandler`]. -/// -/// See the documentation at the module level for more information. -pub struct NotifsHandlerProto { - /// Name of protocols, prototypes for upgrades for inbound substreams, and the message we - /// send or respond with in the handshake. - protocols: Vec, -} - /// The actual handler once the connection has been established. /// /// See the documentation at the module level for more information. @@ -140,6 +128,30 @@ pub struct NotifsHandler { >, } +impl NotifsHandler { + /// Creates new [`NotifsHandler`]. + pub fn new(peer_id: PeerId, endpoint: ConnectedPoint, protocols: Vec) -> Self { + Self { + protocols: protocols + .into_iter() + .map(|config| { + let in_upgrade = NotificationsIn::new( + config.name.clone(), + config.fallback_names.clone(), + config.max_notification_size, + ); + + Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } } + }) + .collect(), + peer_id, + endpoint, + when_connection_open: Instant::now(), + events_queue: VecDeque::with_capacity(16), + } + } +} + /// Configuration for a notifications protocol. #[derive(Debug, Clone)] pub struct ProtocolConfig { @@ -223,45 +235,6 @@ enum State { }, } -impl IntoConnectionHandler for NotifsHandlerProto { - type Handler = NotifsHandler; - - fn inbound_protocol(&self) -> UpgradeCollec { - self.protocols - .iter() - .map(|cfg| { - NotificationsIn::new( - cfg.name.clone(), - cfg.fallback_names.clone(), - cfg.max_notification_size, - ) - }) - .collect::>() - } - - fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { - NotifsHandler { - protocols: self - .protocols - .into_iter() - .map(|config| { - let in_upgrade = NotificationsIn::new( - config.name.clone(), - config.fallback_names.clone(), - config.max_notification_size, - ); - - Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } } - }) - .collect(), - peer_id: *peer_id, - endpoint: connected_point.clone(), - when_connection_open: Instant::now(), - events_queue: VecDeque::with_capacity(16), - } - } -} - /// Event that can be received by a `NotifsHandler`. #[derive(Debug, Clone)] pub enum NotifsHandlerIn { @@ -461,18 +434,6 @@ pub enum NotifsHandlerError { SyncNotificationsClogged, } -impl NotifsHandlerProto { - /// Builds a new handler. - /// - /// `list` is a list of notification protocols names, the message to send as part of the - /// handshake, and the maximum allowed size of a notification. At the moment, the message - /// is always the same whether we open a substream ourselves or respond to handshake from - /// the remote. - pub fn new(list: impl Into>) -> Self { - Self { protocols: list.into() } - } -} - impl ConnectionHandler for NotifsHandler { type InEvent = NotifsHandlerIn; type OutEvent = NotifsHandlerOut; @@ -954,7 +915,6 @@ pub mod tests { .await } } - struct MockSubstream { pub rx: mpsc::Receiver>, pub tx: mpsc::Sender>, diff --git a/client/network/src/protocol/notifications/tests.rs b/client/network/src/protocol/notifications/tests.rs index 9ca6974e4cdde..d13a4fcfa3809 100644 --- a/client/network/src/protocol/notifications/tests.rs +++ b/client/network/src/protocol/notifications/tests.rs @@ -22,11 +22,12 @@ use crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolCo use futures::prelude::*; use libp2p::{ - core::{connection::ConnectionId, transport::MemoryTransport, upgrade}, + core::{transport::MemoryTransport, upgrade, Endpoint}, identity, noise, swarm::{ - behaviour::FromSwarm, ConnectionHandler, Executor, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, Swarm, SwarmEvent, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, Executor, NetworkBehaviour, + PollParameters, Swarm, SwarmBuilder, SwarmEvent, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }, yamux, Multiaddr, PeerId, Transport, }; @@ -57,13 +58,10 @@ fn build_nodes() -> (Swarm, Swarm) { for index in 0..2 { let keypair = keypairs[index].clone(); - let noise_keys = - noise::Keypair::::new().into_authentic(&keypair).unwrap(); - let transport = MemoryTransport::new() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(yamux::YamuxConfig::default()) + .authenticate(noise::Config::new(&keypair).unwrap()) + .multiplex(yamux::Config::default()) .timeout(Duration::from_secs(20)) .boxed(); @@ -105,12 +103,13 @@ fn build_nodes() -> (Swarm, Swarm) { }; let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::with_executor( + let mut swarm = SwarmBuilder::with_executor( transport, behaviour, keypairs[index].public().to_peer_id(), TokioExecutor(runtime), - ); + ) + .build(); swarm.listen_on(addrs[index].clone()).unwrap(); out.push(swarm); } @@ -146,18 +145,63 @@ impl NetworkBehaviour for CustomProtoWithAddr { type ConnectionHandler = ::ConnectionHandler; type OutEvent = ::OutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.inner.new_handler() + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.inner + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut list = self.inner.addresses_of_peer(peer_id); - for (p, a) in self.addrs.iter() { - if p == peer_id { - list.push(a.clone()); + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let mut list = self.inner.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + )?; + if let Some(peer_id) = maybe_peer { + for (p, a) in self.addrs.iter() { + if *p == peer_id { + list.push(a.clone()); + } } } - list + Ok(list) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.inner + .handle_established_outbound_connection(connection_id, peer, addr, role_override) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -168,8 +212,7 @@ impl NetworkBehaviour for CustomProtoWithAddr { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { self.inner.on_connection_handler_event(peer_id, connection_id, event); } @@ -178,7 +221,7 @@ impl NetworkBehaviour for CustomProtoWithAddr { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { self.inner.poll(cx, params) } } diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 4628b0191a2e8..e0f4074e0a22e 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -18,8 +18,8 @@ //! Collection of request-response protocols. //! -//! The [`RequestResponse`] struct defined in this module provides support for zero or more -//! so-called "request-response" protocols. +//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or +//! more so-called "request-response" protocols. //! //! A request-response protocol works in the following way: //! @@ -41,17 +41,15 @@ use futures::{ prelude::*, }; use libp2p::{ - core::{connection::ConnectionId, Multiaddr, PeerId}, - request_response::{ - handler::RequestResponseHandler, ProtocolSupport, RequestResponse, RequestResponseCodec, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, - }, + core::{Endpoint, Multiaddr}, + request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel}, swarm::{ - behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure}, + behaviour::{ConnectionClosed, FromSwarm}, handler::multi::MultiHandler, - ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, + ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }, + PeerId, }; use sc_peerset::{PeersetHandle, BANNED_THRESHOLD}; @@ -64,9 +62,7 @@ use std::{ time::{Duration, Instant}, }; -pub use libp2p::request_response::{ - InboundFailure, OutboundFailure, RequestId, RequestResponseConfig, -}; +pub use libp2p::request_response::{Config, InboundFailure, OutboundFailure, RequestId}; /// Error in a request. #[derive(Debug, thiserror::Error)] @@ -260,14 +256,13 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId { /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. - /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional + /// + /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: HashMap< - ProtocolName, - (RequestResponse, Option>), - >, + protocols: + HashMap, Option>)>, - /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply. + /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: HashMap, RequestFailure>>)>, @@ -324,7 +319,7 @@ impl RequestResponsesBehaviour { ) -> Result { let mut protocols = HashMap::new(); for protocol in list { - let mut cfg = RequestResponseConfig::default(); + let mut cfg = Config::default(); cfg.set_connection_keep_alive(Duration::from_secs(10)); cfg.set_request_timeout(protocol.request_timeout); @@ -334,7 +329,7 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = RequestResponse::new( + let rq_rp = Behaviour::new( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, @@ -401,50 +396,79 @@ impl RequestResponsesBehaviour { ); } } - - fn new_handler_with_replacement( - &mut self, - protocol: String, - handler: RequestResponseHandler, - ) -> ::ConnectionHandler { - let mut handlers: HashMap<_, _> = self - .protocols - .iter_mut() - .map(|(p, (r, _))| (p.to_string(), NetworkBehaviour::new_handler(r))) - .collect(); - - if let Some(h) = handlers.get_mut(&protocol) { - *h = handler - } - - MultiHandler::try_from_iter(handlers).expect( - "Protocols are in a HashMap and there can be at most one handler per protocol name, \ - which is the only possible error; qed", - ) - } } impl NetworkBehaviour for RequestResponsesBehaviour { - type ConnectionHandler = MultiHandler< - String, - as NetworkBehaviour>::ConnectionHandler, - >; + type ConnectionHandler = + MultiHandler as NetworkBehaviour>::ConnectionHandler>; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - let iter = self - .protocols - .iter_mut() - .map(|(p, (r, _))| (p.to_string(), NetworkBehaviour::new_handler(r))); + fn handle_pending_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Vec::new()) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { + if let Ok(handler) = r.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); - MultiHandler::try_from_iter(iter).expect( + Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ which is the only possible error; qed", - ) + )) } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - Vec::new() + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { + if let Ok(handler) = + r.handle_established_outbound_connection(connection_id, peer, addr, role_override) + { + Some((p.to_string(), handler)) + } else { + None + } + }); + + Ok(MultiHandler::try_from_iter(iter).expect( + "Protocols are in a HashMap and there can be at most one handler per protocol name, \ + which is the only possible error; qed", + )) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -477,42 +501,17 @@ impl NetworkBehaviour for RequestResponsesBehaviour { ) } }, - FromSwarm::DialFailure(DialFailure { peer_id, error, handler }) => { - for (p_name, p_handler) in handler.into_iter() { - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - proto.on_swarm_event(FromSwarm::DialFailure(DialFailure { - peer_id, - handler: p_handler, - error, - })); - } else { - log::error!( - target: "sub-libp2p", - "on_swarm_event/dial_failure: no request-response instance registered for protocol {:?}", - p_name, - ) - } - } - }, + FromSwarm::DialFailure(e) => + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e)); + }, FromSwarm::ListenerClosed(e) => for (p, _) in self.protocols.values_mut() { NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e)); }, - FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, handler }) => - for (p_name, p_handler) in handler.into_iter() { - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - proto.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { - local_addr, - send_back_addr, - handler: p_handler, - })); - } else { - log::error!( - target: "sub-libp2p", - "on_swarm_event/listen_failure: no request-response instance registered for protocol {:?}", - p_name, - ) - } + FromSwarm::ListenFailure(e) => + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e)); }, FromSwarm::ListenerError(e) => for (p, _) in self.protocols.values_mut() { @@ -549,25 +548,25 @@ impl NetworkBehaviour for RequestResponsesBehaviour { &mut self, peer_id: PeerId, connection_id: ConnectionId, - (p_name, event): <::Handler as - ConnectionHandler>::OutEvent, + event: THandlerOutEvent, ) { - if let Some((proto, _)) = self.protocols.get_mut(&*p_name) { - return proto.on_connection_handler_event(peer_id, connection_id, event) + let p_name = event.0; + if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { + return proto.on_connection_handler_event(peer_id, connection_id, event.1) + } else { + log::warn!( + target: "sub-libp2p", + "on_connection_handler_event: no request-response instance registered for protocol {:?}", + p_name + ); } - - log::warn!( - target: "sub-libp2p", - "on_connection_handler_event: no request-response instance registered for protocol {:?}", - p_name - ); } fn poll( &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> { 'poll_all: loop { if let Some(message_request) = self.message_request.take() { // Now we can can poll `MessageRequest` until we get the reputation @@ -621,8 +620,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // initialization. if let Some(mut resp_builder) = resp_builder { // If the response builder is too busy, silently drop `tx`. This - // will be reported by the corresponding `RequestResponse` through - // an `InboundFailure::Omission` event. + // will be reported by the corresponding request-response [`Behaviour`] + // through an `InboundFailure::Omission` event. let _ = resp_builder.try_send(IncomingRequest { peer, payload: request, @@ -674,7 +673,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { if protocol.send_response(inner_channel, Ok(payload)).is_err() { // Note: Failure is handled further below when receiving - // `InboundFailure` event from `RequestResponse` behaviour. + // `InboundFailure` event from request-response [`Behaviour`]. log::debug!( target: "sub-libp2p", "Failed to send response for {:?} on protocol {:?} due to a \ @@ -690,9 +689,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } if !reputation_changes.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::ReputationChanges { peer, changes: reputation_changes }, - )) + return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges { + peer, + changes: reputation_changes, + })) } } @@ -701,44 +701,35 @@ impl NetworkBehaviour for RequestResponsesBehaviour { while let Poll::Ready(ev) = behaviour.poll(cx, params) { let ev = match ev { // Main events we are interested in. - NetworkBehaviourAction::GenerateEvent(ev) => ev, + ToSwarm::GenerateEvent(ev) => ev, // Other events generated by the underlying behaviour are transparently // passed through. - NetworkBehaviourAction::Dial { opts, handler } => { + ToSwarm::Dial { opts } => { if opts.get_peer_id().is_none() { log::error!( "The request-response isn't supposed to start dialing addresses" ); } - let protocol = protocol.to_string(); - let handler = self.new_handler_with_replacement(protocol, handler); - return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) + return Poll::Ready(ToSwarm::Dial { opts }) }, - NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler, event } => + return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event: ((*protocol).to_string(), event), }), - NetworkBehaviourAction::ReportObservedAddr { address, score } => - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }), - NetworkBehaviourAction::CloseConnection { peer_id, connection } => - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }), + ToSwarm::ReportObservedAddr { address, score } => + return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), + ToSwarm::CloseConnection { peer_id, connection } => + return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), }; match ev { // Received a request from a remote. - RequestResponseEvent::Message { + request_response::Event::Message { peer, - message: - RequestResponseMessage::Request { request_id, request, channel, .. }, + message: Message::Request { request_id, request, channel, .. }, } => { self.pending_responses_arrival_time .insert((protocol.clone(), request_id).into(), Instant::now()); @@ -765,9 +756,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }, // Received a response from a remote to one of our requests. - RequestResponseEvent::Message { + request_response::Event::Message { peer, - message: RequestResponseMessage::Response { request_id, response }, + message: Message::Response { request_id, response }, .. } => { let (started, delivered) = match self @@ -798,12 +789,15 @@ impl NetworkBehaviour for RequestResponsesBehaviour { result: delivered, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)) + return Poll::Ready(ToSwarm::GenerateEvent(out)) }, // One of our requests has failed. - RequestResponseEvent::OutboundFailure { - peer, request_id, error, .. + request_response::Event::OutboundFailure { + peer, + request_id, + error, + .. } => { let started = match self .pending_requests @@ -841,12 +835,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { result: Err(RequestFailure::Network(error)), }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)) + return Poll::Ready(ToSwarm::GenerateEvent(out)) }, // An inbound request failed, either while reading the request or due to // failing to send a response. - RequestResponseEvent::InboundFailure { + request_response::Event::InboundFailure { request_id, peer, error, .. } => { self.pending_responses_arrival_time @@ -857,11 +851,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { protocol: protocol.clone(), result: Err(ResponseFailure::Network(error)), }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)) + return Poll::Ready(ToSwarm::GenerateEvent(out)) }, // A response to an inbound request has been sent. - RequestResponseEvent::ResponseSent { request_id, peer } => { + request_response::Event::ResponseSent { request_id, peer } => { let arrival_time = self .pending_responses_arrival_time .remove(&(protocol.clone(), request_id).into()) @@ -886,7 +880,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { result: Ok(arrival_time), }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)) + return Poll::Ready(ToSwarm::GenerateEvent(out)) }, }; } @@ -913,7 +907,7 @@ pub enum ResponseFailure { Network(InboundFailure), } -/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned +/// Implements the libp2p [`Codec`] trait. Defines how streams of bytes are turned /// into requests and responses and vice-versa. #[derive(Debug, Clone)] #[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler. @@ -923,7 +917,7 @@ pub struct GenericCodec { } #[async_trait::async_trait] -impl RequestResponseCodec for GenericCodec { +impl Codec for GenericCodec { type Protocol = Vec; type Request = Vec; type Response = Result, ()>; @@ -1054,7 +1048,7 @@ mod tests { }, identity::Keypair, noise, - swarm::{Executor, Swarm, SwarmEvent}, + swarm::{Executor, Swarm, SwarmBuilder, SwarmEvent}, Multiaddr, }; use sc_peerset::{Peerset, PeersetConfig, SetConfig}; @@ -1072,13 +1066,10 @@ mod tests { ) -> (Swarm, Multiaddr, Peerset) { let keypair = Keypair::generate_ed25519(); - let noise_keys = - noise::Keypair::::new().into_authentic(&keypair).unwrap(); - let transport = MemoryTransport::new() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .authenticate(noise::Config::new(&keypair).unwrap()) + .multiplex(libp2p::yamux::Config::default()) .boxed(); let config = PeersetConfig { @@ -1096,12 +1087,13 @@ mod tests { let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::with_executor( + let mut swarm = SwarmBuilder::with_executor( transport, behaviour, keypair.public().to_peer_id(), TokioExecutor(runtime), - ); + ) + .build(); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); swarm.listen_on(listen_addr.clone()).unwrap(); @@ -1117,7 +1109,7 @@ mod tests { let protocol_name = "/test/req-resp/1"; let mut pool = LocalPool::new(); - // Build swarms whose behaviour is `RequestResponsesBehaviour`. + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = mpsc::channel::(64); @@ -1220,7 +1212,7 @@ mod tests { let protocol_name = "/test/req-resp/1"; let mut pool = LocalPool::new(); - // Build swarms whose behaviour is `RequestResponsesBehaviour`. + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = mpsc::channel::(64); @@ -1322,10 +1314,10 @@ mod tests { } /// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for - /// a single [`RequestResponse`] behaviour. It is not guaranteed to be unique across multiple - /// [`RequestResponse`] behaviours. Thus when handling [`RequestId`] in the context of multiple - /// [`RequestResponse`] behaviours, one needs to couple the protocol name with the [`RequestId`] - /// to get a unique request identifier. + /// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across + /// multiple [`RequestResponsesBehaviour`] behaviours. Thus when handling [`RequestId`] in the + /// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the + /// protocol name with the [`RequestId`] to get a unique request identifier. /// /// This test ensures that two requests on different protocols can be handled concurrently /// without a [`RequestId`] collision. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 75c9d66926623..cd8e18a2e7d9f 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -52,17 +52,19 @@ use crate::{ ReputationChange, }; +use either::Either; use futures::{channel::oneshot, prelude::*}; +#[allow(deprecated)] use libp2p::{ - core::{either::EitherError, upgrade, ConnectedPoint}, + connection_limits::Exceeded, + core::{upgrade, ConnectedPoint, Endpoint}, identify::Info as IdentifyInfo, kad::record::Key as KademliaKey, multiaddr, ping::Failure as PingFailure, swarm::{ - AddressScore, ConnectionError, ConnectionHandler, ConnectionLimits, DialError, Executor, - IntoConnectionHandler, NetworkBehaviour, PendingConnectionError, Swarm, SwarmBuilder, - SwarmEvent, + AddressScore, ConnectionError, ConnectionId, ConnectionLimits, DialError, Executor, + ListenError, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, THandlerErr, }, Multiaddr, PeerId, }; @@ -90,7 +92,7 @@ use std::{ }; pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; -pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +pub use libp2p::identity::{DecodingError, Keypair, PublicKey}; pub use protocol::NotificationsSink; mod metrics; @@ -99,12 +101,6 @@ mod out_events; pub mod signature; pub mod traits; -/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -/// Used as a template parameter of [`SwarmEvent`] below. -type ConnectionHandlerErr = - <<::ConnectionHandler as IntoConnectionHandler> - ::Handler as ConnectionHandler>::Error; - /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService { /// Number of peers we're connected to. @@ -311,7 +307,7 @@ where format!("{} ({})", network_config.client_version, network_config.node_name); let discovery_config = { - let mut config = DiscoveryConfig::new(local_public.clone()); + let mut config = DiscoveryConfig::new(local_public.to_peer_id()); config.with_permanent_addresses(known_addresses); config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15); config.with_kademlia( @@ -374,6 +370,7 @@ where SpawnImpl(params.executor), ) }; + #[allow(deprecated)] let builder = builder .connection_limits( ConnectionLimits::default() @@ -384,7 +381,9 @@ where ) .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) .notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed")) - .connection_event_buffer_size(1024) + // NOTE: 24 is somewhat arbitrary and should be tuned in the future if necessary. + // See + .per_connection_event_buffer_size(24) .max_negotiating_inbound_streams(2048); (builder.build(), bandwidth) @@ -509,15 +508,23 @@ where pub fn network_state(&mut self) -> NetworkState { let swarm = &mut self.network_service; let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::>(); - let connected_peers = { let swarm = &mut *swarm; open.iter() .filter_map(move |peer_id| { - let known_addresses = - NetworkBehaviour::addresses_of_peer(swarm.behaviour_mut(), peer_id) - .into_iter() - .collect(); + let known_addresses = if let Ok(addrs) = + NetworkBehaviour::handle_pending_outbound_connection( + swarm.behaviour_mut(), + ConnectionId::new_unchecked(0), // dummy value + Some(*peer_id), + &vec![], + Endpoint::Listener, + ) { + addrs.into_iter().collect() + } else { + error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id); + return None + }; let endpoint = if let Some(e) = swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint()) @@ -556,6 +563,20 @@ where .into_iter() .filter(|p| open.iter().all(|n| n != p)) .map(move |peer_id| { + let known_addresses = if let Ok(addrs) = + NetworkBehaviour::handle_pending_outbound_connection( + swarm.behaviour_mut(), + ConnectionId::new_unchecked(0), // dummy value + Some(peer_id), + &vec![], + Endpoint::Listener, + ) { + addrs.into_iter().collect() + } else { + error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id); + Default::default() + }; + ( peer_id.to_base58(), NetworkStateNotConnectedPeer { @@ -567,12 +588,7 @@ where .behaviour_mut() .node(&peer_id) .and_then(|i| i.latest_ping()), - known_addresses: NetworkBehaviour::addresses_of_peer( - swarm.behaviour_mut(), - &peer_id, - ) - .into_iter() - .collect(), + known_addresses, }, ) }) @@ -1340,10 +1356,7 @@ where } /// Process the next event coming from `Swarm`. - fn handle_swarm_event( - &mut self, - event: SwarmEvent>>, - ) { + fn handle_swarm_event(&mut self, event: SwarmEvent>>) { match event { SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => { if let Some(metrics) = self.metrics.as_ref() { @@ -1572,6 +1585,7 @@ where endpoint, num_established, concurrent_dial_errors, + .. } => { if let Some(errors) = concurrent_dial_errors { debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors); @@ -1600,11 +1614,11 @@ where }; let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", - Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::B(EitherError::A(PingFailure::Timeout)), + Some(ConnectionError::Handler(Either::Left(Either::Left( + Either::Right(Either::Left(PingFailure::Timeout)), )))) => "ping-timeout", - Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::A(NotifsHandlerError::SyncNotificationsClogged), + Some(ConnectionError::Handler(Either::Left(Either::Left( + Either::Left(NotifsHandlerError::SyncNotificationsClogged), )))) => "sync-notifications-clogged", Some(ConnectionError::Handler(_)) => "protocol-error", Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", @@ -1653,16 +1667,22 @@ where } if let Some(metrics) = self.metrics.as_ref() { + #[allow(deprecated)] let reason = match error { + DialError::Denied { cause } => + if cause.downcast::().is_ok() { + Some("limit-reached") + } else { + None + }, DialError::ConnectionLimit(_) => Some("limit-reached"), - DialError::InvalidPeerId(_) => Some("invalid-peer-id"), - DialError::Transport(_) | DialError::ConnectionIo(_) => - Some("transport-error"), + DialError::InvalidPeerId(_) | + DialError::WrongPeerId { .. } | + DialError::LocalPeerId { .. } => Some("invalid-peer-id"), + DialError::Transport(_) => Some("transport-error"), DialError::Banned | - DialError::LocalPeerId | DialError::NoAddresses | DialError::DialPeerConditionFalse(_) | - DialError::WrongPeerId { .. } | DialError::Aborted => None, // ignore them }; if let Some(reason) = reason { @@ -1687,12 +1707,19 @@ where local_addr, send_back_addr, error, ); if let Some(metrics) = self.metrics.as_ref() { + #[allow(deprecated)] let reason = match error { - PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"), - PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"), - PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) => - Some("transport-error"), - PendingConnectionError::Aborted => None, // ignore it + ListenError::Denied { cause } => + if cause.downcast::().is_ok() { + Some("limit-reached") + } else { + None + }, + ListenError::ConnectionLimit(_) => Some("limit-reached"), + ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => + Some("invalid-peer-id"), + ListenError::Transport(_) => Some("transport-error"), + ListenError::Aborted => None, // ignore it }; if let Some(reason) = reason { @@ -1703,6 +1730,7 @@ where } } }, + #[allow(deprecated)] SwarmEvent::BannedPeer { peer_id, endpoint } => { debug!( target: "sub-libp2p", diff --git a/client/network/src/service/signature.rs b/client/network/src/service/signature.rs index e52dd6b1d2a29..024f60e4c466b 100644 --- a/client/network/src/service/signature.rs +++ b/client/network/src/service/signature.rs @@ -23,7 +23,7 @@ use libp2p::{ PeerId, }; -pub use libp2p::identity::error::SigningError; +pub use libp2p::identity::SigningError; /// A result of signing a message with a network identity. Since `PeerId` is potentially a hash of a /// `PublicKey`, you need to reveal the `PublicKey` next to the signature, so the verifier can check diff --git a/client/network/src/service/traits.rs b/client/network/src/service/traits.rs index 3f9b7e552027b..787ef4b5ae445 100644 --- a/client/network/src/service/traits.rs +++ b/client/network/src/service/traits.rs @@ -33,7 +33,7 @@ use sc_peerset::ReputationChange; use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc}; -pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey}; +pub use libp2p::{identity::SigningError, kad::record::Key as KademliaKey}; /// Signer with network identity pub trait NetworkSigner { diff --git a/client/network/src/transport.rs b/client/network/src/transport.rs index 3aa0bb48dec53..9e63ce98878a6 100644 --- a/client/network/src/transport.rs +++ b/client/network/src/transport.rs @@ -16,20 +16,18 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use either::Either; use libp2p::{ - bandwidth, core::{ - self, - either::EitherTransport, muxing::StreamMuxerBox, transport::{Boxed, OptionalTransport}, upgrade, }, - dns, identity, mplex, noise, tcp, websocket, PeerId, Transport, + dns, identity, noise, tcp, websocket, PeerId, Transport, TransportExt, }; use std::{sync::Arc, time::Duration}; -pub use self::bandwidth::BandwidthSinks; +pub use libp2p::bandwidth::BandwidthSinks; /// Builds the transport that serves as a common ground for all connections. /// @@ -59,7 +57,7 @@ pub fn build_transport( let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone()); let dns_init = dns::TokioDnsConfig::system(tcp_trans); - EitherTransport::Left(if let Ok(dns) = dns_init { + Either::Left(if let Ok(dns) = dns_init { // WS + WSS transport // // Main transport can't be used for `/wss` addresses because WSS transport needs @@ -68,47 +66,21 @@ pub fn build_transport( let tcp_trans = tcp::tokio::Transport::new(tcp_config); let dns_for_wss = dns::TokioDnsConfig::system(tcp_trans) .expect("same system_conf & resolver to work"); - EitherTransport::Left(websocket::WsConfig::new(dns_for_wss).or_transport(dns)) + Either::Left(websocket::WsConfig::new(dns_for_wss).or_transport(dns)) } else { // In case DNS can't be constructed, fallback to TCP + WS (WSS won't work) let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone()); let desktop_trans = websocket::WsConfig::new(tcp_trans) .or_transport(tcp::tokio::Transport::new(tcp_config)); - EitherTransport::Right(desktop_trans) + Either::Right(desktop_trans) }) } else { - EitherTransport::Right(OptionalTransport::some( - libp2p::core::transport::MemoryTransport::default(), - )) + Either::Right(OptionalTransport::some(libp2p::core::transport::MemoryTransport::default())) }; - let (transport, bandwidth) = bandwidth::BandwidthLogging::new(transport); - - let authentication_config = - { - // For more information about these two panics, see in "On the Importance of - // Checking Cryptographic Protocols for Faults" by Dan Boneh, Richard A. DeMillo, - // and Richard J. Lipton. - let noise_keypair = noise::Keypair::::new().into_authentic(&keypair) - .expect("can only fail in case of a hardware bug; since this signing is performed only \ - once and at initialization, we're taking the bet that the inconvenience of a very \ - rare panic here is basically zero"); - - // Legacy noise configurations for backward compatibility. - let noise_legacy = - noise::LegacyConfig { recv_legacy_handshake: true, ..Default::default() }; - - let mut xx_config = noise::NoiseConfig::xx(noise_keypair); - xx_config.set_legacy_config(noise_legacy); - xx_config.into_authenticated() - }; - + let authentication_config = noise::Config::new(&keypair).expect("Can create noise config. qed"); let multiplexing_config = { - let mut mplex_config = mplex::MplexConfig::new(); - mplex_config.set_max_buffer_behaviour(mplex::MaxBufferBehaviour::Block); - mplex_config.set_max_buffer_size(usize::MAX); - - let mut yamux_config = libp2p::yamux::YamuxConfig::default(); + let mut yamux_config = libp2p::yamux::Config::default(); // Enable proper flow-control: window updates are only sent when // buffered data has been consumed. yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read()); @@ -118,7 +90,7 @@ pub fn build_transport( yamux_config.set_receive_window_size(yamux_window_size); } - core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) + yamux_config }; let transport = transport @@ -128,5 +100,5 @@ pub fn build_transport( .timeout(Duration::from_secs(20)) .boxed(); - (transport, bandwidth) + transport.with_bandwidth_logging() } diff --git a/client/network/statement/Cargo.toml b/client/network/statement/Cargo.toml index 36d8cb077210d..a81e6a916c05c 100644 --- a/client/network/statement/Cargo.toml +++ b/client/network/statement/Cargo.toml @@ -17,7 +17,7 @@ array-bytes = "4.1" async-channel = "1.8.0" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } futures = "0.3.21" -libp2p = "0.50.0" +libp2p = "0.51.3" log = "0.4.17" pin-project = "1.0.12" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 298cbf1801f2e..ce713596011fb 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -21,7 +21,7 @@ async-trait = "0.1.58" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } futures = "0.3.21" futures-timer = "3.0.2" -libp2p = "0.50.0" +libp2p = "0.51.3" log = "0.4.17" lru = "0.8.1" mockall = "0.11.3" diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 9763feed5ea54..af519008dddaf 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -17,7 +17,7 @@ tokio = "1.22.0" async-trait = "0.1.57" futures = "0.3.21" futures-timer = "3.0.1" -libp2p = "0.50.0" +libp2p = "0.51.3" log = "0.4.17" parking_lot = "0.12.1" rand = "0.8.5" diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml index 3616473d3baed..3ae1dc5908df4 100644 --- a/client/network/transactions/Cargo.toml +++ b/client/network/transactions/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] array-bytes = "4.1" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } futures = "0.3.21" -libp2p = "0.50.0" +libp2p = "0.51.3" log = "0.4.17" pin-project = "1.0.12" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } diff --git a/client/offchain/Cargo.toml b/client/offchain/Cargo.toml index 0307e3125f3ee..a2ab54ba5e638 100644 --- a/client/offchain/Cargo.toml +++ b/client/offchain/Cargo.toml @@ -21,7 +21,7 @@ futures = "0.3.21" futures-timer = "3.0.2" hyper = { version = "0.14.16", features = ["stream", "http2"] } hyper-rustls = { version = "0.23.0", features = ["http2"] } -libp2p = "0.50.0" +libp2p = "0.51.3" num_cpus = "1.13" once_cell = "1.8" parking_lot = "0.12.1" diff --git a/client/peerset/Cargo.toml b/client/peerset/Cargo.toml index a09508c831189..043f8a8352caa 100644 --- a/client/peerset/Cargo.toml +++ b/client/peerset/Cargo.toml @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] futures = "0.3.21" -libp2p = "0.50.0" +libp2p-identity = { version = "0.1.2", features = ["peerid", "ed25519"] } log = "0.4.17" serde_json = "1.0.85" wasm-timer = "0.2" diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index e5393acbaa32f..e169be8e8ed5b 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -46,7 +46,7 @@ use std::{ }; use wasm_timer::Delay; -pub use libp2p::PeerId; +pub use libp2p_identity::PeerId; /// We don't accept nodes whose reputation is under this value. pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100); @@ -781,7 +781,7 @@ mod tests { BANNED_THRESHOLD, }; use futures::prelude::*; - use libp2p::PeerId; + use libp2p_identity::PeerId; use std::{pin::Pin, task::Poll, thread, time::Duration}; fn assert_messages(mut peerset: Peerset, messages: Vec) -> Peerset { diff --git a/client/peerset/src/peersstate.rs b/client/peerset/src/peersstate.rs index 84907ac914b45..2d4a9295c24c9 100644 --- a/client/peerset/src/peersstate.rs +++ b/client/peerset/src/peersstate.rs @@ -28,7 +28,7 @@ //! > for example connecting to some nodes in priority should be added outside of this //! > module, rather than inside. -use libp2p::PeerId; +use libp2p_identity::PeerId; use log::error; use std::{ borrow::Cow, @@ -626,7 +626,7 @@ impl<'a> Drop for Reputation<'a> { #[cfg(test)] mod tests { use super::{Peer, PeersState, SetConfig}; - use libp2p::PeerId; + use libp2p_identity::PeerId; use std::iter; #[test] diff --git a/client/peerset/tests/fuzz.rs b/client/peerset/tests/fuzz.rs index 1f4bd053b553a..122f17062577d 100644 --- a/client/peerset/tests/fuzz.rs +++ b/client/peerset/tests/fuzz.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use futures::prelude::*; -use libp2p::PeerId; +use libp2p_identity::PeerId; use rand::{ distributions::{Distribution, Uniform, WeightedIndex}, seq::IteratorRandom, diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index d48d950bee542..cf51e029be9c5 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] chrono = { version = "0.4", default-features = false } futures = "0.3.21" -libp2p = { version = "0.50.0", features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"] } +libp2p = { version = "0.51.3", features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"] } log = "0.4.17" parking_lot = "0.12.1" pin-project = "1.0.12"