diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 9813b2551fe..96636411137 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -323,6 +323,10 @@ impl SyncHandler { Ok(()) } + fn on_peer_confirmed(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) { + sync.sync_peer(io, peer_id, false); + } + fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { { let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers"); @@ -349,7 +353,7 @@ impl SyncHandler { io.chain_overlay().write().insert(fork_number, header.to_vec()); } } - sync.sync_peer(io, peer_id, false); + SyncHandler::on_peer_confirmed(sync, io, peer_id); return Ok(()); } @@ -659,7 +663,7 @@ impl SyncHandler { if let Some((fork_block, _)) = sync.fork_block { SyncRequester::request_fork_header(sync, io, peer_id, fork_block); } else { - sync.sync_peer(io, peer_id, false); + SyncHandler::on_peer_confirmed(sync, io, peer_id); } Ok(()) } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index c680516f660..19b49246f76 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -416,11 +416,7 @@ impl ChainSync { pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc) -> ChainSync { let chain_info = chain.chain_info(); let best_block = chain.chain_info().best_block_number; - let state = match config.warp_sync { - WarpSync::Enabled => SyncState::WaitingPeers, - WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, - _ => SyncState::Idle, - }; + let state = ChainSync::get_init_state(config.warp_sync, chain); let mut sync = ChainSync { state, @@ -445,6 +441,15 @@ impl ChainSync { sync } + fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState { + let best_block = chain.chain_info().best_block_number; + match warp_sync { + WarpSync::Enabled => SyncState::WaitingPeers, + WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, + _ => SyncState::Idle, + } + } + /// Returns synchonization status pub fn status(&self) -> SyncStatus { let last_imported_number = self.new_blocks.last_imported_block_number(); @@ -511,7 +516,7 @@ impl ChainSync { } } } - self.state = SyncState::Idle; + self.state = ChainSync::get_init_state(self.warp_sync, io.chain()); // Reactivate peers only if some progress has been made // since the last sync round of if starting fresh. self.active_peers = self.peers.keys().cloned().collect(); @@ -646,23 +651,24 @@ impl ChainSync { /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { - let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)| - if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect(); + // Collect active peers that can sync + let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| + if peer.can_sync() && self.active_peers.contains(peer_id) { + Some((*peer_id, peer.protocol_version)) + } else { + None + } + ).collect(); random::new().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version - peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2)); + peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len()); - for (p, _, _) in peers { - if self.active_peers.contains(&p) { - self.sync_peer(io, p, false); - } + for (peer_id, _) in peers { + self.sync_peer(io, peer_id, false); } if - self.state != SyncState::WaitingPeers && - self.state != SyncState::SnapshotWaiting && - self.state != SyncState::Waiting && - self.state != SyncState::Idle && + (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) { self.complete_sync(io); @@ -673,7 +679,6 @@ impl ChainSync { fn complete_sync(&mut self, io: &mut SyncIo) { trace!(target: "sync", "Sync complete"); self.reset(io); - self.state = SyncState::Idle; } /// Enter waiting state @@ -901,7 +906,7 @@ impl ChainSync { } fn check_resume(&mut self, io: &mut SyncIo) { - if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting { + if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() { self.state = SyncState::Blocks; self.continue_sync(io); } else if self.state == SyncState::SnapshotWaiting { @@ -909,7 +914,6 @@ impl ChainSync { RestorationStatus::Inactive => { trace!(target:"sync", "Snapshot restoration is complete"); self.restart(io); - self.continue_sync(io); }, RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => { if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {