This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Sync optimization #1385

Merged (5 commits) on Jun 22, 2016
25 changes: 24 additions & 1 deletion sync/src/blocks.rs
@@ -295,6 +295,10 @@ impl BlockCollection {
         let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() };
         for s in self.heads.drain(..) {
             let mut h = s.clone();
+            if !self.blocks.contains_key(&h) {
+                new_heads.push(h);
+                continue;
+            }
             loop {
                 match self.parents.get(&h) {
                     Some(next) => {

Comment from the author: Fixed a bug here. If a header is inserted with a number immediately following the chain head, the head would get updated incorrectly, which led to the same blocks being requested twice.
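To illustrate the fix, here is a minimal sketch of the head-update walk with simplified, hypothetical types (u64 stands in for H256; this is an editor's illustration, not code from the diff). Here `blocks` stands for downloaded headers and `parents` links each hash to the hash of the block that follows it. Without the early `continue`, a head whose own header was never downloaded could still be advanced along `parents` (for instance when only the block at head + 1 had been inserted), so the head landed on the wrong block and the same range was requested again; the regression test `insert_headers_no_gap` below exercises exactly this case.

```rust
use std::collections::{HashMap, HashSet};

fn update_heads(heads: &mut Vec<u64>, blocks: &HashSet<u64>, parents: &HashMap<u64, u64>) {
    let mut new_heads = Vec::new();
    for s in heads.drain(..) {
        let mut h = s;
        // The fix: a head with no downloaded header keeps its position.
        if !blocks.contains(&h) {
            new_heads.push(h);
            continue;
        }
        // Advance past contiguously downloaded headers.
        while let Some(&next) = parents.get(&h) {
            h = next;
            if !blocks.contains(&h) {
                break;
            }
        }
        new_heads.push(h);
    }
    *heads = new_heads;
}

fn main() {
    // Heads at 0 and 20; only block 1 (head + 1) has been inserted so far.
    let mut heads = vec![0u64, 20];
    let blocks: HashSet<u64> = [1].iter().cloned().collect();
    let parents: HashMap<u64, u64> = [(0, 1), (1, 2)].iter().cloned().collect();
    update_heads(&mut heads, &blocks, &parents);
    // Head 0 itself was never downloaded, so it must stay at 0.
    // Without the guard it would advance to 2 and block 0 would be re-requested.
    assert_eq!(heads, vec![0, 20]);
}
```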
@@ -394,7 +398,7 @@ mod test {
         assert_eq!(&bc.drain()[..], &blocks[6..16]);
         assert_eq!(hashes[15], bc.heads[0]);
 
-        bc.insert_headers(headers[16..].to_vec());
+        bc.insert_headers(headers[15..].to_vec());
         bc.drain();
         assert!(bc.is_empty());
     }
@@ -420,5 +424,24 @@
         assert!(bc.head.is_some());
         assert_eq!(hashes[21], bc.heads[0]);
     }
+
+    #[test]
+    fn insert_headers_no_gap() {
+        let mut bc = BlockCollection::new();
+        assert!(is_empty(&bc));
+        let client = TestBlockChainClient::new();
+        let nblocks = 200;
+        client.add_blocks(nblocks, EachBlockWith::Nothing);
+        let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect();
+        let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
+        let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
+        let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
+        bc.reset_to(heads);
+
+        bc.insert_headers(headers[1..2].to_vec());
+        assert!(bc.drain().is_empty());
+        bc.insert_headers(headers[0..1].to_vec());
+        assert_eq!(bc.drain().len(), 2);
+    }
 }

87 changes: 43 additions & 44 deletions sync/src/chain.rs
@@ -100,6 +100,7 @@ use io::SyncIo;
 use time;
 use super::SyncConfig;
 use blocks::BlockCollection;
+use rand::{thread_rng, Rng};
 
 known_heap_size!(0, PeerInfo);
 
@@ -308,7 +309,6 @@ impl ChainSync {
         }
         self.syncing_difficulty = From::from(0u64);
         self.state = SyncState::Idle;
-        self.blocks.clear();
         self.active_peers = self.peers.keys().cloned().collect();
     }
 
@@ -393,7 +393,7 @@ impl ChainSync {
         self.clear_peer_download(peer_id);
         let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
         let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
-        if !self.reset_peer_asking(peer_id, expected_asking) {
+        if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
             trace!(target: "sync", "Ignored unexpected headers");
             self.continue_sync(io);
             return Ok(());

Comment from the author: There might still be a request pending from a previous sync round; ignore such requests. expected_hash is cleared in restart().
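A minimal sketch of the guard's logic with simplified types (an editor's illustration; the real code checks a `PeerAsking` state and an `H256` request hash): a headers packet is processed only when the peer is in the expected asking state and a request hash from the current round is still on record.

```rust
fn accept_headers(reset_ok: bool, expected_hash: Option<[u8; 32]>) -> bool {
    // restart() clears the recorded hash, so a late reply to a request from
    // a previous sync round arrives with expected_hash == None and is dropped.
    reset_ok && expected_hash.is_some()
}

fn main() {
    assert!(!accept_headers(true, None)); // stale reply from last round: ignored
    assert!(accept_headers(true, Some([0u8; 32]))); // current-round reply: accepted
}
```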
@@ -533,57 +533,52 @@ impl ChainSync {
         let header_rlp = try!(block_rlp.at(0));
         let h = header_rlp.as_raw().sha3();
         trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
-        if self.state != SyncState::Idle {
-            trace!(target: "sync", "NewBlock ignored while seeking");
-            return Ok(());
-        }
         let header: BlockHeader = try!(header_rlp.as_val());
         let mut unknown = false;
         {
             let peer = self.peers.get_mut(&peer_id).unwrap();
             peer.latest_hash = header.hash();
             peer.latest_number = Some(header.number());
         }
-        if header.number <= self.last_imported_block + 1 {
-            match io.chain().import_block(block_rlp.as_raw().to_vec()) {
-                Err(Error::Import(ImportError::AlreadyInChain)) => {
-                    trace!(target: "sync", "New block already in chain {:?}", h);
-                },
-                Err(Error::Import(ImportError::AlreadyQueued)) => {
-                    trace!(target: "sync", "New block already queued {:?}", h);
-                },
-                Ok(_) => {
-                    if header.number == self.last_imported_block + 1 {
-                        self.last_imported_block = header.number;
-                        self.last_imported_hash = header.hash();
-                    }
-                    trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
-                },
-                Err(Error::Block(BlockError::UnknownParent(p))) => {
-                    unknown = true;
-                    trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
-                },
-                Err(e) => {
-                    debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
-                    io.disable_peer(peer_id);
-                }
-            };
-        }
-        else {
-            unknown = true;
-        }
+        match io.chain().import_block(block_rlp.as_raw().to_vec()) {
+            Err(Error::Import(ImportError::AlreadyInChain)) => {
+                trace!(target: "sync", "New block already in chain {:?}", h);
+            },
+            Err(Error::Import(ImportError::AlreadyQueued)) => {
+                trace!(target: "sync", "New block already queued {:?}", h);
+            },
+            Ok(_) => {
+                if header.number == self.last_imported_block + 1 {
+                    self.last_imported_block = header.number;
+                    self.last_imported_hash = header.hash();
+                }
+                trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
+            },
+            Err(Error::Block(BlockError::UnknownParent(p))) => {
+                unknown = true;
+                trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
+            },
+            Err(e) => {
+                debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
+                io.disable_peer(peer_id);
+            }
+        };
         if unknown {
-            trace!(target: "sync", "New unknown block {:?}", h);
-            //TODO: handle too many unknown blocks
-            let difficulty: U256 = try!(r.val_at(1));
-            if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
-                if peer.difficulty.map_or(true, |pd| difficulty > pd) {
-                    //self.state = SyncState::ChainHead;
-                    peer.difficulty = Some(difficulty);
-                    trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
-                }
-            }
-            self.sync_peer(io, peer_id, true);
+            if self.state != SyncState::Idle {
+                trace!(target: "sync", "NewBlock ignored while seeking");
+            } else {
+                trace!(target: "sync", "New unknown block {:?}", h);
+                //TODO: handle too many unknown blocks
+                let difficulty: U256 = try!(r.val_at(1));
+                if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
+                    if peer.difficulty.map_or(true, |pd| difficulty > pd) {
+                        //self.state = SyncState::ChainHead;
+                        peer.difficulty = Some(difficulty);
+                        trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
+                    }
+                }
+                self.sync_peer(io, peer_id, true);
+            }
         }
         Ok(())
     }

Comment from the author: Try to import NewBlock even when downloading; this reduces syncing delays.
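A condensed sketch of the new control flow (hypothetical enums, heavily simplified; an editor's illustration, not this PR's API): the block is now handed to the import queue regardless of sync state, and only the unknown-parent follow-up is suppressed while a full sync is already running.

```rust
enum Import { Queued, AlreadyKnown, UnknownParent, Bad }

#[derive(PartialEq)]
enum State { Idle, Seeking }

fn on_new_block(state: State, import: Import) -> &'static str {
    match import {
        Import::Queued | Import::AlreadyKnown => "imported or already known: done",
        Import::Bad => "penalize the sending peer",
        // Only the follow-up sync is gated on state, not the import itself.
        Import::UnknownParent if state != State::Idle => "ignore: sync already in progress",
        Import::UnknownParent => "record peer difficulty and sync from this peer",
    }
}

fn main() {
    // Previously a NewBlock was dropped outright while seeking; now it is imported.
    assert_eq!(on_new_block(State::Seeking, Import::Queued), "imported or already known: done");
    assert_eq!(on_new_block(State::Seeking, Import::UnknownParent), "ignore: sync already in progress");
}
```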
@@ -661,7 +656,7 @@ impl ChainSync {
     /// Resume downloading
     fn continue_sync(&mut self, io: &mut SyncIo) {
         let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
-        peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
+        thread_rng().shuffle(&mut peers); //TODO: sort by rating
         trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
         for (p, _) in peers {
             if self.active_peers.contains(&p) {

Comment from the author: This list is used to select helping peers. Peers are selected at random here to protect against a high advertised-TD attack, until there is a proper rating system.
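A minimal sketch of the selection change, assuming the `rand` crate already imported by this PR (`thread_rng` and `Rng::shuffle`); the peer values are illustrative. Ordering candidates by advertised total difficulty lets a hostile peer win every round simply by claiming an enormous TD, whereas a uniform shuffle removes that lever until a real rating system exists.

```rust
extern crate rand;
use rand::{thread_rng, Rng};

fn main() {
    // (peer_id, advertised total difficulty) pairs; difficulty is self-reported.
    let mut peers: Vec<(usize, u64)> = vec![(0, 100), (1, 5), (2, u64::max_value())];
    // Before: peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse());
    thread_rng().shuffle(&mut peers); // peer 2's huge claimed TD buys it nothing
    println!("{:?}", peers);
}
```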
@@ -687,7 +682,11 @@ impl ChainSync {
     }
 
     /// Find something to do for a peer. Called for a new peer or when a peer is done with its task.
     fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
+        if !self.active_peers.contains(&peer_id) {
+            trace!(target: "sync", "Skipping deactivated peer");
+            return;
+        }
         let (peer_latest, peer_difficulty) = {
             let peer = self.peers.get_mut(&peer_id).unwrap();
             if peer.asking != PeerAsking::Nothing {

6 changes: 3 additions & 3 deletions util/src/network/host.rs
@@ -191,7 +191,7 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
     sessions: Arc<RwLock<Slab<SharedSession>>>,
     session: Option<SharedSession>,
     session_id: Option<StreamToken>,
-    reserved_peers: &'s HashSet<NodeId>,
+    _reserved_peers: &'s HashSet<NodeId>,
 }
 
 impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
@@ -207,7 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
             session_id: id,
             session: session,
             sessions: sessions,
-            reserved_peers: reserved_peers,
+            _reserved_peers: reserved_peers,
         }
     }
 
@@ -837,9 +837,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
             let mut s = session.lock().unwrap();
             if !s.expired() {
                 if s.is_ready() {
+                    self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
                     for (p, _) in self.handlers.read().unwrap().iter() {
                         if s.have_capability(p) {
-                            self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
                             to_disconnect.push(p);
                         }
                     }

Comment from the author: This needs to be decremented once per session. It works now because we have only one handler, but it might break future code.
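A sketch of the counting hazard the comment describes, with hypothetical names (an editor's illustration, not this PR's API): if the decrement lives inside the per-handler loop, a session matching two registered capabilities is subtracted twice on expiry; hoisting it above the loop subtracts exactly one per expired session.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn on_session_expired<'a>(
    num_sessions: &AtomicUsize,
    handler_caps: &[&'a str],
    session_caps: &[&str],
) -> Vec<&'a str> {
    num_sessions.fetch_sub(1, Ordering::SeqCst); // once per session, not per handler
    let mut to_disconnect = Vec::new();
    for p in handler_caps {
        if session_caps.contains(p) {
            to_disconnect.push(*p); // handler `p` is notified later; no counter update here
        }
    }
    to_disconnect
}

fn main() {
    let n = AtomicUsize::new(3);
    // Two handlers share the session's capabilities...
    let d = on_session_expired(&n, &["eth", "par"], &["eth", "par"]);
    assert_eq!(d.len(), 2); // ...both get notified...
    assert_eq!(n.load(Ordering::SeqCst), 2); // ...but the session count drops by exactly one.
}
```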