Sync optimization #1385
@@ -100,6 +100,7 @@ use io::SyncIo;
use time;
use super::SyncConfig;
use blocks::BlockCollection;
use rand::{thread_rng, Rng};

known_heap_size!(0, PeerInfo);

@@ -308,7 +309,6 @@ impl ChainSync {
}
self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle;
self.blocks.clear();
self.active_peers = self.peers.keys().cloned().collect();
}

@@ -393,7 +393,7 @@ impl ChainSync {
self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
if !self.reset_peer_asking(peer_id, expected_asking) {
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
There might still be a request pending from a previous sync round. Ignore such requests. expected_hash is cleared in
trace!(target: "sync", "Ignored unexpected headers");
self.continue_sync(io);
return Ok(());
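A minimal sketch of the guard described in the comment above, with hypothetical `Peer`/`Asking` types standing in for the crate's real ones: a header response is only handled when the peer was actually asked for headers and the request hash is still recorded; a response left over from a previous sync round, whose `asking_hash` has been cleared, is dropped.

```rust
// Hypothetical, simplified types; not the crate's real structs.
#[derive(PartialEq)]
enum Asking { Nothing, BlockHeaders }

struct Peer {
    asking: Asking,
    asking_hash: Option<[u8; 32]>, // hash the last header request was keyed on
}

fn should_process_headers(peer: &mut Peer) -> bool {
    // Rough equivalent of reset_peer_asking(): the peer must have been asked
    // for headers, and the pending request is reset back to Nothing.
    let expected = std::mem::replace(&mut peer.asking, Asking::Nothing) == Asking::BlockHeaders;
    // Rough equivalent of the new `expected_hash.is_none()` check: a request
    // from a previous round has its asking_hash cleared, so it is ignored.
    let has_hash = peer.asking_hash.take().is_some();
    expected && has_hash
}

fn main() {
    let mut stale = Peer { asking: Asking::BlockHeaders, asking_hash: None };
    assert!(!should_process_headers(&mut stale)); // stale request: ignored

    let mut fresh = Peer { asking: Asking::BlockHeaders, asking_hash: Some([0u8; 32]) };
    assert!(should_process_headers(&mut fresh)); // current request: processed
}
```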
@@ -533,57 +533,52 @@ impl ChainSync {
let header_rlp = try!(block_rlp.at(0));
let h = header_rlp.as_raw().sha3();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
return Ok(());
}
let header: BlockHeader = try!(header_rlp.as_val());
Try to import NewBlock even when downloading. Reduces syncing delays.
let mut unknown = false;
{
let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest_hash = header.hash();
peer.latest_number = Some(header.number());
}
if header.number <= self.last_imported_block + 1 {
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if header.number == self.last_imported_block + 1 {
self.last_imported_block = header.number;
self.last_imported_hash = header.hash();
}
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
},
Err(Error::Block(BlockError::UnknownParent(p))) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
io.disable_peer(peer_id);
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if header.number == self.last_imported_block + 1 {
self.last_imported_block = header.number;
self.last_imported_hash = header.hash();
}
};
}
else {
unknown = true;
}
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
},
Err(Error::Block(BlockError::UnknownParent(p))) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
io.disable_peer(peer_id);
}
};
if unknown {
trace!(target: "sync", "New unknown block {:?}", h);
//TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1));
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
//self.state = SyncState::ChainHead;
peer.difficulty = Some(difficulty);
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
} else {
trace!(target: "sync", "New unknown block {:?}", h);
//TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1));
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
//self.state = SyncState::ChainHead;
peer.difficulty = Some(difficulty);
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
}
}
self.sync_peer(io, peer_id, true);
}
self.sync_peer(io, peer_id, true);
}
Ok(())
}
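A rough sketch of the reordered control flow, using made-up `SyncState`/`Import` types and an `on_new_block` helper rather than the real ones: the NewBlock payload is always offered to the importer, whatever the sync state, and only the unknown-parent reaction (kicking off a sync with the sending peer) stays gated on the Idle state.

```rust
// Hypothetical, simplified types; the real code works on RLP payloads and a
// ChainSync struct. Shown only to illustrate the new ordering of the checks.
#[derive(PartialEq)]
enum SyncState { Idle, Blocks }

enum Import { Ok, UnknownParent }

fn on_new_block(state: &SyncState, import: impl Fn() -> Import) -> &'static str {
    // Always try to import first, even while a bulk download is in progress.
    match import() {
        Import::Ok => "imported",
        Import::UnknownParent => {
            if *state != SyncState::Idle {
                // Already downloading: the parent will arrive with the bulk sync.
                "ignored while seeking"
            } else {
                // Idle: the sender knows a chain head we do not, sync with it.
                "sync_peer triggered"
            }
        }
    }
}

fn main() {
    assert_eq!(on_new_block(&SyncState::Blocks, || Import::Ok), "imported");
    assert_eq!(on_new_block(&SyncState::Blocks, || Import::UnknownParent), "ignored while seeking");
    assert_eq!(on_new_block(&SyncState::Idle, || Import::UnknownParent), "sync_peer triggered");
}
```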
@@ -661,7 +656,7 @@ impl ChainSync {
/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
thread_rng().shuffle(&mut peers); //TODO: sort by rating
This list is used to select helping peers. Peers are selected at random here to protect against a high advertised TD attack, until there is a proper rating system.
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers {
if self.active_peers.contains(&p) {
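A dependency-free sketch of the peer-selection change: rather than ranking candidates by advertised total difficulty, which an attacker can inflate to always be chosen first, the candidate list is shuffled. The `shuffle`/`pick_sync_order` helpers and the inlined xorshift generator are illustrative only; the PR itself calls rand's `thread_rng().shuffle` on the peer list.

```rust
// Illustrative types; PeerId and Difficulty stand in for the real PeerId and U256.
type PeerId = usize;
type Difficulty = u64;

fn shuffle<T>(items: &mut [T], mut seed: u64) {
    // Minimal xorshift-driven Fisher-Yates; the real code uses a proper RNG.
    for i in (1..items.len()).rev() {
        seed ^= seed << 13;
        seed ^= seed >> 7;
        seed ^= seed << 17;
        items.swap(i, (seed as usize) % (i + 1));
    }
}

fn pick_sync_order(peers: &[(PeerId, Difficulty)], seed: u64) -> Vec<PeerId> {
    let mut candidates: Vec<(PeerId, Difficulty)> = peers.to_vec();
    // Previously: candidates.sort_by(|a, b| b.1.cmp(&a.1)), i.e. highest advertised TD first.
    shuffle(&mut candidates, seed);
    candidates.into_iter().map(|(id, _)| id).collect()
}

fn main() {
    // Peer 2 advertises a huge total difficulty but no longer wins automatically.
    let peers = [(1, 100), (2, u64::MAX), (3, 50)];
    println!("{:?}", pick_sync_order(&peers, 0xDEADBEEF));
}
```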
@@ -687,7 +682,11 @@

}

/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
if !self.active_peers.contains(&peer_id) {
trace!(target: "sync", "Skipping deactivated peer");
return;
}
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {

@@ -191,7 +191,7 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<SharedSession>,
session_id: Option<StreamToken>,
reserved_peers: &'s HashSet<NodeId>,
_reserved_peers: &'s HashSet<NodeId>,
}

impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {

@@ -207,7 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
session_id: id,
session: session,
sessions: sessions,
reserved_peers: reserved_peers,
_reserved_peers: reserved_peers,
}
}

@@ -837,9 +837,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut s = session.lock().unwrap();
if !s.expired() {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
This needs to be decremented once per session. It works now because we have only one handler, but might break future code.
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
to_disconnect.push(p);
}
}

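A simplified sketch of the accounting fix, with a stripped-down, hypothetical `Host` and `drop_session` helper: the live-session counter is decremented exactly once per dropped session, before the per-handler loop, instead of once per handler that shared the session. With a single handler both placements behave the same, which is why the bug was latent.

```rust
// Hypothetical, simplified Host; the real struct carries far more state.
use std::sync::atomic::{AtomicUsize, Ordering};

struct Host {
    num_sessions: AtomicUsize,
    handlers: Vec<&'static str>, // protocol names registered with the host
}

impl Host {
    fn drop_session(&self, capabilities: &[&str]) -> Vec<&'static str> {
        // Decrement once per session (the fixed placement)...
        self.num_sessions.fetch_sub(1, Ordering::SeqCst);
        let mut to_disconnect = Vec::new();
        for p in &self.handlers {
            if capabilities.contains(p) {
                // ...not here, where it would run once per matching handler.
                to_disconnect.push(*p);
            }
        }
        to_disconnect
    }
}

fn main() {
    let host = Host { num_sessions: AtomicUsize::new(3), handlers: vec!["eth", "par"] };
    host.drop_session(&["eth", "par"]);
    assert_eq!(host.num_sessions.load(Ordering::SeqCst), 2); // exactly one session gone
}
```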
Fixed a bug here: if a header is inserted with a number immediately following the chain head, it would get updated incorrectly, which led to the same blocks being requested twice.
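A hedged sketch of the bug class this comment describes, not the PR's actual code: made-up `Header`/`Download` types show why a tracked download head should only advance when an inserted header truly extends it (its parent hash matches the current head), rather than whenever its number is head + 1, since advancing on the number alone can move the head onto a sibling branch and leave the same block range to be requested again.

```rust
// Hypothetical types; the real code lives in BlockCollection with H256 hashes.
type Hash = u64;

struct Header {
    hash: Hash,
    parent: Hash,
    number: u64,
}

struct Download {
    head_hash: Hash,
    head_number: u64,
}

impl Download {
    fn insert_header(&mut self, h: &Header) {
        // Buggy variant: advance whenever h.number == self.head_number + 1.
        // Guarded variant: also require that the header links to the current head.
        if h.number == self.head_number + 1 && h.parent == self.head_hash {
            self.head_hash = h.hash;
            self.head_number = h.number;
        }
    }
}

fn main() {
    let mut dl = Download { head_hash: 0xA, head_number: 100 };
    // A header at the right height but on a different branch must not move the head.
    dl.insert_header(&Header { hash: 0xC, parent: 0xB, number: 101 });
    assert_eq!(dl.head_hash, 0xA);
    // A header that actually links to the head advances it.
    dl.insert_header(&Header { hash: 0xD, parent: 0xA, number: 101 });
    assert_eq!(dl.head_hash, 0xD);
}
```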