Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Sync fixes and tweaks #1164

Merged
merged 5 commits into from
May 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<V> Client<V> where V: Verifier {

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let max_blocks_to_import = 128;
let max_blocks_to_import = 64;

let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn can_collect_garbage() {
fn can_handle_long_fork() {
let client_result = generate_dummy_client(1200);
let client = client_result.reference();
for _ in 0..10 {
for _ in 0..20 {
client.import_verified_blocks(&IoChannel::disconnected());
}
assert_eq!(1200, client.chain_info().best_block_number);
Expand All @@ -124,7 +124,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600);

for _ in 0..20 {
for _ in 0..40 {
client.import_verified_blocks(&IoChannel::disconnected());
}
assert_eq!(2000, client.chain_info().best_block_number);
Expand Down
80 changes: 40 additions & 40 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512;
const MAX_NODE_DATA_TO_SEND: usize = 1024;
const MAX_RECEIPTS_TO_SEND: usize = 1024;
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
const MAX_HEADERS_TO_REQUEST: usize = 256;
const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 64;
const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
const SUBCHAIN_SIZE: usize = 64;

const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
Expand All @@ -133,7 +134,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;

const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
const CONNECTION_TIMEOUT_SEC: f64 = 15f64;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
Expand Down Expand Up @@ -639,7 +640,7 @@ impl ChainSync {
self.sync_peer(io, p, false);
}
}
if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
self.complete_sync();
}
}
Expand All @@ -665,7 +666,7 @@ impl ChainSync {
return;
}
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for block queue");
trace!(target: "sync", "Waiting for the block queue");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone())
Expand All @@ -689,7 +690,7 @@ impl ChainSync {
// Request subchain headers
trace!(target: "sync", "Starting sync with better chain");
let last = self.last_imported_hash.clone();
self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads);
self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
},
SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
Expand All @@ -704,6 +705,8 @@ impl ChainSync {
fn start_sync_round(&mut self, io: &mut SyncIo) {
self.state = SyncState::ChainHead;
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {
Expand Down Expand Up @@ -781,9 +784,13 @@ impl ChainSync {

match io.chain().import_block(block) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already in chain {:?}", h);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already queued {:?}", h);
},
Ok(_) => {
Expand Down Expand Up @@ -856,22 +863,15 @@ impl ChainSync {

/// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
{
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
}
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taking this code out of a nested block would prevent a second hashmap lookup

match sync.send(peer_id, packet_id, packet) {
Err(e) => {
debug!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id);
}
Ok(_) => {
let mut peer = self.peers.get_mut(&peer_id).unwrap();
peer.asking = asking;
peer.ask_time = time::precise_time_s();
}
peer.asking = asking;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always set the asking status so that requested blocks would be unmarked as being downloaded on disconnect.

peer.ask_time = time::precise_time_s();
if let Err(e) = sync.send(peer_id, packet_id, packet) {
debug!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id);
}
}

Expand Down Expand Up @@ -1099,6 +1099,7 @@ impl ChainSync {
let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
trace!(target:"sync", "Timeouted {}", peer_id);
io.disconnect_peer(*peer_id);
}
}
Expand Down Expand Up @@ -1164,24 +1165,23 @@ impl ChainSync {
.collect::<Vec<_>>()
}

/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let updated_peers = {
let lagging_peers = self.get_lagging_peers(chain_info, io);

// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let lucky_peers = match lagging_peers.len() {
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
_ => lagging_peers.into_iter().filter(|_| ::rand::random::<u32>() < fraction).collect::<Vec<_>>()
};

// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
use rand::Rng;
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
// take sqrt(x) peers
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
count = min(count, MAX_PEERS_PROPAGATION);
count = max(count, MIN_PEERS_PROPAGATION);
::rand::thread_rng().shuffle(&mut lagging_peers);
lagging_peers.into_iter().take(count).collect::<Vec<_>>()
}

/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io);
let mut sent = 0;
for peer_id in updated_peers {
for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
Expand All @@ -1193,12 +1193,12 @@ impl ChainSync {

/// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(chain_info, io);
let lucky_peers = self.select_lagging_peers(chain_info, io);
let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
for (peer_id, peer_number) in updated_peers {
for (peer_id, peer_number) in lucky_peers {
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
// If we think peer is too far behind just send one latest hash
peer_best = last_parent.clone();
}
Expand Down Expand Up @@ -1259,15 +1259,15 @@ impl ChainSync {
}

fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the propagation order here to give uncles prioriy

self.propagate_new_transactions(io);
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io);
let hashes = self.propagate_new_hashes(&chain_info, io);
let blocks = self.propagate_blocks(&chain_info, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
}
self.propagate_new_transactions(io);
self.last_sent_block_number = chain_info.best_block_number;
}

Expand Down
5 changes: 3 additions & 2 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ fn propagate_hashes() {

#[test]
fn propagate_blocks() {
let mut net = TestNet::new(2);
let mut net = TestNet::new(20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it also work for a small number of peers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the NewHashes message is sent out first, than NewBlock to some of the remaining peers. So this test requires more peers

net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync();

Expand All @@ -169,7 +169,8 @@ fn propagate_blocks() {

assert!(!net.peer(0).queue.is_empty());
// NEW_BLOCK_PACKET
assert_eq!(0x07, net.peer(0).queue[0].packet_id);
let blocks = net.peer(0).queue.iter().filter(|p| p.packet_id == 0x7).count();
assert!(blocks > 0);
}

#[test]
Expand Down