Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Optimize collecting pending block requests (#5829)
Browse files Browse the repository at this point in the history
* Optimized collecting pending block requests

* Make sure request iterator is consumed
  • Loading branch information
arkpar authored Apr 30, 2020
1 parent 268450a commit b354529
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 25 deletions.
9 changes: 6 additions & 3 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1965,7 +1965,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
target: id,
request: r,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
self.pending_messages.push_back(event);
} else {
send_request(
&mut self.behaviour,
Expand All @@ -1982,7 +1982,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
target: id,
request: r,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
self.pending_messages.push_back(event);
} else {
send_request(
&mut self.behaviour,
Expand All @@ -2000,7 +2000,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
block_hash: r.block,
request: r.request,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
self.pending_messages.push_back(event);
} else {
send_request(
&mut self.behaviour,
Expand All @@ -2010,6 +2010,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
GenericMessage::FinalityProofRequest(r))
}
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}

let event = match self.behaviour.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Expand Down
88 changes: 66 additions & 22 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,50 @@ mod rep {
pub const UNKNOWN_ANCESTOR:Rep = Rep::new(-(1 << 16), "DB Error");
}

enum PendingRequests {
Some(HashSet<PeerId>),
All,
}

impl PendingRequests {
fn add(&mut self, id: &PeerId) {
match self {
PendingRequests::Some(set) => {
set.insert(id.clone());
}
PendingRequests::All => {},
}
}

fn take(&mut self) -> PendingRequests {
std::mem::replace(self, Default::default())
}

fn set_all(&mut self) {
*self = PendingRequests::All;
}

fn contains(&self, id: &PeerId) -> bool {
match self {
PendingRequests::Some(set) => set.contains(id),
PendingRequests::All => true,
}
}

fn is_empty(&self) -> bool {
match self {
PendingRequests::Some(set) => set.is_empty(),
PendingRequests::All => false,
}
}
}

impl Default for PendingRequests {
fn default() -> Self {
PendingRequests::Some(HashSet::default())
}
}

/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync<B: BlockT> {
Expand Down Expand Up @@ -138,8 +182,8 @@ pub struct ChainSync<B: BlockT> {
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
/// Fork sync targets.
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
/// A flag that caches idle state with no pending requests.
is_idle: bool,
/// A set of peers for which there might be potential block requests
pending_requests: PendingRequests,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
Expand Down Expand Up @@ -327,7 +371,7 @@ impl<B: BlockT> ChainSync<B> {
queue_blocks: Default::default(),
request_builder,
fork_targets: Default::default(),
is_idle: false,
pending_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
processed_blocks: 0,
Expand Down Expand Up @@ -426,7 +470,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
self.pending_requests.add(&who);
return Ok(None)
}

Expand All @@ -438,6 +482,7 @@ impl<B: BlockT> ChainSync<B> {
best_number
);

self.pending_requests.add(&who);
self.peers.insert(who, PeerSync {
common_number: Zero::zero(),
best_hash,
Expand All @@ -449,7 +494,6 @@ impl<B: BlockT> ChainSync<B> {
},
recently_announced: Default::default()
});
self.is_idle = false;

Ok(Some(ancestry_request::<B>(common_best)))
}
Expand All @@ -462,7 +506,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
self.pending_requests.add(&who);
Ok(None)
}
}
Expand Down Expand Up @@ -516,7 +560,6 @@ impl<B: BlockT> ChainSync<B> {
}

trace!(target: "sync", "Downloading requested old fork {:?}", hash);
self.is_idle = false;
for peer_id in &peers {
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::AncestorSearch {..} = peer.state {
Expand All @@ -527,6 +570,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_number = number;
peer.best_hash = hash.clone();
}
self.pending_requests.add(peer_id);
}
}

Expand Down Expand Up @@ -590,7 +634,7 @@ impl<B: BlockT> ChainSync<B> {

/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ {
if self.is_idle {
if self.pending_requests.is_empty() {
return Either::Left(std::iter::empty())
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
Expand All @@ -606,10 +650,13 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let pending_requests = self.pending_requests.take();
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", id);
return None
}
if !pending_requests.contains(id) {
return None
}
if let Some((range, req)) = peer_block_request(
Expand Down Expand Up @@ -652,9 +699,6 @@ impl<B: BlockT> ChainSync<B> {
None
}
});
if !have_requests {
self.is_idle = true;
}
Either::Right(iter)
}

Expand All @@ -675,7 +719,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.is_idle = false;
self.pending_requests.add(&who);
if request.is_some() {
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
Expand Down Expand Up @@ -859,7 +903,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockJustification::Nothing)
};

self.is_idle = false;
self.pending_requests.add(&who);
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;

Expand Down Expand Up @@ -906,7 +950,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockFinalityProof::Nothing)
};

self.is_idle = false;
self.pending_requests.add(&who);
if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
peer.state = PeerSyncState::Available;

Expand Down Expand Up @@ -1029,7 +1073,7 @@ impl<B: BlockT> ChainSync<B> {
};
}

self.is_idle = false;
self.pending_requests.set_all();
output.into_iter()
}

Expand All @@ -1038,12 +1082,12 @@ impl<B: BlockT> ChainSync<B> {
pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications.try_finalize_root((hash, number), finalization_result, true);
self.is_idle = false;
self.pending_requests.set_all();
}

pub fn on_finality_proof_import(&mut self, req: (B::Hash, NumberFor<B>), res: Result<(B::Hash, NumberFor<B>), ()>) {
self.extra_finality_proofs.try_finalize_root(req, res, true);
self.is_idle = false;
self.pending_requests.set_all();
}

/// Notify about finalization of the given block.
Expand Down Expand Up @@ -1101,7 +1145,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = new_common_number;
}
}
self.is_idle = false;
self.pending_requests.set_all();
}

/// Call when a node announces a new block.
Expand Down Expand Up @@ -1154,7 +1198,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = number - One::one();
}
}
self.is_idle = false;
self.pending_requests.add(&who);

// known block case
if known || self.is_already_downloading(&hash) {
Expand Down Expand Up @@ -1214,7 +1258,7 @@ impl<B: BlockT> ChainSync<B> {
self.peers.remove(&who);
self.extra_justifications.peer_disconnected(&who);
self.extra_finality_proofs.peer_disconnected(&who);
self.is_idle = false;
self.pending_requests.set_all();
}

/// Restart the sync process.
Expand All @@ -1224,7 +1268,7 @@ impl<B: BlockT> ChainSync<B> {
let info = self.client.info();
self.best_queued_hash = info.best_hash;
self.best_queued_number = std::cmp::max(info.best_number, self.best_imported_number);
self.is_idle = false;
self.pending_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::replace(&mut self.peers, HashMap::new());
old_peers.into_iter().filter_map(move |(id, p)| {
Expand Down

0 comments on commit b354529

Please sign in to comment.