diff --git a/Cargo.lock b/Cargo.lock index a22cfa8ba8dd6..ca0ebee0ac475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,9 +340,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" dependencies = [ "proc-macro2", "quote", @@ -8321,6 +8321,7 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "async-std", + "async-trait", "fork-tree", "futures", "libp2p", diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index dd216b2a5295a..bed9935698769 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -24,14 +24,16 @@ pub mod warp; use libp2p::PeerId; use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}; -use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_consensus::{ + import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock, +}; use sp_consensus::BlockOrigin; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, Justifications, }; use std::{any::Any, fmt, fmt::Formatter, task::Poll}; -use warp::{EncodedProof, WarpProofRequest, WarpSyncProgress}; +use warp::WarpSyncProgress; /// The sync status of a peer we are trying to sync with #[derive(Debug)] @@ -123,7 +125,7 @@ pub enum OnBlockJustification { }, } -/// Result of [`ChainSync::on_state_data`]. +/// Result of `ChainSync::on_state_data`. #[derive(Debug)] pub enum OnStateData { /// The block and state that should be imported. @@ -132,6 +134,20 @@ pub enum OnStateData { Continue, } +/// Block or justification request polled from `ChainSync` +#[derive(Debug)] +pub enum ImportResult { + BlockImport(BlockOrigin, Vec>), + JustificationImport(RuntimeOrigin, B::Hash, NumberFor, Justifications), +} + +/// Value polled from `ChainSync` +#[derive(Debug)] +pub enum PollResult { + Import(ImportResult), + Announce(PollBlockAnnounceValidation), +} + /// Result of [`ChainSync::poll_block_announce_validation`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum PollBlockAnnounceValidation { @@ -186,6 +202,13 @@ pub struct Metrics { pub justifications: metrics::Metrics, } +#[derive(Debug)] +pub enum PeerRequest { + Block(BlockRequest), + State, + WarpProof, +} + /// Wrapper for implementation-specific state request. /// /// NOTE: Implementation must be able to encode and decode it for network purposes. @@ -250,6 +273,9 @@ pub trait ChainSync: Send { /// Returns the current number of peers stored within this state machine. fn num_peers(&self) -> usize; + /// Returns the number of peers we're connected to and that are being queried. + fn num_active_peers(&self) -> usize; + /// Handle a new connected peer. /// /// Call this method whenever we connect to a new peer. @@ -277,22 +303,6 @@ pub trait ChainSync: Send { number: NumberFor, ); - /// Get an iterator over all scheduled justification requests. - fn justification_requests<'a>( - &'a mut self, - ) -> Box)> + 'a>; - - /// Get an iterator over all block requests of all peers. - fn block_requests<'a>( - &'a mut self, - ) -> Box)> + 'a>; - - /// Get a state request, if any. - fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)>; - - /// Get a warp sync request, if any. - fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)>; - /// Handle a response from the remote to a block request that we made. /// /// `request` must be the original request that triggered `response`. @@ -307,16 +317,6 @@ pub trait ChainSync: Send { response: BlockResponse, ) -> Result, BadPeer>; - /// Handle a response from the remote to a state request that we made. - fn on_state_data( - &mut self, - who: &PeerId, - response: OpaqueStateResponse, - ) -> Result, BadPeer>; - - /// Handle a response from the remote to a warp proof request that we made. - fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer>; - /// Handle a response from the remote to a justification request that we made. /// /// `request` must be the original request that triggered `response`. @@ -383,15 +383,6 @@ pub trait ChainSync: Send { /// Return some key metrics. fn metrics(&self) -> Metrics; - /// Create implementation-specific block request. - fn create_opaque_block_request(&self, request: &BlockRequest) -> OpaqueBlockRequest; - - /// Encode implementation-specific block request. - fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String>; - - /// Decode implementation-specific block response. - fn decode_block_response(&self, response: &[u8]) -> Result; - /// Access blocks from implementation-specific block response. fn block_response_into_blocks( &self, @@ -399,19 +390,13 @@ pub trait ChainSync: Send { response: OpaqueBlockResponse, ) -> Result>, String>; - /// Encode implementation-specific state request. - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String>; - - /// Decode implementation-specific state response. - fn decode_state_response(&self, response: &[u8]) -> Result; - /// Advance the state of `ChainSync` /// /// Internally calls [`ChainSync::poll_block_announce_validation()`] and /// this function should be polled until it returns [`Poll::Pending`] to /// consume all pending events. - fn poll( - &mut self, - cx: &mut std::task::Context, - ) -> Poll>; + fn poll(&mut self, cx: &mut std::task::Context) -> Poll>; + + /// Send block request to peer + fn send_block_request(&mut self, who: PeerId, request: BlockRequest); } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 2e646956e9d8c..48d6127f642c3 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -40,7 +40,6 @@ use sc_network_common::{ ProtocolName, }, request_responses::{IfDisconnected, ProtocolConfig, RequestFailure}, - sync::{warp::WarpProofRequest, OpaqueBlockRequest, OpaqueStateRequest}, }; use sc_peerset::{PeersetHandle, ReputationChange}; use sp_blockchain::HeaderBackend; @@ -163,36 +162,6 @@ pub enum BehaviourOut { messages: Vec<(ProtocolName, Bytes)>, }, - /// A new block request must be emitted. - BlockRequest { - /// Node we send the request to. - target: PeerId, - /// Opaque implementation-specific block request. - request: OpaqueBlockRequest, - /// One-shot channel to receive the response. - pending_response: oneshot::Sender, RequestFailure>>, - }, - - /// A new state request must be emitted. - StateRequest { - /// Node we send the request to. - target: PeerId, - /// Opaque implementation-specific state request. - request: OpaqueStateRequest, - /// One-shot channel to receive the response. - pending_response: oneshot::Sender, RequestFailure>>, - }, - - /// A new warp sync request must be emitted. - WarpSyncRequest { - /// Node we send the request to. - target: PeerId, - /// Warp sync request. - request: WarpProofRequest, - /// One-shot channel to receive the response. - pending_response: oneshot::Sender, RequestFailure>>, - }, - /// Now connected to a new peer for syncing purposes. SyncConnected(PeerId), @@ -230,21 +199,9 @@ where user_agent: String, local_public_key: PublicKey, disco_config: DiscoveryConfig, - block_request_protocol_config: ProtocolConfig, - state_request_protocol_config: ProtocolConfig, - warp_sync_protocol_config: Option, - light_client_request_protocol_config: ProtocolConfig, - // All remaining request protocol configs. - mut request_response_protocols: Vec, + request_response_protocols: Vec, peerset: PeersetHandle, ) -> Result { - if let Some(config) = warp_sync_protocol_config { - request_response_protocols.push(config); - } - request_response_protocols.push(block_request_protocol_config); - request_response_protocols.push(state_request_protocol_config); - request_response_protocols.push(light_client_request_protocol_config); - Ok(Self { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), @@ -356,12 +313,6 @@ impl From> for BehaviourOut { BehaviourOut::BlockImport(origin, blocks), CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => BehaviourOut::JustificationImport(origin, hash, nb, justification), - CustomMessageOutcome::BlockRequest { target, request, pending_response } => - BehaviourOut::BlockRequest { target, request, pending_response }, - CustomMessageOutcome::StateRequest { target, request, pending_response } => - BehaviourOut::StateRequest { target, request, pending_response }, - CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } => - BehaviourOut::WarpSyncRequest { target, request, pending_response }, CustomMessageOutcome::NotificationStreamOpened { remote, protocol, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 14f7e8ffbf76a..50d8e2baba60f 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -101,39 +101,6 @@ where /// Block announce protocol configuration pub block_announce_config: NonDefaultSetConfig, - /// Request response configuration for the block request protocol. - /// - /// [`RequestResponseConfig::name`] is used to tag outgoing block requests with the correct - /// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming - /// block requests, if enabled. - /// - /// Can be constructed either via - /// `sc_network_sync::block_request_handler::generate_protocol_config` allowing outgoing but - /// not incoming requests, or constructed via `sc_network_sync::block_request_handler:: - /// BlockRequestHandler::new` allowing both outgoing and incoming requests. - pub block_request_protocol_config: RequestResponseConfig, - - /// Request response configuration for the light client request protocol. - /// - /// Can be constructed either via - /// `sc_network_light::light_client_requests::generate_protocol_config` allowing outgoing but - /// not incoming requests, or constructed via - /// `sc_network_light::light_client_requests::handler::LightClientRequestHandler::new` - /// allowing both outgoing and incoming requests. - pub light_client_request_protocol_config: RequestResponseConfig, - - /// Request response configuration for the state request protocol. - /// - /// Can be constructed either via - /// `sc_network_sync::state_request_handler::generate_protocol_config` allowing outgoing but - /// not incoming requests, or constructed via - /// `sc_network_sync::state_request_handler::StateRequestHandler::new` allowing - /// both outgoing and incoming requests. - pub state_request_protocol_config: RequestResponseConfig, - - /// Optional warp sync protocol config. - pub warp_sync_protocol_config: Option, - /// Request response protocol configurations pub request_response_protocol_configs: Vec, } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 63d060f423773..8c1dd39b49be3 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -20,10 +20,9 @@ use crate::config; use bytes::Bytes; use codec::{Decode, DecodeAll, Encode}; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use libp2p::{ core::{connection::ConnectionId, transport::ListenerId, ConnectedPoint}, - request_response::OutboundFailure, swarm::{ ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, @@ -43,15 +42,9 @@ use sc_network_common::{ config::NonReservedPeerMode, error, protocol::{role::Roles, ProtocolName}, - request_responses::RequestFailure, sync::{ - message::{ - BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, - BlockResponse, BlockState, - }, - warp::{EncodedProof, WarpProofRequest}, - BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, - OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation, + message::{BlockAnnounce, BlockAnnouncesHandshake, BlockData, BlockResponse, BlockState}, + BadPeer, ChainSync, ImportResult, OnBlockData, PollBlockAnnounceValidation, PollResult, SyncStatus, }, utils::{interval, LruHashSet}, @@ -102,18 +95,12 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; mod rep { use sc_peerset::ReputationChange as Rep; - /// Reputation change when a peer doesn't respond in time to our messages. - pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); - /// Reputation change when a peer refuses a request. - pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); /// Reputation change when we are a light client and a peer is behind us. pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer"); /// We received a message that failed to decode. pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); - /// Peer is on unsupported protocol version. - pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); /// Peer role does not match (e.g. light peer connecting to another light peer). pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role"); /// Peer send us a block announcement that failed at validation. @@ -204,19 +191,10 @@ pub struct Protocol { block_announce_data_cache: LruCache>, } -#[derive(Debug)] -enum PeerRequest { - Block(BlockRequest), - State, - WarpProof, -} - /// Peer information #[derive(Debug)] struct Peer { info: PeerInfo, - /// Current request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`]. - request: Option<(PeerRequest, oneshot::Receiver, RequestFailure>>)>, /// Holds a set of blocks known to this peer. known_blocks: LruHashSet, } @@ -432,7 +410,7 @@ where /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { - self.peers.values().filter(|p| p.request.is_some()).count() + self.chain_sync.num_active_peers() } /// Current global sync state. @@ -521,106 +499,6 @@ where self.peerset_handle.report_peer(who, reputation) } - /// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] being emitted. - /// Must contain the same `PeerId` and request that have been emitted. - pub fn on_block_response( - &mut self, - peer_id: PeerId, - request: BlockRequest, - response: OpaqueBlockResponse, - ) -> CustomMessageOutcome { - let blocks = match self.chain_sync.block_response_into_blocks(&request, response) { - Ok(blocks) => blocks, - Err(err) => { - debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); - return CustomMessageOutcome::None - }, - }; - - let block_response = BlockResponse:: { id: request.id, blocks }; - - let blocks_range = || match ( - block_response - .blocks - .first() - .and_then(|b| b.header.as_ref().map(|h| h.number())), - block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}", - block_response.id, - peer_id, - block_response.blocks.len(), - blocks_range(), - ); - - if request.fields == BlockAttributes::JUSTIFICATION { - match self.chain_sync.on_block_justification(peer_id, block_response) { - Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None, - Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => - CustomMessageOutcome::JustificationImport(peer, hash, number, justifications), - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None - }, - } - } else { - match self.chain_sync.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(origin, blocks)) => - CustomMessageOutcome::BlockImport(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => - prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req), - Ok(OnBlockData::Continue) => CustomMessageOutcome::None, - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None - }, - } - } - } - - /// Must be called in response to a [`CustomMessageOutcome::StateRequest`] being emitted. - /// Must contain the same `PeerId` and request that have been emitted. - pub fn on_state_response( - &mut self, - peer_id: PeerId, - response: OpaqueStateResponse, - ) -> CustomMessageOutcome { - match self.chain_sync.on_state_data(&peer_id, response) { - Ok(OnStateData::Import(origin, block)) => - CustomMessageOutcome::BlockImport(origin, vec![block]), - Ok(OnStateData::Continue) => CustomMessageOutcome::None, - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None - }, - } - } - - /// Must be called in response to a [`CustomMessageOutcome::WarpSyncRequest`] being emitted. - /// Must contain the same `PeerId` and request that have been emitted. - pub fn on_warp_sync_response( - &mut self, - peer_id: PeerId, - response: EncodedProof, - ) -> CustomMessageOutcome { - match self.chain_sync.on_warp_sync_data(&peer_id, response) { - Ok(()) => CustomMessageOutcome::None, - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None - }, - } - } - /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. @@ -721,7 +599,6 @@ where best_hash: status.best_hash, best_number: status.best_number, }, - request: None, known_blocks: LruHashSet::new( NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"), ), @@ -750,12 +627,7 @@ where .push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number)); if let Some(req) = req { - self.pending_messages.push_back(prepare_block_request( - self.chain_sync.as_ref(), - &mut self.peers, - who, - req, - )); + self.chain_sync.send_block_request(who, req); } Ok(()) @@ -921,8 +793,10 @@ where match blocks_to_import { Ok(OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => - prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req), + Ok(OnBlockData::Request(peer, req)) => { + self.chain_sync.send_block_request(peer, req); + CustomMessageOutcome::None + }, Ok(OnBlockData::Continue) => CustomMessageOutcome::None, Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); @@ -963,14 +837,7 @@ where let results = self.chain_sync.on_blocks_processed(imported, count, results); for result in results { match result { - Ok((id, req)) => { - self.pending_messages.push_back(prepare_block_request( - self.chain_sync.as_ref(), - &mut self.peers, - id, - req, - )); - }, + Ok((id, req)) => self.chain_sync.send_block_request(id, req), Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu) @@ -1096,16 +963,6 @@ where } } - /// Encode implementation-specific block request. - pub fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String> { - self.chain_sync.encode_block_request(request) - } - - /// Encode implementation-specific state request. - pub fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String> { - self.chain_sync.encode_state_request(request) - } - fn report_metrics(&self) { if let Some(metrics) = &self.metrics { let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX); @@ -1136,49 +993,6 @@ where } } -fn prepare_block_request( - chain_sync: &dyn ChainSync, - peers: &mut HashMap>, - who: PeerId, - request: BlockRequest, -) -> CustomMessageOutcome { - let (tx, rx) = oneshot::channel(); - - if let Some(ref mut peer) = peers.get_mut(&who) { - peer.request = Some((PeerRequest::Block(request.clone()), rx)); - } - - let request = chain_sync.create_opaque_block_request(&request); - - CustomMessageOutcome::BlockRequest { target: who, request, pending_response: tx } -} - -fn prepare_state_request( - peers: &mut HashMap>, - who: PeerId, - request: OpaqueStateRequest, -) -> CustomMessageOutcome { - let (tx, rx) = oneshot::channel(); - - if let Some(ref mut peer) = peers.get_mut(&who) { - peer.request = Some((PeerRequest::State, rx)); - } - CustomMessageOutcome::StateRequest { target: who, request, pending_response: tx } -} - -fn prepare_warp_sync_request( - peers: &mut HashMap>, - who: PeerId, - request: WarpProofRequest, -) -> CustomMessageOutcome { - let (tx, rx) = oneshot::channel(); - - if let Some(ref mut peer) = peers.get_mut(&who) { - peer.request = Some((PeerRequest::WarpProof, rx)); - } - CustomMessageOutcome::WarpSyncRequest { target: who, request, pending_response: tx } -} - /// Outcome of an incoming custom message. #[derive(Debug)] #[must_use] @@ -1210,24 +1024,6 @@ pub enum CustomMessageOutcome { remote: PeerId, messages: Vec<(ProtocolName, Bytes)>, }, - /// A new block request must be emitted. - BlockRequest { - target: PeerId, - request: OpaqueBlockRequest, - pending_response: oneshot::Sender, RequestFailure>>, - }, - /// A new storage request must be emitted. - StateRequest { - target: PeerId, - request: OpaqueStateRequest, - pending_response: oneshot::Sender, RequestFailure>>, - }, - /// A new warp sync request must be emitted. - WarpSyncRequest { - target: PeerId, - request: WarpProofRequest, - pending_response: oneshot::Sender, RequestFailure>>, - }, /// Peer has a reported a new head of chain. PeerNewBest(PeerId, NumberFor), /// Now connected to a new peer for syncing purposes. @@ -1305,165 +1101,35 @@ where return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) } - // Check for finished outgoing requests. - let mut finished_block_requests = Vec::new(); - let mut finished_state_requests = Vec::new(); - let mut finished_warp_sync_requests = Vec::new(); - for (id, peer) in self.peers.iter_mut() { - if let Peer { request: Some((_, pending_response)), .. } = peer { - match pending_response.poll_unpin(cx) { - Poll::Ready(Ok(Ok(resp))) => { - let (req, _) = peer.request.take().unwrap(); - match req { - PeerRequest::Block(req) => { - let response = - match self.chain_sync.decode_block_response(&resp[..]) { - Ok(proto) => proto, - Err(e) => { - debug!( - target: "sync", - "Failed to decode block response from peer {:?}: {:?}.", - id, - e - ); - self.peerset_handle.report_peer(*id, rep::BAD_MESSAGE); - self.behaviour - .disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - continue - }, - }; - - finished_block_requests.push((*id, req, response)); - }, - PeerRequest::State => { - let response = - match self.chain_sync.decode_state_response(&resp[..]) { - Ok(proto) => proto, - Err(e) => { - debug!( - target: "sync", - "Failed to decode state response from peer {:?}: {:?}.", - id, - e - ); - self.peerset_handle.report_peer(*id, rep::BAD_MESSAGE); - self.behaviour - .disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - continue - }, - }; - - finished_state_requests.push((*id, response)); - }, - PeerRequest::WarpProof => { - finished_warp_sync_requests.push((*id, resp)); - }, - } - }, - Poll::Ready(Ok(Err(e))) => { - peer.request.take(); - debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e); - - match e { - RequestFailure::Network(OutboundFailure::Timeout) => { - self.peerset_handle.report_peer(*id, rep::TIMEOUT); - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { - self.peerset_handle.report_peer(*id, rep::BAD_PROTOCOL); - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - RequestFailure::Network(OutboundFailure::DialFailure) => { - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - RequestFailure::Refused => { - self.peerset_handle.report_peer(*id, rep::REFUSED); - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - RequestFailure::Network(OutboundFailure::ConnectionClosed) | - RequestFailure::NotConnected => { - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - RequestFailure::UnknownProtocol => { - debug_assert!( - false, - "Block request protocol should always be known." - ); - }, - RequestFailure::Obsolete => { - debug_assert!( - false, - "Can not receive `RequestFailure::Obsolete` after dropping the \ - response receiver.", - ); - }, - } - }, - Poll::Ready(Err(oneshot::Canceled)) => { - peer.request.take(); - trace!( - target: "sync", - "Request to peer {:?} failed due to oneshot being canceled.", - id, - ); - self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC); - }, - Poll::Pending => {}, - } - } - } - for (id, req, response) in finished_block_requests { - let ev = self.on_block_response(id, req, response); - self.pending_messages.push_back(ev); - } - for (id, response) in finished_state_requests { - let ev = self.on_state_response(id, response); - self.pending_messages.push_back(ev); - } - for (id, response) in finished_warp_sync_requests { - let ev = self.on_warp_sync_response(id, EncodedProof(response)); - self.pending_messages.push_back(ev); - } - - while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) { - self.tick(); - } - - for (id, request) in self - .chain_sync - .block_requests() - .map(|(peer_id, request)| (peer_id, request)) - .collect::>() - { - let event = - prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request); - self.pending_messages.push_back(event); - } - if let Some((id, request)) = self.chain_sync.state_request() { - let event = prepare_state_request(&mut self.peers, id, request); - self.pending_messages.push_back(event); - } - for (id, request) in self.chain_sync.justification_requests().collect::>() { - let event = - prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request); - self.pending_messages.push_back(event); - } - if let Some((id, request)) = self.chain_sync.warp_sync_request() { - let event = prepare_warp_sync_request(&mut self.peers, id, request); - self.pending_messages.push_back(event); - } - // Advance the state of `ChainSync` // // Process any received requests received from `NetworkService` and // check if there is any block announcement validation finished. while let Poll::Ready(result) = self.chain_sync.poll(cx) { - match self.process_block_announce_validation_result(result) { - CustomMessageOutcome::None => {}, - outcome => self.pending_messages.push_back(outcome), + match result { + PollResult::Import(import) => self.pending_messages.push_back(match import { + ImportResult::BlockImport(origin, blocks) => + CustomMessageOutcome::BlockImport(origin, blocks), + ImportResult::JustificationImport(origin, hash, number, justifications) => + CustomMessageOutcome::JustificationImport( + origin, + hash, + number, + justifications, + ), + }), + PollResult::Announce(announce) => + match self.process_block_announce_validation_result(announce) { + CustomMessageOutcome::None => {}, + outcome => self.pending_messages.push_back(outcome), + }, } } + while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) { + self.tick(); + } + if let Some(message) = self.pending_messages.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 5ffd36007f530..7d756ed2d1e88 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -38,7 +38,6 @@ use crate::{ transport, ChainSyncInterface, ReputationChange, }; -use codec::Encode; use futures::{channel::oneshot, prelude::*}; use libp2p::{ core::{either::EitherError, upgrade, ConnectedPoint, Executor}, @@ -264,11 +263,6 @@ where let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); - let block_request_protocol_name = params.block_request_protocol_config.name.clone(); - let state_request_protocol_name = params.state_request_protocol_config.name.clone(); - let warp_sync_protocol_name = - params.warp_sync_protocol_config.as_ref().map(|c| c.name.clone()); - // Build the swarm. let (mut swarm, bandwidth): (Swarm>, _) = { let user_agent = format!( @@ -366,10 +360,6 @@ where user_agent, local_public, discovery_config, - params.block_request_protocol_config, - params.state_request_protocol_config, - params.warp_sync_protocol_config, - params.light_client_request_protocol_config, params.network_config.request_response_protocols, peerset_handle.clone(), ); @@ -466,9 +456,6 @@ where peers_notifications_sinks, metrics, boot_node_ids, - block_request_protocol_name, - state_request_protocol_name, - warp_sync_protocol_name, _marker: Default::default(), }) } @@ -1287,15 +1274,6 @@ where /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Shared with the [`NetworkService`]. peers_notifications_sinks: Arc>>, - /// Protocol name used to send out block requests via - /// [`crate::request_responses::RequestResponsesBehaviour`]. - block_request_protocol_name: ProtocolName, - /// Protocol name used to send out state requests via - /// [`crate::request_responses::RequestResponsesBehaviour`]. - state_request_protocol_name: ProtocolName, - /// Protocol name used to send out warp sync requests via - /// [`crate::request_responses::RequestResponsesBehaviour`]. - warp_sync_protocol_name: Option, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, @@ -1474,84 +1452,6 @@ where } this.import_queue.import_justifications(origin, hash, nb, justifications); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockRequest { - target, - request, - pending_response, - })) => { - match this - .network_service - .behaviour() - .user_protocol() - .encode_block_request(&request) - { - Ok(data) => { - this.network_service.behaviour_mut().send_request( - &target, - &this.block_request_protocol_name, - data, - pending_response, - IfDisconnected::ImmediateError, - ); - }, - Err(err) => { - log::warn!( - target: "sync", - "Failed to encode block request {:?}: {:?}", - request, err - ); - }, - } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::StateRequest { - target, - request, - pending_response, - })) => { - match this - .network_service - .behaviour() - .user_protocol() - .encode_state_request(&request) - { - Ok(data) => { - this.network_service.behaviour_mut().send_request( - &target, - &this.state_request_protocol_name, - data, - pending_response, - IfDisconnected::ImmediateError, - ); - }, - Err(err) => { - log::warn!( - target: "sync", - "Failed to encode state request {:?}: {:?}", - request, err - ); - }, - } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::WarpSyncRequest { - target, - request, - pending_response, - })) => match &this.warp_sync_protocol_name { - Some(name) => this.network_service.behaviour_mut().send_request( - &target, - &name, - request.encode(), - pending_response, - IfDisconnected::ImmediateError, - ), - None => { - log::warn!( - target: "sync", - "Trying to send warp sync request when no protocol is configured {:?}", - request, - ); - }, - }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, diff --git a/client/network/src/service/tests/chain_sync.rs b/client/network/src/service/tests/chain_sync.rs index 21149459413f4..b62fb36461860 100644 --- a/client/network/src/service/tests/chain_sync.rs +++ b/client/network/src/service/tests/chain_sync.rs @@ -27,8 +27,8 @@ use sc_block_builder::BlockBuilderProvider; use sc_client_api::HeaderBackend; use sc_consensus::JustificationSyncLink; use sc_network_common::{ - config::{MultiaddrWithPeerId, SetConfig}, - protocol::event::Event, + config::{MultiaddrWithPeerId, ProtocolId, SetConfig}, + protocol::{event::Event, role::Roles, ProtocolName}, service::NetworkSyncForkRequest, sync::{SyncState, SyncStatus}, }; @@ -39,7 +39,6 @@ use sp_runtime::{ traits::{Block as BlockT, Header as _}, }; use std::{ - iter, sync::{Arc, RwLock}, task::Poll, time::Duration, @@ -49,10 +48,6 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _ fn set_default_expecations_no_peers( chain_sync: &mut MockChainSync, ) { - chain_sync.expect_block_requests().returning(|| Box::new(iter::empty())); - chain_sync.expect_state_request().returning(|| None); - chain_sync.expect_justification_requests().returning(|| Box::new(iter::empty())); - chain_sync.expect_warp_sync_request().returning(|| None); chain_sync.expect_poll().returning(|_| Poll::Pending); chain_sync.expect_status().returning(|| SyncStatus { state: SyncState::Idle, @@ -342,13 +337,19 @@ async fn disconnect_peer_using_chain_sync_handle() { sc_network_sync::service::network::NetworkServiceProvider::new(); let handle_clone = chain_sync_network_handle.clone(); - let (chain_sync, chain_sync_service) = ChainSync::new( + let (chain_sync, chain_sync_service, _) = ChainSync::new( sc_network_common::sync::SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&config::Role::Full), Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), 1u32, None, chain_sync_network_handle.clone(), + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); diff --git a/client/network/src/service/tests/mod.rs b/client/network/src/service/tests/mod.rs index ef25616a07b0d..1d91fc142672f 100644 --- a/client/network/src/service/tests/mod.rs +++ b/client/network/src/service/tests/mod.rs @@ -216,31 +216,6 @@ impl TestNetworkBuilder { None, ))); - let (chain_sync_network_provider, chain_sync_network_handle) = - self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); - - let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({ - let (chain_sync, chain_sync_service) = ChainSync::new( - match network_config.sync_mode { - config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, - config::SyncMode::Fast { skip_proofs, storage_chain_mode } => - sc_network_common::sync::SyncMode::LightState { - skip_proofs, - storage_chain_mode, - }, - config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, - }, - client.clone(), - Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), - network_config.max_parallel_downloads, - None, - chain_sync_network_handle, - ) - .unwrap(); - - (Box::new(chain_sync), chain_sync_service) - }); - let protocol_id = ProtocolId::from("test-protocol-name"); let fork_id = Some(String::from("test-fork-id")); @@ -289,6 +264,37 @@ impl TestNetworkBuilder { }, }; + let (chain_sync_network_provider, chain_sync_network_handle) = + self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + + let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({ + let (chain_sync, chain_sync_service, _) = ChainSync::new( + match network_config.sync_mode { + config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, + config::SyncMode::Fast { skip_proofs, storage_chain_mode } => + sc_network_common::sync::SyncMode::LightState { + skip_proofs, + storage_chain_mode, + }, + config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, + }, + client.clone(), + protocol_id.clone(), + &fork_id, + Roles::from(&config::Role::Full), + Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), + network_config.max_parallel_downloads, + None, + chain_sync_network_handle, + block_request_protocol_config.name.clone(), + state_request_protocol_config.name.clone(), + None, + ) + .unwrap(); + + (Box::new(chain_sync), chain_sync_service) + }); + let worker = NetworkWorker::< substrate_test_runtime_client::runtime::Block, substrate_test_runtime_client::runtime::Hash, @@ -305,11 +311,12 @@ impl TestNetworkBuilder { chain_sync, chain_sync_service, metrics_registry: None, - block_request_protocol_config, - state_request_protocol_config, - light_client_request_protocol_config, - warp_sync_protocol_config: None, - request_response_protocol_configs: Vec::new(), + request_response_protocol_configs: [ + block_request_protocol_config, + state_request_protocol_config, + light_client_request_protocol_config, + ] + .to_vec(), }) .unwrap(); diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index ce1dd8f895d61..841388c7a68ee 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -18,6 +18,7 @@ prost-build = "0.11" [dependencies] array-bytes = "4.1" +async-trait = "0.1.58" codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } futures = "0.3.21" libp2p = "0.49.0" diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 75ecb9322ca78..697445334a073 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -49,8 +49,10 @@ use crate::{ }; use codec::{Decode, DecodeAll, Encode}; use extra_requests::ExtraRequests; -use futures::{stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt}; -use libp2p::PeerId; +use futures::{ + channel::oneshot, stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt, +}; +use libp2p::{request_response::OutboundFailure, PeerId}; use log::{debug, error, info, trace, warn}; use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; @@ -59,16 +61,18 @@ use sc_network_common::{ config::{ NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, }, - protocol::role::Roles, + protocol::{role::Roles, ProtocolName}, + request_responses::{IfDisconnected, RequestFailure}, sync::{ message::{ BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }, warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, - OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, - PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus, + BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, + OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, + OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, PollResult, + SyncMode, SyncState, SyncStatus, }, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; @@ -170,6 +174,18 @@ mod rep { /// Peer response data does not have requested bits. pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response"); + + /// Reputation change when a peer doesn't respond in time to our messages. + pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); + + /// Peer is on unsupported protocol version. + pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); + + /// Reputation change when a peer refuses a request. + pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); + + /// We received a message that failed to decode. + pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } enum AllowedRequests { @@ -223,6 +239,18 @@ struct GapSync { target: NumberFor, } +type PendingResponse = Pin< + Box< + dyn Future< + Output = ( + PeerId, + PeerRequest, + Result, RequestFailure>, oneshot::Canceled>, + ), + > + Send, + >, +>; + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -272,7 +300,17 @@ pub struct ChainSync { /// Channel for receiving service commands service_rx: TracingUnboundedReceiver>, /// Handle for communicating with `NetworkService` - _network_service: service::network::NetworkServiceHandle, + network_service: service::network::NetworkServiceHandle, + /// Protocol name used for block announcements + block_announce_protocol_name: ProtocolName, + /// Protocol name used to send out block requests + block_request_protocol_name: ProtocolName, + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, + /// Protocol name used to send out warp sync requests + warp_sync_protocol_name: Option, + /// Pending responses + pending_responses: FuturesUnordered>, } /// All the data we have about a Peer that we are trying to sync with @@ -470,6 +508,10 @@ where self.peers.len() } + fn num_active_peers(&self) -> usize { + self.pending_responses.len() + } + fn new_peer( &mut self, who: PeerId, @@ -661,222 +703,6 @@ where .extend(peers); } - fn justification_requests<'a>( - &'a mut self, - ) -> Box)> + 'a> { - let peers = &mut self.peers; - let mut matcher = self.extra_justifications.matcher(); - Box::new(std::iter::from_fn(move || { - if let Some((peer, request)) = matcher.next(peers) { - peers - .get_mut(&peer) - .expect( - "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed", - ) - .state = PeerSyncState::DownloadingJustification(request.0); - let req = BlockRequest:: { - id: 0, - fields: BlockAttributes::JUSTIFICATION, - from: FromBlock::Hash(request.0), - direction: Direction::Ascending, - max: Some(1), - }; - Some((peer, req)) - } else { - None - } - })) - } - - fn block_requests<'a>( - &'a mut self, - ) -> Box)> + 'a> { - if self.mode == SyncMode::Warp { - return Box::new(std::iter::once(self.warp_target_block_request()).flatten()) - } - - if self.allowed_requests.is_empty() || self.state_sync.is_some() { - return Box::new(std::iter::empty()) - } - - if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); - return Box::new(std::iter::empty()) - } - let is_major_syncing = self.status().state.is_major_syncing(); - let attrs = self.required_block_attributes(); - let blocks = &mut self.blocks; - let fork_targets = &mut self.fork_targets; - let last_finalized = - std::cmp::min(self.best_queued_number, self.client.info().finalized_number); - let best_queued = self.best_queued_number; - let client = &self.client; - let queue = &self.queue_blocks; - let allowed_requests = self.allowed_requests.take(); - let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads }; - let gap_sync = &mut self.gap_sync; - let iter = self.peers.iter_mut().filter_map(move |(&id, peer)| { - if !peer.state.is_available() || !allowed_requests.contains(&id) { - return None - } - - // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the - // common number, the peer best number is higher than our best queued and the common - // number is smaller than the last finalized block number, we should do an ancestor - // search to find a better common block. If the queue is full we wait till all blocks - // are imported though. - if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && - best_queued < peer.best_number && - peer.common_number < last_finalized && - queue.len() <= MAJOR_SYNC_BLOCKS.into() - { - trace!( - target: "sync", - "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.", - id, - peer.common_number, - best_queued, - ); - let current = std::cmp::min(peer.best_number, best_queued); - peer.state = PeerSyncState::AncestorSearch { - current, - start: best_queued, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }; - Some((id, ancestry_request::(current))) - } else if let Some((range, req)) = peer_block_request( - &id, - peer, - blocks, - attrs, - max_parallel, - last_finalized, - best_queued, - ) { - peer.state = PeerSyncState::DownloadingNew(range.start); - trace!( - target: "sync", - "New block request for {}, (best:{}, common:{}) {:?}", - id, - peer.best_number, - peer.common_number, - req, - ); - Some((id, req)) - } else if let Some((hash, req)) = - fork_sync_request(&id, fork_targets, best_queued, last_finalized, attrs, |hash| { - if queue.contains(hash) { - BlockStatus::Queued - } else { - client.block_status(&BlockId::Hash(*hash)).unwrap_or(BlockStatus::Unknown) - } - }) { - trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); - peer.state = PeerSyncState::DownloadingStale(hash); - Some((id, req)) - } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { - peer_gap_block_request( - &id, - peer, - &mut sync.blocks, - attrs, - sync.target, - sync.best_queued_number, - ) - }) { - peer.state = PeerSyncState::DownloadingGap(range.start); - trace!( - target: "sync", - "New gap block request for {}, (best:{}, common:{}) {:?}", - id, - peer.best_number, - peer.common_number, - req, - ); - Some((id, req)) - } else { - None - } - }); - Box::new(iter) - } - - fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { - if self.allowed_requests.is_empty() { - return None - } - if (self.state_sync.is_some() || self.warp_sync.is_some()) && - self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) - { - // Only one pending state request is allowed. - return None - } - if let Some(sync) = &self.state_sync { - if sync.is_complete() { - return None - } - - for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.common_number >= sync.target_block_num() { - peer.state = PeerSyncState::DownloadingState; - let request = sync.next_request(); - trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); - self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) - } - } - } - if let Some(sync) = &self.warp_sync { - if sync.is_complete() { - return None - } - if let (Some(request), Some(target)) = - (sync.next_state_request(), sync.target_block_number()) - { - for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= target { - trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); - peer.state = PeerSyncState::DownloadingState; - self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) - } - } - } - } - None - } - - fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { - if let Some(sync) = &self.warp_sync { - if self.allowed_requests.is_empty() || - sync.is_complete() || - self.peers - .iter() - .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof) - { - // Only one pending state request is allowed. - return None - } - if let Some(request) = sync.next_warp_proof_request() { - let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect(); - if !targets.is_empty() { - targets.sort(); - let median = targets[targets.len() / 2]; - // Find a random peer that is synced as much as peer majority. - for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= median { - trace!(target: "sync", "New WarpProofRequest for {}", id); - peer.state = PeerSyncState::DownloadingWarpProof; - self.allowed_requests.clear(); - return Some((*id, request)) - } - } - } - } - } - None - } - fn on_block_data( &mut self, who: &PeerId, @@ -1135,153 +961,55 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } - fn on_state_data( + fn on_block_justification( &mut self, - who: &PeerId, - response: OpaqueStateResponse, - ) -> Result, BadPeer> { - let response: Box = response.0.downcast().map_err(|_error| { - error!( - target: "sync", - "Failed to downcast opaque state response, this is an implementation bug." - ); + who: PeerId, + response: BlockResponse, + ) -> Result, BadPeer> { + let peer = if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "💔 Called on_block_justification with a bad peer ID"); + return Ok(OnBlockJustification::Nothing) + }; - BadPeer(*who, rep::BAD_RESPONSE) - })?; + self.allowed_requests.add(&who); + if let PeerSyncState::DownloadingJustification(hash) = peer.state { + peer.state = PeerSyncState::Available; - if let Some(peer) = self.peers.get_mut(who) { - if let PeerSyncState::DownloadingState = peer.state { - peer.state = PeerSyncState::Available; - self.allowed_requests.set_all(); - } - } - let import_result = if let Some(sync) = &mut self.state_sync { - debug!( - target: "sync", - "Importing state data from {} with {} keys, {} proof nodes.", - who, - response.entries.len(), - response.proof.len(), - ); - sync.import(*response) - } else if let Some(sync) = &mut self.warp_sync { - debug!( - target: "sync", - "Importing state data from {} with {} keys, {} proof nodes.", - who, - response.entries.len(), - response.proof.len(), - ); - sync.import_state(*response) - } else { - debug!(target: "sync", "Ignored obsolete state response from {}", who); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) - }; - - match import_result { - state::ImportResult::Import(hash, header, state, body, justifications) => { - let origin = BlockOrigin::NetworkInitialSync; - let block = IncomingBlock { - hash, - header: Some(header), - body, - indexed_body: None, - justifications, - origin: None, - allow_missing_state: true, - import_existing: true, - skip_execution: self.skip_execution(), - state: Some(state), - }; - debug!(target: "sync", "State download is complete. Import is queued"); - Ok(OnStateData::Import(origin, block)) - }, - state::ImportResult::Continue => Ok(OnStateData::Continue), - state::ImportResult::BadResponse => { - debug!(target: "sync", "Bad state data received from {}", who); - Err(BadPeer(*who, rep::BAD_BLOCK)) - }, - } - } - - fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer> { - if let Some(peer) = self.peers.get_mut(who) { - if let PeerSyncState::DownloadingWarpProof = peer.state { - peer.state = PeerSyncState::Available; - self.allowed_requests.set_all(); - } - } - let import_result = if let Some(sync) = &mut self.warp_sync { - debug!( - target: "sync", - "Importing warp proof data from {}, {} bytes.", - who, - response.0.len(), - ); - sync.import_warp_proof(response) - } else { - debug!(target: "sync", "Ignored obsolete warp sync response from {}", who); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) - }; - - match import_result { - WarpProofImportResult::Success => Ok(()), - WarpProofImportResult::BadResponse => { - debug!(target: "sync", "Bad proof data received from {}", who); - Err(BadPeer(*who, rep::BAD_BLOCK)) - }, - } - } - - fn on_block_justification( - &mut self, - who: PeerId, - response: BlockResponse, - ) -> Result, BadPeer> { - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "💔 Called on_block_justification with a bad peer ID"); - return Ok(OnBlockJustification::Nothing) - }; - - self.allowed_requests.add(&who); - if let PeerSyncState::DownloadingJustification(hash) = peer.state { - peer.state = PeerSyncState::Available; - - // We only request one justification at a time - let justification = if let Some(block) = response.blocks.into_iter().next() { - if hash != block.hash { - warn!( - target: "sync", - "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", - who, - hash, - block.hash, - ); - return Err(BadPeer(who, rep::BAD_JUSTIFICATION)) - } - - block - .justifications - .or_else(|| legacy_justification_mapping(block.justification)) - } else { - // we might have asked the peer for a justification on a block that we assumed it - // had but didn't (regardless of whether it had a justification for it or not). - trace!( - target: "sync", - "Peer {:?} provided empty response for justification request {:?}", - who, - hash, - ); - - None - }; - - if let Some((peer, hash, number, j)) = - self.extra_justifications.on_response(who, justification) - { - return Ok(OnBlockJustification::Import { peer, hash, number, justifications: j }) + // We only request one justification at a time + let justification = if let Some(block) = response.blocks.into_iter().next() { + if hash != block.hash { + warn!( + target: "sync", + "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", + who, + hash, + block.hash, + ); + return Err(BadPeer(who, rep::BAD_JUSTIFICATION)) + } + + block + .justifications + .or_else(|| legacy_justification_mapping(block.justification)) + } else { + // we might have asked the peer for a justification on a block that we assumed it + // had but didn't (regardless of whether it had a justification for it or not). + trace!( + target: "sync", + "Peer {:?} provided empty response for justification request {:?}", + who, + hash, + ); + + None + }; + + if let Some((peer, hash, number, j)) = + self.extra_justifications.on_response(who, justification) + { + return Ok(OnBlockJustification::Import { peer, hash, number, justifications: j }) } } @@ -1627,38 +1355,6 @@ where } } - /// Create implementation-specific block request. - fn create_opaque_block_request(&self, request: &BlockRequest) -> OpaqueBlockRequest { - OpaqueBlockRequest(Box::new(schema::v1::BlockRequest { - fields: request.fields.to_be_u32(), - from_block: match request.from { - FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), - FromBlock::Number(n) => - Some(schema::v1::block_request::FromBlock::Number(n.encode())), - }, - direction: request.direction as i32, - max_blocks: request.max.unwrap_or(0), - support_multiple_justifications: true, - })) - } - - fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String> { - let request: &schema::v1::BlockRequest = request.0.downcast_ref().ok_or_else(|| { - "Failed to downcast opaque block response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - Ok(request.encode_to_vec()) - } - - fn decode_block_response(&self, response: &[u8]) -> Result { - let response = schema::v1::BlockResponse::decode(response) - .map_err(|error| format!("Failed to decode block response: {error}"))?; - - Ok(OpaqueBlockResponse(Box::new(response))) - } - fn block_response_into_blocks( &self, request: &BlockRequest, @@ -1725,27 +1421,7 @@ where .map_err(|error: codec::Error| error.to_string()) } - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String> { - let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { - "Failed to downcast opaque state response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - Ok(request.encode_to_vec()) - } - - fn decode_state_response(&self, response: &[u8]) -> Result { - let response = StateResponse::decode(response) - .map_err(|error| format!("Failed to decode state response: {error}"))?; - - Ok(OpaqueStateResponse(Box::new(response))) - } - - fn poll( - &mut self, - cx: &mut std::task::Context, - ) -> Poll> { + fn poll(&mut self, cx: &mut std::task::Context) -> Poll> { while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { @@ -1753,8 +1429,46 @@ where }, } } + self.process_outbound_requests(); + + if let Poll::Ready(result) = self.poll_pending_responses(cx) { + return Poll::Ready(PollResult::Import(result)) + } + + if let Poll::Ready(announce) = self.poll_block_announce_validation(cx) { + return Poll::Ready(PollResult::Announce(announce)) + } - self.poll_block_announce_validation(cx) + Poll::Pending + } + + fn send_block_request(&mut self, who: PeerId, request: BlockRequest) { + let (tx, rx) = oneshot::channel(); + let opaque_req = self.create_opaque_block_request(&request); + + if self.peers.contains_key(&who) { + self.pending_responses + .push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) })); + } + + match self.encode_block_request(&opaque_req) { + Ok(data) => { + self.network_service.start_request( + who, + self.block_request_protocol_name.clone(), + data, + tx, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: "sync", + "Failed to encode block request {:?}: {:?}", + opaque_req, err + ); + }, + } } } @@ -1774,12 +1488,30 @@ where pub fn new( mode: SyncMode, client: Arc, + protocol_id: ProtocolId, + fork_id: &Option, + roles: Roles, block_announce_validator: Box + Send>, max_parallel_downloads: u32, warp_sync_provider: Option>>, - _network_service: service::network::NetworkServiceHandle, - ) -> Result<(Self, Box>), ClientError> { + network_service: service::network::NetworkServiceHandle, + block_request_protocol_name: ProtocolName, + state_request_protocol_name: ProtocolName, + warp_sync_protocol_name: Option, + ) -> Result<(Self, Box>, NonDefaultSetConfig), ClientError> { let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); + let block_announce_config = Self::get_block_announce_proto_config( + protocol_id, + fork_id, + roles, + client.info().best_number, + client.info().best_hash, + client + .block_hash(Zero::zero()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ); let mut sync = Self { client, @@ -1803,10 +1535,19 @@ where import_existing: false, gap_sync: None, service_rx, - _network_service, + network_service, + block_request_protocol_name, + state_request_protocol_name, + warp_sync_protocol_name, + block_announce_protocol_name: block_announce_config + .notifications_protocol + .clone() + .into(), + pending_responses: Default::default(), }; + sync.reset_sync_start_point()?; - Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)))) + Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)), block_announce_config)) } /// Returns the median seen block number. @@ -2203,72 +1944,670 @@ where Ok(()) } - /// What is the status of the block corresponding to the given hash? - fn block_status(&self, hash: &B::Hash) -> Result { - if self.queue_blocks.contains(hash) { - return Ok(BlockStatus::Queued) + /// What is the status of the block corresponding to the given hash? + fn block_status(&self, hash: &B::Hash) -> Result { + if self.queue_blocks.contains(hash) { + return Ok(BlockStatus::Queued) + } + self.client.block_status(&BlockId::Hash(*hash)) + } + + /// Is the block corresponding to the given hash known? + fn is_known(&self, hash: &B::Hash) -> bool { + self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + } + + /// Is any peer downloading the given hash? + fn is_already_downloading(&self, hash: &B::Hash) -> bool { + self.peers + .iter() + .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) + } + + /// Get the set of downloaded blocks that are ready to be queued for import. + fn ready_blocks(&mut self) -> Vec> { + self.blocks + .ready_blocks(self.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + let justifications = block_data + .block + .justifications + .or_else(|| legacy_justification_mapping(block_data.block.justification)); + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + indexed_body: block_data.block.indexed_body, + justifications, + origin: block_data.origin, + allow_missing_state: true, + import_existing: self.import_existing, + skip_execution: self.skip_execution(), + state: None, + } + }) + .collect() + } + + /// Generate block request for downloading of the target block body during warp sync. + fn warp_target_block_request(&mut self) -> Option<(PeerId, BlockRequest)> { + let sync = &self.warp_sync.as_ref()?; + + if self.allowed_requests.is_empty() || + sync.is_complete() || + self.peers + .iter() + .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock) + { + // Only one pending warp target block request is allowed. + return None + } + + if let Some((target_number, request)) = sync.next_target_block_request() { + // Find a random peer that has a block with the target number. + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.best_number >= target_number { + trace!(target: "sync", "New warp target block request for {}", id); + peer.state = PeerSyncState::DownloadingWarpTargetBlock; + self.allowed_requests.clear(); + return Some((*id, request)) + } + } + } + + None + } + + /// Get config for the block announcement protocol + pub fn get_block_announce_proto_config( + protocol_id: ProtocolId, + fork_id: &Option, + roles: Roles, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, + ) -> NonDefaultSetConfig { + let block_announces_protocol = { + let genesis_hash = genesis_hash.as_ref(); + if let Some(ref fork_id) = fork_id { + format!( + "/{}/{}/block-announces/1", + array_bytes::bytes2hex("", genesis_hash), + fork_id + ) + } else { + format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) + } + }; + + NonDefaultSetConfig { + notifications_protocol: block_announces_protocol.into(), + fallback_names: iter::once( + format!("/{}/block-announces/1", protocol_id.as_ref()).into(), + ) + .collect(), + max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, + handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( + roles, + best_number, + best_hash, + genesis_hash, + ))), + // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement + // protocol is still hardcoded into the peerset. + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + } + } + + fn decode_block_response(response: &[u8]) -> Result { + let response = schema::v1::BlockResponse::decode(response) + .map_err(|error| format!("Failed to decode block response: {error}"))?; + + Ok(OpaqueBlockResponse(Box::new(response))) + } + + fn decode_state_response(response: &[u8]) -> Result { + let response = StateResponse::decode(response) + .map_err(|error| format!("Failed to decode state response: {error}"))?; + + Ok(OpaqueStateResponse(Box::new(response))) + } + + fn send_state_request(&mut self, who: PeerId, request: OpaqueStateRequest) { + let (tx, rx) = oneshot::channel(); + + if self.peers.contains_key(&who) { + self.pending_responses + .push(Box::pin(async move { (who, PeerRequest::State, rx.await) })); + } + + match self.encode_state_request(&request) { + Ok(data) => { + self.network_service.start_request( + who, + self.state_request_protocol_name.clone(), + data, + tx, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: "sync", + "Failed to encode state request {:?}: {:?}", + request, err + ); + }, + } + } + + fn send_warp_sync_request(&mut self, who: PeerId, request: WarpProofRequest) { + let (tx, rx) = oneshot::channel(); + + if self.peers.contains_key(&who) { + self.pending_responses + .push(Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) })); + } + + match &self.warp_sync_protocol_name { + Some(name) => self.network_service.start_request( + who, + name.clone(), + request.encode(), + tx, + IfDisconnected::ImmediateError, + ), + None => { + log::warn!( + target: "sync", + "Trying to send warp sync request when no protocol is configured {:?}", + request, + ); + }, + } + } + + fn on_block_response( + &mut self, + peer_id: PeerId, + request: BlockRequest, + response: OpaqueBlockResponse, + ) -> Option> { + let blocks = match self.block_response_into_blocks(&request, response) { + Ok(blocks) => blocks, + Err(err) => { + debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err); + self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); + return None + }, + }; + + let block_response = BlockResponse:: { id: request.id, blocks }; + + let blocks_range = || match ( + block_response + .blocks + .first() + .and_then(|b| b.header.as_ref().map(|h| h.number())), + block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + trace!( + target: "sync", + "BlockResponse {} from {} with {} blocks {}", + block_response.id, + peer_id, + block_response.blocks.len(), + blocks_range(), + ); + + if request.fields == BlockAttributes::JUSTIFICATION { + match self.on_block_justification(peer_id, block_response) { + Ok(OnBlockJustification::Nothing) => None, + Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => + Some(ImportResult::JustificationImport(peer, hash, number, justifications)), + Err(BadPeer(id, repu)) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu); + None + }, + } + } else { + match self.on_block_data(&peer_id, Some(request), block_response) { + Ok(OnBlockData::Import(origin, blocks)) => + Some(ImportResult::BlockImport(origin, blocks)), + Ok(OnBlockData::Request(peer, req)) => { + self.send_block_request(peer, req); + None + }, + Ok(OnBlockData::Continue) => None, + Err(BadPeer(id, repu)) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu); + None + }, + } + } + } + + pub fn on_state_response( + &mut self, + peer_id: PeerId, + response: OpaqueStateResponse, + ) -> Option> { + match self.on_state_data(&peer_id, response) { + Ok(OnStateData::Import(origin, block)) => + Some(ImportResult::BlockImport(origin, vec![block])), + Ok(OnStateData::Continue) => None, + Err(BadPeer(id, repu)) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu); + None + }, + } + } + + pub fn on_warp_sync_response(&mut self, peer_id: PeerId, response: EncodedProof) { + if let Err(BadPeer(id, repu)) = self.on_warp_sync_data(&peer_id, response) { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu); + } + } + + fn process_outbound_requests(&mut self) { + for (id, request) in self.block_requests() { + self.send_block_request(id, request); + } + + if let Some((id, request)) = self.state_request() { + self.send_state_request(id, request); + } + + for (id, request) in self.justification_requests().collect::>() { + self.send_block_request(id, request); + } + + if let Some((id, request)) = self.warp_sync_request() { + self.send_warp_sync_request(id, request); + } + } + + fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll> { + while let Poll::Ready(Some((id, request, response))) = + self.pending_responses.poll_next_unpin(cx) + { + match response { + Ok(Ok(resp)) => match request { + PeerRequest::Block(req) => { + let response = match Self::decode_block_response(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: "sync", + "Failed to decode block response from peer {:?}: {:?}.", + id, + e + ); + self.network_service.report_peer(id, rep::BAD_MESSAGE); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + continue + }, + }; + + if let Some(import) = self.on_block_response(id, req, response) { + return Poll::Ready(import) + } + }, + PeerRequest::State => { + let response = match Self::decode_state_response(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: "sync", + "Failed to decode state response from peer {:?}: {:?}.", + id, + e + ); + self.network_service.report_peer(id, rep::BAD_MESSAGE); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + continue + }, + }; + + if let Some(import) = self.on_state_response(id, response) { + return Poll::Ready(import) + } + }, + PeerRequest::WarpProof => { + self.on_warp_sync_response(id, EncodedProof(resp)); + }, + }, + Ok(Err(e)) => { + debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e); + + match e { + RequestFailure::Network(OutboundFailure::Timeout) => { + self.network_service.report_peer(id, rep::TIMEOUT); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { + self.network_service.report_peer(id, rep::BAD_PROTOCOL); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::DialFailure) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Refused => { + self.network_service.report_peer(id, rep::REFUSED); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::ConnectionClosed) | + RequestFailure::NotConnected => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::UnknownProtocol => { + debug_assert!(false, "Block request protocol should always be known."); + }, + RequestFailure::Obsolete => { + debug_assert!( + false, + "Can not receive `RequestFailure::Obsolete` after dropping the \ + response receiver.", + ); + }, + } + }, + Err(oneshot::Canceled) => { + trace!( + target: "sync", + "Request to peer {:?} failed due to oneshot being canceled.", + id, + ); + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + }, + } + } + + Poll::Pending + } + + /// Create implementation-specific block request. + fn create_opaque_block_request(&self, request: &BlockRequest) -> OpaqueBlockRequest { + OpaqueBlockRequest(Box::new(schema::v1::BlockRequest { + fields: request.fields.to_be_u32(), + from_block: match request.from { + FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), + FromBlock::Number(n) => + Some(schema::v1::block_request::FromBlock::Number(n.encode())), + }, + direction: request.direction as i32, + max_blocks: request.max.unwrap_or(0), + support_multiple_justifications: true, + })) + } + + fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String> { + let request: &schema::v1::BlockRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque block response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String> { + let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque state response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn justification_requests<'a>( + &'a mut self, + ) -> Box)> + 'a> { + let peers = &mut self.peers; + let mut matcher = self.extra_justifications.matcher(); + Box::new(std::iter::from_fn(move || { + if let Some((peer, request)) = matcher.next(peers) { + peers + .get_mut(&peer) + .expect( + "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed", + ) + .state = PeerSyncState::DownloadingJustification(request.0); + let req = BlockRequest:: { + id: 0, + fields: BlockAttributes::JUSTIFICATION, + from: FromBlock::Hash(request.0), + direction: Direction::Ascending, + max: Some(1), + }; + Some((peer, req)) + } else { + None + } + })) + } + + fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { + if self.mode == SyncMode::Warp { + return self + .warp_target_block_request() + .map_or_else(|| Vec::new(), |req| Vec::from([req])) + } + + if self.allowed_requests.is_empty() || self.state_sync.is_some() { + return Vec::new() + } + + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return Vec::new() + } + let is_major_syncing = self.status().state.is_major_syncing(); + let attrs = self.required_block_attributes(); + let blocks = &mut self.blocks; + let fork_targets = &mut self.fork_targets; + let last_finalized = + std::cmp::min(self.best_queued_number, self.client.info().finalized_number); + let best_queued = self.best_queued_number; + let client = &self.client; + let queue = &self.queue_blocks; + let allowed_requests = self.allowed_requests.take(); + let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads }; + let gap_sync = &mut self.gap_sync; + self.peers + .iter_mut() + .filter_map(move |(&id, peer)| { + if !peer.state.is_available() || !allowed_requests.contains(&id) { + return None + } + + // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from + // the common number, the peer best number is higher than our best queued and the + // common number is smaller than the last finalized block number, we should do an + // ancestor search to find a better common block. If the queue is full we wait till + // all blocks are imported though. + if best_queued.saturating_sub(peer.common_number) > + MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && + best_queued < peer.best_number && + peer.common_number < last_finalized && + queue.len() <= MAJOR_SYNC_BLOCKS.into() + { + trace!( + target: "sync", + "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.", + id, + peer.common_number, + best_queued, + ); + let current = std::cmp::min(peer.best_number, best_queued); + peer.state = PeerSyncState::AncestorSearch { + current, + start: best_queued, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }; + Some((id, ancestry_request::(current))) + } else if let Some((range, req)) = peer_block_request( + &id, + peer, + blocks, + attrs, + max_parallel, + last_finalized, + best_queued, + ) { + peer.state = PeerSyncState::DownloadingNew(range.start); + trace!( + target: "sync", + "New block request for {}, (best:{}, common:{}) {:?}", + id, + peer.best_number, + peer.common_number, + req, + ); + Some((id, req)) + } else if let Some((hash, req)) = fork_sync_request( + &id, + fork_targets, + best_queued, + last_finalized, + attrs, + |hash| { + if queue.contains(hash) { + BlockStatus::Queued + } else { + client + .block_status(&BlockId::Hash(*hash)) + .unwrap_or(BlockStatus::Unknown) + } + }, + ) { + trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); + peer.state = PeerSyncState::DownloadingStale(hash); + Some((id, req)) + } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { + peer_gap_block_request( + &id, + peer, + &mut sync.blocks, + attrs, + sync.target, + sync.best_queued_number, + ) + }) { + peer.state = PeerSyncState::DownloadingGap(range.start); + trace!( + target: "sync", + "New gap block request for {}, (best:{}, common:{}) {:?}", + id, + peer.best_number, + peer.common_number, + req, + ); + Some((id, req)) + } else { + None + } + }) + .collect() + // Box::new(iter) + } + + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { + if self.allowed_requests.is_empty() { + return None } - self.client.block_status(&BlockId::Hash(*hash)) - } - - /// Is the block corresponding to the given hash known? - fn is_known(&self, hash: &B::Hash) -> bool { - self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) - } - - /// Is any peer downloading the given hash? - fn is_already_downloading(&self, hash: &B::Hash) -> bool { - self.peers - .iter() - .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - } + if (self.state_sync.is_some() || self.warp_sync.is_some()) && + self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) + { + // Only one pending state request is allowed. + return None + } + if let Some(sync) = &self.state_sync { + if sync.is_complete() { + return None + } - /// Get the set of downloaded blocks that are ready to be queued for import. - fn ready_blocks(&mut self) -> Vec> { - self.blocks - .ready_blocks(self.best_queued_number + One::one()) - .into_iter() - .map(|block_data| { - let justifications = block_data - .block - .justifications - .or_else(|| legacy_justification_mapping(block_data.block.justification)); - IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - indexed_body: block_data.block.indexed_body, - justifications, - origin: block_data.origin, - allow_missing_state: true, - import_existing: self.import_existing, - skip_execution: self.skip_execution(), - state: None, + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.common_number >= sync.target_block_num() { + peer.state = PeerSyncState::DownloadingState; + let request = sync.next_request(); + trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); + self.allowed_requests.clear(); + return Some((*id, OpaqueStateRequest(Box::new(request)))) } - }) - .collect() + } + } + if let Some(sync) = &self.warp_sync { + if sync.is_complete() { + return None + } + if let (Some(request), Some(target)) = + (sync.next_state_request(), sync.target_block_number()) + { + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.best_number >= target { + trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); + peer.state = PeerSyncState::DownloadingState; + self.allowed_requests.clear(); + return Some((*id, OpaqueStateRequest(Box::new(request)))) + } + } + } + } + None } - /// Generate block request for downloading of the target block body during warp sync. - fn warp_target_block_request(&mut self) -> Option<(PeerId, BlockRequest)> { + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { if let Some(sync) = &self.warp_sync { if self.allowed_requests.is_empty() || sync.is_complete() || self.peers .iter() - .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock) + .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof) { - // Only one pending warp target block request is allowed. + // Only one pending state request is allowed. return None } - if let Some((target_number, request)) = sync.next_target_block_request() { - // Find a random peer that has a block with the target number. - for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= target_number { - trace!(target: "sync", "New warp target block request for {}", id); - peer.state = PeerSyncState::DownloadingWarpTargetBlock; - self.allowed_requests.clear(); - return Some((*id, request)) + if let Some(request) = sync.next_warp_proof_request() { + let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect(); + if !targets.is_empty() { + targets.sort(); + let median = targets[targets.len() / 2]; + // Find a random peer that is synced as much as peer majority. + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.best_number >= median { + trace!(target: "sync", "New WarpProofRequest for {}", id); + peer.state = PeerSyncState::DownloadingWarpProof; + self.allowed_requests.clear(); + return Some((*id, request)) + } } } } @@ -2276,49 +2615,100 @@ where None } - /// Get config for the block announcement protocol - pub fn get_block_announce_proto_config( - &self, - protocol_id: ProtocolId, - fork_id: &Option, - roles: Roles, - best_number: NumberFor, - best_hash: B::Hash, - genesis_hash: B::Hash, - ) -> NonDefaultSetConfig { - let block_announces_protocol = { - let genesis_hash = genesis_hash.as_ref(); - if let Some(ref fork_id) = fork_id { - format!( - "/{}/{}/block-announces/1", - array_bytes::bytes2hex("", genesis_hash), - fork_id - ) - } else { - format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) + fn on_state_data( + &mut self, + who: &PeerId, + response: OpaqueStateResponse, + ) -> Result, BadPeer> { + let response: Box = response.0.downcast().map_err(|_error| { + error!( + target: "sync", + "Failed to downcast opaque state response, this is an implementation bug." + ); + + BadPeer(*who, rep::BAD_RESPONSE) + })?; + + if let Some(peer) = self.peers.get_mut(who) { + if let PeerSyncState::DownloadingState = peer.state { + peer.state = PeerSyncState::Available; + self.allowed_requests.set_all(); } + } + let import_result = if let Some(sync) = &mut self.state_sync { + debug!( + target: "sync", + "Importing state data from {} with {} keys, {} proof nodes.", + who, + response.entries.len(), + response.proof.len(), + ); + sync.import(*response) + } else if let Some(sync) = &mut self.warp_sync { + debug!( + target: "sync", + "Importing state data from {} with {} keys, {} proof nodes.", + who, + response.entries.len(), + response.proof.len(), + ); + sync.import_state(*response) + } else { + debug!(target: "sync", "Ignored obsolete state response from {}", who); + return Err(BadPeer(*who, rep::NOT_REQUESTED)) }; - NonDefaultSetConfig { - notifications_protocol: block_announces_protocol.into(), - fallback_names: iter::once( - format!("/{}/block-announces/1", protocol_id.as_ref()).into(), - ) - .collect(), - max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, - handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( - roles, - best_number, - best_hash, - genesis_hash, - ))), - // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement - // protocol is still hardcoded into the peerset. - set_config: SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Deny, + match import_result { + state::ImportResult::Import(hash, header, state, body, justifications) => { + let origin = BlockOrigin::NetworkInitialSync; + let block = IncomingBlock { + hash, + header: Some(header), + body, + indexed_body: None, + justifications, + origin: None, + allow_missing_state: true, + import_existing: true, + skip_execution: self.skip_execution(), + state: Some(state), + }; + debug!(target: "sync", "State download is complete. Import is queued"); + Ok(OnStateData::Import(origin, block)) + }, + state::ImportResult::Continue => Ok(OnStateData::Continue), + state::ImportResult::BadResponse => { + debug!(target: "sync", "Bad state data received from {}", who); + Err(BadPeer(*who, rep::BAD_BLOCK)) + }, + } + } + + fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer> { + if let Some(peer) = self.peers.get_mut(who) { + if let PeerSyncState::DownloadingWarpProof = peer.state { + peer.state = PeerSyncState::Available; + self.allowed_requests.set_all(); + } + } + let import_result = if let Some(sync) = &mut self.warp_sync { + debug!( + target: "sync", + "Importing warp proof data from {}, {} bytes.", + who, + response.0.len(), + ); + sync.import_warp_proof(response) + } else { + debug!(target: "sync", "Ignored obsolete warp sync response from {}", who); + return Err(BadPeer(*who, rep::NOT_REQUESTED)) + }; + + match import_result { + WarpProofImportResult::Success => Ok(()), + WarpProofImportResult::BadResponse => { + debug!(target: "sync", "Bad proof data received from {}", who); + Err(BadPeer(*who, rep::BAD_BLOCK)) }, } } @@ -2677,7 +3067,10 @@ mod test { use crate::service::network::NetworkServiceProvider; use futures::{executor::block_on, future::poll_fn}; use sc_block_builder::BlockBuilderProvider; - use sc_network_common::sync::message::{BlockData, BlockState, FromBlock}; + use sc_network_common::{ + protocol::role::Role, + sync::message::{BlockData, BlockState, FromBlock}, + }; use sp_blockchain::HeaderBackend; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use substrate_test_runtime_client::{ @@ -2698,13 +3091,19 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), block_announce_validator, 1, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -2755,13 +3154,19 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -2787,7 +3192,10 @@ mod test { // we wil send block requests to these peers // for these blocks we don't know about - assert!(sync.block_requests().all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + assert!(sync + .block_requests() + .into_iter() + .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); // add a new peer at a known block sync.new_peer(peer_id3, b1_hash, b1_number).unwrap(); @@ -2877,7 +3285,7 @@ mod test { max: u32, peer: &PeerId, ) -> BlockRequest { - let requests = sync.block_requests().collect::>(); + let requests = sync.block_requests(); log::trace!(target: "sync", "Requests: {:?}", requests); @@ -2925,13 +3333,19 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -2967,7 +3381,7 @@ mod test { sync.update_chain_info(&block3.hash(), 3); // There should be no requests. - assert!(sync.block_requests().collect::>().is_empty()); + assert!(sync.block_requests().is_empty()); // Let peer2 announce a fork of block 3 send_block_announce(block3_fork.header().clone(), &peer_id2, &mut sync); @@ -3043,13 +3457,19 @@ mod test { NetworkServiceProvider::new(); let info = client.info(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -3117,15 +3537,16 @@ mod test { // Let peer2 announce that it finished syncing send_block_announce(best_block.header().clone(), &peer_id2, &mut sync); - let (peer1_req, peer2_req) = sync.block_requests().fold((None, None), |res, req| { - if req.0 == peer_id1 { - (Some(req.1), res.1) - } else if req.0 == peer_id2 { - (res.0, Some(req.1)) - } else { - panic!("Unexpected req: {:?}", req) - } - }); + let (peer1_req, peer2_req) = + sync.block_requests().into_iter().fold((None, None), |res, req| { + if req.0 == peer_id1 { + (Some(req.1), res.1) + } else if req.0 == peer_id2 { + (res.0, Some(req.1)) + } else { + panic!("Unexpected req: {:?}", req) + } + }); // We should now do an ancestor search to find the correct common block. let peer2_req = peer2_req.unwrap(); @@ -3189,13 +3610,19 @@ mod test { let info = client.info(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -3321,13 +3748,19 @@ mod test { let info = client.info(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -3453,13 +3886,19 @@ mod test { let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); @@ -3489,13 +3928,19 @@ mod test { let empty_client = Arc::new(TestClientBuilder::new().build()); - let (mut sync, _) = ChainSync::new( + let (mut sync, _, _) = ChainSync::new( SyncMode::Full, empty_client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); diff --git a/client/network/sync/src/mock.rs b/client/network/sync/src/mock.rs index fbb54bd5e998d..48d72c425bd03 100644 --- a/client/network/sync/src/mock.rs +++ b/client/network/sync/src/mock.rs @@ -24,10 +24,8 @@ use libp2p::PeerId; use sc_consensus::{BlockImportError, BlockImportStatus}; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, - warp::{EncodedProof, WarpProofRequest}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, - OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, - PollBlockAnnounceValidation, SyncStatus, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, + OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, PollResult, SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -40,6 +38,7 @@ mockall::mock! { fn num_sync_requests(&self) -> usize; fn num_downloaded_blocks(&self) -> usize; fn num_peers(&self) -> usize; + fn num_active_peers(&self) -> usize; fn new_peer( &mut self, who: PeerId, @@ -55,24 +54,12 @@ mockall::mock! { hash: &Block::Hash, number: NumberFor, ); - fn justification_requests<'a>( - &'a mut self, - ) -> Box)> + 'a>; - fn block_requests<'a>(&'a mut self) -> Box)> + 'a>; - fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)>; - fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)>; fn on_block_data( &mut self, who: &PeerId, request: Option>, response: BlockResponse, ) -> Result, BadPeer>; - fn on_state_data( - &mut self, - who: &PeerId, - response: OpaqueStateResponse, - ) -> Result, BadPeer>; - fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer>; fn on_block_justification( &mut self, who: PeerId, @@ -104,19 +91,19 @@ mockall::mock! { ) -> Poll>; fn peer_disconnected(&mut self, who: &PeerId) -> Option>; fn metrics(&self) -> Metrics; - fn create_opaque_block_request(&self, request: &BlockRequest) -> OpaqueBlockRequest; - fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result, String>; - fn decode_block_response(&self, response: &[u8]) -> Result; fn block_response_into_blocks( &self, request: &BlockRequest, response: OpaqueBlockResponse, ) -> Result>, String>; - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String>; - fn decode_state_response(&self, response: &[u8]) -> Result; fn poll<'a>( &mut self, cx: &mut std::task::Context<'a>, - ) -> Poll>; + ) -> Poll>; + fn send_block_request( + &mut self, + who: PeerId, + request: BlockRequest, + ); } } diff --git a/client/network/sync/src/service/mock.rs b/client/network/sync/src/service/mock.rs index c146e1ec07b48..c8a29e1fba8ea 100644 --- a/client/network/sync/src/service/mock.rs +++ b/client/network/sync/src/service/mock.rs @@ -16,13 +16,16 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use sc_network_common::service::{NetworkPeers, NetworkSyncForkRequest}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; - -pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey}; +use futures::channel::oneshot; use libp2p::{Multiaddr, PeerId}; -use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName}; +use sc_network_common::{ + config::MultiaddrWithPeerId, + protocol::ProtocolName, + request_responses::{IfDisconnected, RequestFailure}, + service::{NetworkPeers, NetworkRequest, NetworkSyncForkRequest}, +}; use sc_peerset::ReputationChange; +use sp_runtime::traits::{Block as BlockT, NumberFor}; use std::collections::HashSet; mockall::mock! { @@ -72,4 +75,23 @@ mockall::mock! { fn remove_from_peers_set(&self, protocol: ProtocolName, peers: Vec); fn sync_num_connected(&self) -> usize; } + + #[async_trait::async_trait] + impl NetworkRequest for Network { + async fn request( + &self, + target: PeerId, + protocol: ProtocolName, + request: Vec, + connect: IfDisconnected, + ) -> Result, RequestFailure>; + fn start_request( + &self, + target: PeerId, + protocol: ProtocolName, + request: Vec, + tx: oneshot::Sender, RequestFailure>>, + connect: IfDisconnected, + ); + } } diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs index 44ed177661264..43501baeec7be 100644 --- a/client/network/sync/src/service/network.rs +++ b/client/network/sync/src/service/network.rs @@ -16,17 +16,21 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::StreamExt; +use futures::{channel::oneshot, StreamExt}; use libp2p::PeerId; -use sc_network_common::{protocol::ProtocolName, service::NetworkPeers}; +use sc_network_common::{ + protocol::ProtocolName, + request_responses::{IfDisconnected, RequestFailure}, + service::{NetworkPeers, NetworkRequest}, +}; use sc_peerset::ReputationChange; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::sync::Arc; /// Network-related services required by `sc-network-sync` -pub trait Network: NetworkPeers {} +pub trait Network: NetworkPeers + NetworkRequest {} -impl Network for T where T: NetworkPeers {} +impl Network for T where T: NetworkPeers + NetworkRequest {} /// Network service provider for `ChainSync` /// @@ -43,6 +47,15 @@ pub enum ToServiceCommand { /// Call `NetworkPeers::report_peer()` ReportPeer(PeerId, ReputationChange), + + /// Call `NetworkRequest::start_request()` + StartRequest( + PeerId, + ProtocolName, + Vec, + oneshot::Sender, RequestFailure>>, + IfDisconnected, + ), } /// Handle that is (temporarily) passed to `ChainSync` so it can @@ -67,6 +80,20 @@ impl NetworkServiceHandle { pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol)); } + + /// Send request to peer + pub fn start_request( + &self, + who: PeerId, + protocol: ProtocolName, + request: Vec, + tx: oneshot::Sender, RequestFailure>>, + connect: IfDisconnected, + ) { + let _ = self + .tx + .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect)); + } } impl NetworkServiceProvider { @@ -85,6 +112,8 @@ impl NetworkServiceProvider { service.disconnect_peer(peer, protocol_name), ToServiceCommand::ReportPeer(peer, reputation_change) => service.report_peer(peer, reputation_change), + ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => + service.start_request(peer, protocol, request, tx, connect), } } } diff --git a/client/network/sync/src/tests.rs b/client/network/sync/src/tests.rs index 479c78bfdea97..bd78c3b45226d 100644 --- a/client/network/sync/src/tests.rs +++ b/client/network/sync/src/tests.rs @@ -19,7 +19,15 @@ use crate::{service::network::NetworkServiceProvider, ChainSync, ForkTarget}; use libp2p::PeerId; -use sc_network_common::{service::NetworkSyncForkRequest, sync::ChainSync as ChainSyncT}; +use sc_network_common::{ + config::ProtocolId, + protocol::{ + role::{Role, Roles}, + ProtocolName, + }, + service::NetworkSyncForkRequest, + sync::ChainSync as ChainSyncT, +}; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use sp_core::H256; use std::{sync::Arc, task::Poll}; @@ -30,13 +38,19 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _ #[async_std::test] async fn delegate_to_chainsync() { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut chain_sync, chain_sync_service) = ChainSync::new( + let (mut chain_sync, chain_sync_service, _) = ChainSync::new( sc_network_common::sync::SyncMode::Full, Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1u32, None, chain_sync_network_handle, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, ) .unwrap(); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 035fc0a972a59..975d902157310 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -77,7 +77,7 @@ use sp_core::H256; use sp_runtime::{ codec::{Decode, Encode}, generic::{BlockId, OpaqueDigestItemId}, - traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}, + traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, }; use substrate_test_runtime_client::AccountKeyring; @@ -869,7 +869,7 @@ where .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (chain_sync, chain_sync_service) = ChainSync::new( + let (chain_sync, chain_sync_service, block_announce_config) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, SyncMode::Fast { skip_proofs, storage_chain_mode } => @@ -880,24 +880,18 @@ where SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, }, client.clone(), + protocol_id.clone(), + &fork_id, + Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), block_announce_validator, network_config.max_parallel_downloads, Some(warp_sync), chain_sync_network_handle, + block_request_protocol_config.name.clone(), + state_request_protocol_config.name.clone(), + Some(warp_protocol_config.name.clone()), ) .unwrap(); - let block_announce_config = chain_sync.get_block_announce_proto_config( - protocol_id.clone(), - &fork_id, - Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), - client.info().best_number, - client.info().best_hash, - client - .block_hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ); let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, @@ -911,11 +905,13 @@ where chain_sync_service, metrics_registry: None, block_announce_config, - block_request_protocol_config, - state_request_protocol_config, - light_client_request_protocol_config, - warp_sync_protocol_config: Some(warp_protocol_config), - request_response_protocol_configs: Vec::new(), + request_response_protocol_configs: [ + block_request_protocol_config, + state_request_protocol_config, + light_client_request_protocol_config, + warp_protocol_config, + ] + .to_vec(), }) .unwrap(); @@ -994,6 +990,7 @@ where return Poll::Pending } } + Poll::Ready(()) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 3cb064ec814c5..63d60fb06f471 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -846,7 +846,7 @@ where }; let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (chain_sync, chain_sync_service) = ChainSync::new( + let (chain_sync, chain_sync_service, block_announce_config) = ChainSync::new( match config.network.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, SyncMode::Fast { skip_proofs, storage_chain_mode } => @@ -854,25 +854,18 @@ where SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, }, client.clone(), + protocol_id.clone(), + &config.chain_spec.fork_id().map(ToOwned::to_owned), + Roles::from(&config.role), block_announce_validator, config.network.max_parallel_downloads, warp_sync_provider, chain_sync_network_handle, + block_request_protocol_config.name.clone(), + state_request_protocol_config.name.clone(), + warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), )?; - let block_announce_config = chain_sync.get_block_announce_proto_config( - protocol_id.clone(), - &config.chain_spec.fork_id().map(ToOwned::to_owned), - Roles::from(&config.role.clone()), - client.info().best_number, - client.info().best_hash, - client - .block_hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ); - request_response_protocol_configs.push(config.network.ipfs_server.then(|| { let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run()); @@ -896,12 +889,14 @@ where chain_sync_service, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, - block_request_protocol_config, - state_request_protocol_config, - warp_sync_protocol_config, - light_client_request_protocol_config, request_response_protocol_configs: request_response_protocol_configs .into_iter() + .chain([ + Some(block_request_protocol_config), + Some(state_request_protocol_config), + Some(light_client_request_protocol_config), + warp_sync_protocol_config, + ]) .flatten() .collect::>(), };