Sync fixes and tweaks #1164
Changes from all commits: f85e409, d1fc5a5, 0e905a0, 1e8bf8c, 7f3ba85
@@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512;
const MAX_NODE_DATA_TO_SEND: usize = 1024;
const MAX_RECEIPTS_TO_SEND: usize = 1024;
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
-const MAX_HEADERS_TO_REQUEST: usize = 256;
+const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 64;
const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
+const SUBCHAIN_SIZE: usize = 64;

const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;

@@ -133,7 +134,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;

-const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
+const CONNECTION_TIMEOUT_SEC: f64 = 15f64;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state

@@ -639,7 +640,7 @@ impl ChainSync {
self.sync_peer(io, p, false);
}
}
-if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
+if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
self.complete_sync();
}
}

@@ -665,7 +666,7 @@ impl ChainSync {
return;
}
if self.state == SyncState::Waiting {
-trace!(target: "sync", "Waiting for block queue");
+trace!(target: "sync", "Waiting for the block queue");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone())

@@ -689,7 +690,7 @@ impl ChainSync {
// Request subchain headers
trace!(target: "sync", "Starting sync with better chain");
let last = self.last_imported_hash.clone();
-self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads);
+self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
},
SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {

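For scale, assuming the fourth and fifth arguments here are the header count and the skip (matching the literals 128 and 255 they replace), one ChainHead round previously spanned roughly 128 * 256 = 32768 blocks, while with the new constants it spans roughly SUBCHAIN_SIZE * MAX_HEADERS_TO_REQUEST = 64 * 128 = 8192 blocks.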
@@ -704,6 +705,8 @@ impl ChainSync {
fn start_sync_round(&mut self, io: &mut SyncIo) {
self.state = SyncState::ChainHead;
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
+// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
+// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {

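The retract step itself is cut off by the hunk above, so here is a hedged sketch of what it plausibly does: step the import point back one block when nothing was imported last round. This is a fragment continuing the method context shown above, not the actual code; in particular the `restart` fallback is an assumption.

```rust
// Illustrative sketch only: move the import point one block back so the next
// round requests headers from an earlier, hopefully canonical, ancestor.
if self.imported_this_round == Some(0) && self.last_imported_block > 0 {
    match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
        Some(h) => {
            self.last_imported_block -= 1;
            self.last_imported_hash = h;
            trace!(target: "sync", "Retracted to {:?}", self.last_imported_hash);
        },
        None => {
            // Assumed fallback: nothing sensible to retract to.
            self.restart(io); // hypothetical
        },
    }
}
```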
@@ -781,9 +784,13 @@ impl ChainSync { | |
|
||
match io.chain().import_block(block) { | ||
Err(Error::Import(ImportError::AlreadyInChain)) => { | ||
self.last_imported_block = number; | ||
self.last_imported_hash = h.clone(); | ||
trace!(target: "sync", "Block already in chain {:?}", h); | ||
}, | ||
Err(Error::Import(ImportError::AlreadyQueued)) => { | ||
self.last_imported_block = number; | ||
self.last_imported_hash = h.clone(); | ||
trace!(target: "sync", "Block already queued {:?}", h); | ||
}, | ||
Ok(_) => { | ||
|
@@ -856,22 +863,15 @@ impl ChainSync {

/// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
-{
-let peer = self.peers.get_mut(&peer_id).unwrap();
-if peer.asking != PeerAsking::Nothing {
-warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
-}
-}
-match sync.send(peer_id, packet_id, packet) {
-Err(e) => {
-debug!(target:"sync", "Error sending request: {:?}", e);
-sync.disable_peer(peer_id);
-}
-Ok(_) => {
-let mut peer = self.peers.get_mut(&peer_id).unwrap();
-peer.asking = asking;
-peer.ask_time = time::precise_time_s();
-}
-}
+let peer = self.peers.get_mut(&peer_id).unwrap();
+if peer.asking != PeerAsking::Nothing {
+warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
+}
+peer.asking = asking;
+peer.ask_time = time::precise_time_s();
+if let Err(e) = sync.send(peer_id, packet_id, packet) {
+debug!(target:"sync", "Error sending request: {:?}", e);
+sync.disable_peer(peer_id);
+}
}

Comment on `peer.asking = asking;`: Always set the asking status so that requested blocks would be unmarked as being downloaded on disconnect.
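For context on that comment, a hedged sketch of the disconnect path it refers to; the `on_peer_aborting` shape and the `clear_peer_download` helper are assumptions for illustration, not the actual implementation:

```rust
// Illustrative only: when a peer drops mid-request, the sync layer needs the
// asking status to know what to unmark so another peer can re-download it.
fn on_peer_aborting_sketch(sync: &mut ChainSync, peer_id: PeerId) {
    if let Some(peer) = sync.peers.remove(&peer_id) {
        if peer.asking != PeerAsking::Nothing {
            // Unmark headers/bodies this peer had been asked for.
            sync.clear_peer_download(peer_id); // hypothetical helper
        }
    }
}
```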
@@ -1099,6 +1099,7 @@ impl ChainSync {
let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
+trace!(target:"sync", "Timeouted {}", peer_id);
io.disconnect_peer(*peer_id);
}
}

@@ -1164,24 +1165,23 @@ impl ChainSync {
.collect::<Vec<_>>()
}

-/// propagates latest block to lagging peers
-fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
-let updated_peers = {
-let lagging_peers = self.get_lagging_peers(chain_info, io);
-
-// sqrt(x)/x scaled to max u32
-let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
-let lucky_peers = match lagging_peers.len() {
-0 ... MIN_PEERS_PROPAGATION => lagging_peers,
-_ => lagging_peers.into_iter().filter(|_| ::rand::random::<u32>() < fraction).collect::<Vec<_>>()
-};
-
-// taking at max of MAX_PEERS_PROPAGATION
-lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
-};
+fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
+use rand::Rng;
+let mut lagging_peers = self.get_lagging_peers(chain_info, io);
+// take sqrt(x) peers
+let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
+count = min(count, MAX_PEERS_PROPAGATION);
+count = max(count, MIN_PEERS_PROPAGATION);
+::rand::thread_rng().shuffle(&mut lagging_peers);
+lagging_peers.into_iter().take(count).collect::<Vec<_>>()
+}
+
+/// propagates latest block to lagging peers
+fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
+let lucky_peers = self.select_lagging_peers(chain_info, io);
let mut sent = 0;
-for peer_id in updated_peers {
+for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();

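A quick sketch of the new selection size: the number of peers picked is roughly the square root of the connected peer count, clamped to the MIN_PEERS_PROPAGATION and MAX_PEERS_PROPAGATION constants defined earlier in this file. The standalone helper and its name are just for illustration.

```rust
// Illustrative only: how many lagging peers are picked for propagation.
const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;

fn propagation_count(total_peers: usize) -> usize {
    // take sqrt(x) peers, clamped to [MIN_PEERS_PROPAGATION, MAX_PEERS_PROPAGATION]
    let count = (total_peers as f64).sqrt().round() as usize;
    count.max(MIN_PEERS_PROPAGATION).min(MAX_PEERS_PROPAGATION)
}

// propagation_count(2)   == 4   (clamped up; with fewer lagging peers, all of them are taken)
// propagation_count(20)  == 4   (sqrt(20) ≈ 4.47 rounds to 4)
// propagation_count(100) == 10
```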
@@ -1193,12 +1193,12 @@ impl ChainSync {

/// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
-let updated_peers = self.get_lagging_peers(chain_info, io);
+let lucky_peers = self.select_lagging_peers(chain_info, io);
let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
-for (peer_id, peer_number) in updated_peers {
+for (peer_id, peer_number) in lucky_peers {
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
-if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
+if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
// If we think peer is too far behind just send one latest hash
peer_best = last_parent.clone();
}

@@ -1259,15 +1259,15 @@ impl ChainSync {
}

fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
-self.propagate_new_transactions(io);
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
-let blocks = self.propagate_blocks(&chain_info, io);
let hashes = self.propagate_new_hashes(&chain_info, io);
+let blocks = self.propagate_blocks(&chain_info, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
}
+self.propagate_new_transactions(io);
self.last_sent_block_number = chain_info.best_block_number;
}

Comment on `fn propagate_latest_blocks`: Changed the propagation order here to give uncles priority.
@@ -159,7 +159,7 @@ fn propagate_hashes() {

#[test]
fn propagate_blocks() {
-let mut net = TestNet::new(2);
+let mut net = TestNet::new(20);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync();

Comment on `TestNet::new(20)`: shouldn't it also work for a small number of peers?
Reply: Now the NewHashes message is sent out first, then NewBlock to some of the remaining peers. So this test requires more peers.
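A rough reading of why the larger network matters, assuming both propagation paths draw from select_lagging_peers as shown above: with 20 peers, the hashes pass picks only about sqrt(20) ≈ 4 of them, leaving plenty of still-lagging peers for the NEW_BLOCK pass; with only 2 peers, the single lagging peer is likely already covered by the hashes pass, so no NEW_BLOCK packet may be sent at all.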
@@ -169,7 +169,8 @@ fn propagate_blocks() {

assert!(!net.peer(0).queue.is_empty());
// NEW_BLOCK_PACKET
-assert_eq!(0x07, net.peer(0).queue[0].packet_id);
+let blocks = net.peer(0).queue.iter().filter(|p| p.packet_id == 0x7).count();
+assert!(blocks > 0);
}

#[test]

Comment: taking this code out of a nested block would prevent a second hashmap lookup.
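A minimal sketch of that double-lookup point, using a plain `std::collections::HashMap` instead of the peers table (names here are illustrative):

```rust
use std::collections::HashMap;

// Two lookups: the borrow from the first get_mut ends with the nested block,
// so a second get_mut is needed for the same key.
fn double_lookup(peers: &mut HashMap<u32, String>, id: u32) {
    {
        let peer = peers.get_mut(&id).unwrap();
        peer.push_str(" checked");
    }
    let peer = peers.get_mut(&id).unwrap();
    peer.push_str(" updated");
}

// One lookup: keep the mutable borrow alive for both operations.
fn single_lookup(peers: &mut HashMap<u32, String>, id: u32) {
    let peer = peers.get_mut(&id).unwrap();
    peer.push_str(" checked");
    peer.push_str(" updated");
}
```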