From 597f0f0d1434fc29f725190fea95dcc1efdab8be Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 21 Jun 2016 17:41:09 +0200 Subject: [PATCH 1/5] Minor sync fixes --- sync/src/chain.rs | 2 -- util/src/network/host.rs | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 01640ec4d57..5c1b6709b9b 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -308,7 +308,6 @@ impl ChainSync { } self.syncing_difficulty = From::from(0u64); self.state = SyncState::Idle; - self.blocks.clear(); self.active_peers = self.peers.keys().cloned().collect(); } @@ -316,7 +315,6 @@ impl ChainSync { pub fn restart(&mut self, io: &mut SyncIo) { trace!(target: "sync", "Restarting"); self.reset(); - self.start_sync_round(io); self.continue_sync(io); } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 03ba07544a6..1318b94905e 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -191,7 +191,7 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta sessions: Arc>>, session: Option, session_id: Option, - reserved_peers: &'s HashSet, + _reserved_peers: &'s HashSet, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { @@ -207,7 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone session_id: id, session: session, sessions: sessions, - reserved_peers: reserved_peers, + _reserved_peers: reserved_peers, } } From 5df10c2f1a069c5531f3e539a3d4e359b7798c32 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 21 Jun 2016 17:47:27 +0200 Subject: [PATCH 2/5] Fixed session count sub --- util/src/network/host.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 1318b94905e..7de581b4d40 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -837,9 +837,9 @@ impl Host where Message: Send + Sync + Clone { let mut s = session.lock().unwrap(); if !s.expired() { if s.is_ready() { + self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); for (p, _) in self.handlers.read().unwrap().iter() { if s.have_capability(p) { - self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); to_disconnect.push(p); } } From e0468cf91f8621f44ad98f6bed965fab355401fa Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 21 Jun 2016 18:45:05 +0200 Subject: [PATCH 3/5] handle NewBlock when downloading --- sync/src/chain.rs | 46 ++++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 5c1b6709b9b..335fa8ea4c9 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -531,10 +531,6 @@ impl ChainSync { let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - if self.state != SyncState::Idle { - trace!(target: "sync", "NewBlock ignored while seeking"); - return Ok(()); - } let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; { @@ -542,35 +538,41 @@ impl ChainSync { peer.latest_hash = header.hash(); peer.latest_number = Some(header.number()); } - if header.number <= self.last_imported_block + 1 { + if header.parent_hash() == &self.last_imported_hash { match io.chain().import_block(block_rlp.as_raw().to_vec()) { - Err(Error::Import(ImportError::AlreadyInChain)) => { - trace!(target: "sync", "New block already in chain {:?}", h); - }, - Err(Error::Import(ImportError::AlreadyQueued)) => { - trace!(target: "sync", "New block already queued {:?}", h); - }, Ok(_) => { - if header.number == self.last_imported_block + 1 { - self.last_imported_block = header.number; - self.last_imported_hash = header.hash(); - } + self.last_imported_block = header.number; + self.last_imported_hash = header.hash(); trace!(target: "sync", "New block queued {:?} ({})", h, header.number); }, - Err(Error::Block(BlockError::UnknownParent(p))) => { - unknown = true; - trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h); - }, Err(e) => { debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); io.disable_peer(peer_id); } }; - } - else { - unknown = true; + } else { + match io.chain().block_status(BlockID::Hash(h.clone())) { + BlockStatus::InChain => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + BlockStatus::Queued => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + BlockStatus::Unknown => { + unknown = true; + }, + BlockStatus::Bad => { + debug!(target: "sync", "Bad new block {:?}", h); + io.disable_peer(peer_id); + return Ok(()); + } + } } if unknown { + if self.state != SyncState::Idle { + trace!(target: "sync", "Ignored unkown NewBlock while downloading"); + return Ok(()); + } trace!(target: "sync", "New unknown block {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); From bfc543e8df913e1368bef465f6d3b204069c466c Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 21 Jun 2016 22:08:00 +0200 Subject: [PATCH 4/5] Accept new blocks right away --- sync/src/chain.rs | 49 ++++++++++++++++++----------------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 335fa8ea4c9..6ca35cd8f7a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -538,41 +538,30 @@ impl ChainSync { peer.latest_hash = header.hash(); peer.latest_number = Some(header.number()); } - if header.parent_hash() == &self.last_imported_hash { - match io.chain().import_block(block_rlp.as_raw().to_vec()) { - Ok(_) => { + match io.chain().import_block(block_rlp.as_raw().to_vec()) { + Err(Error::Import(ImportError::AlreadyInChain)) => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + Err(Error::Import(ImportError::AlreadyQueued)) => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + Ok(_) => { + if header.number == self.last_imported_block + 1 { self.last_imported_block = header.number; self.last_imported_hash = header.hash(); - trace!(target: "sync", "New block queued {:?} ({})", h, header.number); - }, - Err(e) => { - debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); - io.disable_peer(peer_id); - } - }; - } else { - match io.chain().block_status(BlockID::Hash(h.clone())) { - BlockStatus::InChain => { - trace!(target: "sync", "New block already in chain {:?}", h); - }, - BlockStatus::Queued => { - trace!(target: "sync", "New block already queued {:?}", h); - }, - BlockStatus::Unknown => { - unknown = true; - }, - BlockStatus::Bad => { - debug!(target: "sync", "Bad new block {:?}", h); - io.disable_peer(peer_id); - return Ok(()); } + trace!(target: "sync", "New block queued {:?} ({})", h, header.number); + }, + Err(Error::Block(BlockError::UnknownParent(p))) => { + unknown = true; + trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h); + }, + Err(e) => { + debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); + io.disable_peer(peer_id); } - } + }; if unknown { - if self.state != SyncState::Idle { - trace!(target: "sync", "Ignored unkown NewBlock while downloading"); - return Ok(()); - } trace!(target: "sync", "New unknown block {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); From 45fa26ffacaf312bc4deee89fa4319a8b15683d4 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 22 Jun 2016 10:10:38 +0200 Subject: [PATCH 5/5] block collection update fixed --- sync/src/blocks.rs | 25 ++++++++++++++++++++++++- sync/src/chain.rs | 34 ++++++++++++++++++++++------------ 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/sync/src/blocks.rs b/sync/src/blocks.rs index acc6703d500..b48085d4307 100644 --- a/sync/src/blocks.rs +++ b/sync/src/blocks.rs @@ -295,6 +295,10 @@ impl BlockCollection { let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() }; for s in self.heads.drain(..) { let mut h = s.clone(); + if !self.blocks.contains_key(&h) { + new_heads.push(h); + continue; + } loop { match self.parents.get(&h) { Some(next) => { @@ -394,7 +398,7 @@ mod test { assert_eq!(&bc.drain()[..], &blocks[6..16]); assert_eq!(hashes[15], bc.heads[0]); - bc.insert_headers(headers[16..].to_vec()); + bc.insert_headers(headers[15..].to_vec()); bc.drain(); assert!(bc.is_empty()); } @@ -420,5 +424,24 @@ mod test { assert!(bc.head.is_some()); assert_eq!(hashes[21], bc.heads[0]); } + + #[test] + fn insert_headers_no_gap() { + let mut bc = BlockCollection::new(); + assert!(is_empty(&bc)); + let client = TestBlockChainClient::new(); + let nblocks = 200; + client.add_blocks(nblocks, EachBlockWith::Nothing); + let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect(); + let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect(); + let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect(); + let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); + bc.reset_to(heads); + + bc.insert_headers(headers[1..2].to_vec()); + assert!(bc.drain().is_empty()); + bc.insert_headers(headers[0..1].to_vec()); + assert_eq!(bc.drain().len(), 2); + } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6ca35cd8f7a..55e4e93b2f5 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -100,6 +100,7 @@ use io::SyncIo; use time; use super::SyncConfig; use blocks::BlockCollection; +use rand::{thread_rng, Rng}; known_heap_size!(0, PeerInfo); @@ -315,6 +316,7 @@ impl ChainSync { pub fn restart(&mut self, io: &mut SyncIo) { trace!(target: "sync", "Restarting"); self.reset(); + self.start_sync_round(io); self.continue_sync(io); } @@ -391,7 +393,7 @@ impl ChainSync { self.clear_peer_download(peer_id); let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; - if !self.reset_peer_asking(peer_id, expected_asking) { + if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() { trace!(target: "sync", "Ignored unexpected headers"); self.continue_sync(io); return Ok(()); @@ -562,17 +564,21 @@ impl ChainSync { } }; if unknown { - trace!(target: "sync", "New unknown block {:?}", h); - //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - if peer.difficulty.map_or(true, |pd| difficulty > pd) { - //self.state = SyncState::ChainHead; - peer.difficulty = Some(difficulty); - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + if self.state != SyncState::Idle { + trace!(target: "sync", "NewBlock ignored while seeking"); + } else { + trace!(target: "sync", "New unknown block {:?}", h); + //TODO: handle too many unknown blocks + let difficulty: U256 = try!(r.val_at(1)); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.difficulty.map_or(true, |pd| difficulty > pd) { + //self.state = SyncState::ChainHead; + peer.difficulty = Some(difficulty); + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + } } + self.sync_peer(io, peer_id, true); } - self.sync_peer(io, peer_id, true); } Ok(()) } @@ -650,7 +656,7 @@ impl ChainSync { /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); - peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + thread_rng().shuffle(&mut peers); //TODO: sort by rating trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); for (p, _) in peers { if self.active_peers.contains(&p) { @@ -676,7 +682,11 @@ impl ChainSync { } /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. - fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + if !self.active_peers.contains(&peer_id) { + trace!(target: "sync", "Skipping deactivated peer"); + return; + } let (peer_latest, peer_difficulty) = { let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing {