This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Code cleanup in the sync module #11552

Merged
merged 6 commits on Mar 9, 2020
4 changes: 2 additions & 2 deletions ethcore/client-traits/src/lib.rs
@@ -197,8 +197,8 @@ pub trait IoClient: Sync + Send {
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);

/// Queue block import with transaction receipts. Does no sealing and transaction validation.
fn queue_ancient_block(&self, block_bytes: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;
/// Queue block import with transaction receipts. Does no sealing or transaction validation.
fn queue_ancient_block(&self, unverified: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;

/// Queue consensus engine message.
fn queue_consensus_message(&self, message: Bytes);
6 changes: 4 additions & 2 deletions ethcore/src/client/client.rs
@@ -178,7 +178,7 @@ struct Importer {
}

/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
/// Call `import_block()` to import a block asynchronously.
pub struct Client {
/// Flag used to disable the client forever. Not to be confused with `liveness`.
///
@@ -870,7 +870,7 @@ impl Client {
*self.on_user_defaults_change.lock() = Some(Box::new(f));
}

/// Flush the block import queue.
/// Flush the block import queue. Used mostly for tests.
pub fn flush_queue(&self) {
self.importer.block_queue.flush();
while !self.importer.block_queue.is_empty() {
@@ -1444,6 +1444,7 @@ impl ImportBlock for Client {
return Err(EthcoreError::Block(BlockError::UnknownParent(unverified.parent_hash())));
}

// If the queue is empty we propagate the block in a `PriorityTask`.
let raw = if self.importer.block_queue.is_empty() {
Some((unverified.bytes.clone(), *unverified.header.difficulty()))
} else {
@@ -2729,6 +2730,7 @@ impl ImportExportBlocks for Client {
}
};
self.flush_queue();

Ok(())
}
}
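
The doc-comment changes above make `flush_queue()` explicitly a test helper rather than part of the normal import flow. A minimal sketch of that test-style usage, assuming hypothetical `new_test_client()` and `sample_unverified_block()` fixtures (only `import_block` and `flush_queue` come from this diff):

// Hypothetical sketch (not part of this PR): import a block asynchronously,
// then flush the verification queue so the import completes before any
// assertions run. `new_test_client()` and `sample_unverified_block()` are
// stand-ins for whatever fixtures the test suite provides.
fn import_and_flush() {
	let client = new_test_client();
	let unverified = sample_unverified_block();

	// `import_block` only enqueues the block; verification and insertion
	// happen on the background block queue.
	client.import_block(unverified).expect("block should queue");

	// Documented above as "used mostly for tests": block until the queue drains.
	client.flush_queue();
}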
10 changes: 5 additions & 5 deletions ethcore/sync/src/block_sync.rs
@@ -407,9 +407,9 @@ impl BlockDownloader {
trace_sync!(self, "Error decoding block receipts RLP: {:?}", e);
BlockDownloaderImportError::Invalid
})?;
receipts.push(receipt.as_raw().to_vec());
receipts.push(receipt.as_raw());
}
let hashes = self.blocks.insert_receipts(receipts);
let hashes = self.blocks.insert_receipts(&receipts);
if hashes.len() != item_count {
trace_sync!(self, "Deactivating peer for giving invalid block receipts");
return Err(BlockDownloaderImportError::Invalid);
@@ -501,15 +501,15 @@ impl BlockDownloader {
MAX_BODIES_TO_REQUEST_SMALL
};

let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false);
let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request);
if !needed_bodies.is_empty() {
return Some(BlockRequest::Bodies {
hashes: needed_bodies,
});
}

if self.download_receipts {
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST, false);
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST);
if !needed_receipts.is_empty() {
return Some(BlockRequest::Receipts {
hashes: needed_receipts,
@@ -518,7 +518,7 @@
}

// find subchain to download
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) {
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST) {
return Some(BlockRequest::Headers {
start: h,
count: count as u64,
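
In the hunks above, `needed_bodies`, `needed_receipts`, and `needed_headers` lose their second argument, which every caller passed as `false`. A self-contained illustration of that refactoring pattern, with invented names rather than the crate's types:

use std::collections::HashSet;

// Invented example: a boolean flag that every caller passed as `false`
// is removed, so the signature shrinks and the dead branch disappears.
struct Collection {
	pending: Vec<u64>,
	downloading: HashSet<u64>,
}

impl Collection {
	// Before: fn needed(&mut self, count: usize, ignore_downloading: bool) -> Vec<u64>
	fn needed(&mut self, count: usize) -> Vec<u64> {
		let mut out = Vec::with_capacity(count);
		for &item in &self.pending {
			if out.len() >= count {
				break;
			}
			// Only hand out items that are not already being downloaded.
			if self.downloading.insert(item) {
				out.push(item);
			}
		}
		out
	}
}

fn main() {
	let mut c = Collection { pending: vec![1, 2, 3], downloading: HashSet::new() };
	assert_eq!(c.needed(2), vec![1, 2]);
}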
137 changes: 71 additions & 66 deletions ethcore/sync/src/blocks.rs
@@ -19,7 +19,7 @@ use std::collections::{HashSet, HashMap, hash_map};
use bytes::Bytes;
use ethereum_types::H256;
use keccak_hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP};
use log::{trace, warn};
use log::{debug, trace, warn};
use parity_util_mem::MallocSizeOf;
use rlp::{Rlp, RlpStream, DecoderError};
use triehash_ethereum::ordered_trie_root;
@@ -103,7 +103,7 @@ fn unverified_from_sync(header: SyncHeader, body: Option<SyncBody>) -> Unverifie
header: header.header,
transactions: body.transactions,
uncles: body.uncles,
bytes: stream.out().to_vec(),
bytes: stream.out(),
}
}

@@ -196,11 +196,11 @@ impl BlockCollection {
}

/// Insert a collection of block receipts for previously downloaded headers.
pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> Vec<Vec<H256>> {
pub fn insert_receipts(&mut self, receipts: &[&[u8]]) -> Vec<Vec<H256>> {
if !self.need_receipts {
return Vec::new();
}
receipts.into_iter()
receipts.iter()
.filter_map(|r| {
self.insert_receipt(r)
.map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e))
@@ -210,24 +210,28 @@
}

/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
pub fn needed_bodies(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
pub fn needed_bodies(&mut self, count: usize) -> Vec<H256> {
if self.head.is_none() {
return Vec::new();
}
let mut needed_bodies: Vec<H256> = Vec::new();
let mut needed_bodies: Vec<H256> = Vec::with_capacity(count);
let mut head = self.head;
while head.is_some() && needed_bodies.len() < count {
head = self.parents.get(&head.unwrap()).cloned();
if let Some(head) = head {
match self.blocks.get(&head) {
Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => {
self.downloading_bodies.insert(head.clone());
needed_bodies.push(head.clone());
while needed_bodies.len() < count {
head = match head {
Some(head) => {
match self.blocks.get(&head) {
Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => {
self.downloading_bodies.insert(head.clone());
needed_bodies.push(head.clone());
}
_ => (),
}
_ => (),
}
}
self.parents.get(&head).copied()
},
None => break
};
}

for h in self.header_ids.values() {
if needed_bodies.len() >= count {
break;
@@ -241,25 +245,28 @@
}

/// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded.
pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
pub fn needed_receipts(&mut self, count: usize) -> Vec<H256> {
if self.head.is_none() || !self.need_receipts {
return Vec::new();
}
let mut needed_receipts: Vec<H256> = Vec::new();
let mut needed_receipts: Vec<H256> = Vec::with_capacity(count);
let mut head = self.head;
while head.is_some() && needed_receipts.len() < count {
head = self.parents.get(&head.unwrap()).cloned();
if let Some(head) = head {
match self.blocks.get(&head) {
Some(block) => {
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
self.downloading_receipts.insert(block.receipts_root);
needed_receipts.push(head.clone());
while needed_receipts.len() < count {
head = match head {
Some(head) => {
match self.blocks.get(&head) {
Some(block) => {
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
self.downloading_receipts.insert(block.receipts_root);
needed_receipts.push(head);
}
}
_ => (),
}
_ => (),
}
}
self.parents.get(&head).copied()
},
None => break
};
}
// If there are multiple blocks per receipt, only request one of them.
for (root, h) in self.receipt_ids.iter().map(|(root, hashes)| (root, hashes[0])) {
@@ -275,12 +282,12 @@
}

/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> {
pub fn needed_headers(&mut self, count: usize) -> Option<(H256, usize)> {
// find subchain to download
let mut download = None;
{
for h in &self.heads {
if ignore_downloading || !self.downloading_headers.contains(h) {
if !self.downloading_headers.contains(h) {
self.downloading_headers.insert(h.clone());
download = Some(h.clone());
break;
@@ -317,42 +324,40 @@ impl BlockCollection {
return Vec::new();
}

let mut drained = Vec::new();
let mut hashes = Vec::new();
{
let mut blocks = Vec::new();
let mut head = self.head;
while let Some(h) = head {
head = self.parents.get(&h).cloned();
if let Some(head) = head {
match self.blocks.remove(&head) {
Some(block) => {
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
} else {
self.blocks.insert(head, block);
break;
}
},
_ => {
let mut blocks = Vec::new();
let mut head = self.head;
while let Some(h) = head {
head = self.parents.get(&h).copied();
if let Some(head) = head {
match self.blocks.remove(&head) {
Some(block) => {
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
} else {
self.blocks.insert(head, block);
break;
},
}
}
},
_ => {
break;
},
}
}
}

for block in blocks.into_iter() {
let unverified = unverified_from_sync(block.header, block.body);
drained.push(BlockAndReceipts {
block: unverified,
receipts: block.receipts.clone(),
});
}
let mut drained = Vec::with_capacity(blocks.len());
for block in blocks {
let unverified = unverified_from_sync(block.header, block.body);
drained.push(BlockAndReceipts {
block: unverified,
receipts: block.receipts,
});
}

trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
debug!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
drained
}

@@ -409,7 +414,7 @@ impl BlockCollection {
}
}

fn insert_receipt(&mut self, r: Bytes) -> Result<Vec<H256>, network::Error> {
fn insert_receipt(&mut self, r: &[u8]) -> Result<Vec<H256>, network::Error> {
let receipt_root = {
let receipts = Rlp::new(&r);
ordered_trie_root(receipts.iter().map(|r| r.as_raw()))
@@ -422,7 +427,7 @@
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got receipt {}", h);
block.receipts = Some(r.clone());
block.receipts = Some(r.to_vec());
},
None => {
warn!("Got receipt with no header {}", h);
@@ -581,11 +586,11 @@ mod test {
bc.reset_to(heads);
assert!(!bc.is_empty());
assert_eq!(hashes[0], bc.heads[0]);
assert!(bc.needed_bodies(1, false).is_empty());
assert!(bc.needed_bodies(1).is_empty());
assert!(!bc.contains(&hashes[0]));
assert!(!bc.is_downloading(&hashes[0]));

let (h, n) = bc.needed_headers(6, false).unwrap();
let (h, n) = bc.needed_headers(6).unwrap();
assert!(bc.is_downloading(&hashes[0]));
assert_eq!(hashes[0], h);
assert_eq!(n, 6);
@@ -608,9 +613,9 @@
assert!(!bc.contains(&hashes[0]));
assert_eq!(hashes[5], bc.head.unwrap());

let (h, _) = bc.needed_headers(6, false).unwrap();
let (h, _) = bc.needed_headers(6).unwrap();
assert_eq!(hashes[5], h);
let (h, _) = bc.needed_headers(6, false).unwrap();
let (h, _) = bc.needed_headers(6).unwrap();
assert_eq!(hashes[20], h);
bc.insert_headers(headers[10..16].into_iter().map(Clone::clone).collect());
assert!(bc.drain().is_empty());
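
The rewritten loops in `needed_bodies`, `needed_receipts`, and `drain` above replace `head.is_some()` checks followed by `unwrap()` with a `match` on the current head, and pre-size their result vectors with `Vec::with_capacity`. A self-contained sketch of that traversal idiom, using invented types instead of the crate's `H256` and `BlockCollection`:

use std::collections::HashMap;

// Invented example of the traversal style used in the reworked loops:
// each iteration matches on the current head and either advances to its
// parent or breaks, so no `unwrap` is needed.
fn walk_chain(start: Option<u32>, parents: &HashMap<u32, u32>, limit: usize) -> Vec<u32> {
	let mut visited = Vec::with_capacity(limit);
	let mut head = start;
	while visited.len() < limit {
		head = match head {
			Some(h) => {
				visited.push(h);
				parents.get(&h).copied()
			}
			None => break,
		};
	}
	visited
}

fn main() {
	let parents: HashMap<u32, u32> = [(3, 2), (2, 1)].into_iter().collect();
	// Walks 3 -> 2 -> 1 and stops when there is no recorded parent.
	assert_eq!(walk_chain(Some(3), &parents, 10), vec![3, 2, 1]);
}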