Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Code cleanup in the sync module #11552

Merged
merged 6 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ pub trait IoClient: Sync + Send {
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);

/// Queue block import with transaction receipts. Does no sealing and transaction validation.
fn queue_ancient_block(&self, block_bytes: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;
/// Queue block import with transaction receipts. Does no sealing or transaction validation.
fn queue_ancient_block(&self, unverified: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;

/// Queue consensus engine message.
fn queue_consensus_message(&self, message: Bytes);
Expand Down
8 changes: 6 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ struct Importer {
}

/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
/// Call `import_block()` to import a block asynchronously.
pub struct Client {
/// Flag used to disable the client forever. Not to be confused with `liveness`.
///
Expand Down Expand Up @@ -871,6 +871,7 @@ impl Client {
}

/// Flush the block import queue.
#[cfg(any(test, feature = "test-helpers"))]
pub fn flush_queue(&self) {
self.importer.block_queue.flush();
while !self.importer.block_queue.is_empty() {
Expand Down Expand Up @@ -1444,6 +1445,7 @@ impl ImportBlock for Client {
return Err(EthcoreError::Block(BlockError::UnknownParent(unverified.parent_hash())));
}

// If the queue is empty we propagate the block in a `PriorityTask`.
let raw = if self.importer.block_queue.is_empty() {
Some((unverified.bytes.clone(), *unverified.header.difficulty()))
} else {
Expand Down Expand Up @@ -2728,7 +2730,9 @@ impl ImportExportBlocks for Client {
}
}
};
self.flush_queue();
self.importer.block_queue.flush();
self.import_verified_blocks();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
}
Expand Down
10 changes: 5 additions & 5 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,9 @@ impl BlockDownloader {
trace_sync!(self, "Error decoding block receipts RLP: {:?}", e);
BlockDownloaderImportError::Invalid
})?;
receipts.push(receipt.as_raw().to_vec());
receipts.push(receipt.as_raw());
}
let hashes = self.blocks.insert_receipts(receipts);
let hashes = self.blocks.insert_receipts(&receipts);
if hashes.len() != item_count {
trace_sync!(self, "Deactivating peer for giving invalid block receipts");
return Err(BlockDownloaderImportError::Invalid);
Expand Down Expand Up @@ -501,15 +501,15 @@ impl BlockDownloader {
MAX_BODIES_TO_REQUEST_SMALL
};

let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false);
let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request);
if !needed_bodies.is_empty() {
return Some(BlockRequest::Bodies {
hashes: needed_bodies,
});
}

if self.download_receipts {
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST, false);
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST);
if !needed_receipts.is_empty() {
return Some(BlockRequest::Receipts {
hashes: needed_receipts,
Expand All @@ -518,7 +518,7 @@ impl BlockDownloader {
}

// find subchain to download
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) {
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST) {
return Some(BlockRequest::Headers {
start: h,
count: count as u64,
Expand Down
90 changes: 44 additions & 46 deletions ethcore/sync/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{HashSet, HashMap, hash_map};
use bytes::Bytes;
use ethereum_types::H256;
use keccak_hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP};
use log::{trace, warn};
use log::{debug, trace, warn};
use parity_util_mem::MallocSizeOf;
use rlp::{Rlp, RlpStream, DecoderError};
use triehash_ethereum::ordered_trie_root;
Expand Down Expand Up @@ -103,7 +103,7 @@ fn unverified_from_sync(header: SyncHeader, body: Option<SyncBody>) -> Unverifie
header: header.header,
transactions: body.transactions,
uncles: body.uncles,
bytes: stream.out().to_vec(),
bytes: stream.out(),
}
}

Expand Down Expand Up @@ -196,11 +196,11 @@ impl BlockCollection {
}

/// Insert a collection of block receipts for previously downloaded headers.
pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> Vec<Vec<H256>> {
pub fn insert_receipts(&mut self, receipts: &[&[u8]]) -> Vec<Vec<H256>> {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
if !self.need_receipts {
return Vec::new();
}
receipts.into_iter()
receipts.iter()
.filter_map(|r| {
self.insert_receipt(r)
.map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e))
Expand All @@ -210,11 +210,11 @@ impl BlockCollection {
}

/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
pub fn needed_bodies(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
pub fn needed_bodies(&mut self, count: usize) -> Vec<H256> {
if self.head.is_none() {
return Vec::new();
}
let mut needed_bodies: Vec<H256> = Vec::new();
let mut needed_bodies: Vec<H256> = Vec::with_capacity(count);
let mut head = self.head;
while head.is_some() && needed_bodies.len() < count {
head = self.parents.get(&head.unwrap()).cloned();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -241,11 +241,11 @@ impl BlockCollection {
}

/// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded.
pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
pub fn needed_receipts(&mut self, count: usize) -> Vec<H256> {
if self.head.is_none() || !self.need_receipts {
return Vec::new();
}
let mut needed_receipts: Vec<H256> = Vec::new();
let mut needed_receipts: Vec<H256> = Vec::with_capacity(count);
let mut head = self.head;
while head.is_some() && needed_receipts.len() < count {
head = self.parents.get(&head.unwrap()).cloned();
Expand All @@ -254,7 +254,7 @@ impl BlockCollection {
Some(block) => {
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
self.downloading_receipts.insert(block.receipts_root);
needed_receipts.push(head.clone());
needed_receipts.push(head);
}
}
_ => (),
Expand All @@ -275,12 +275,12 @@ impl BlockCollection {
}

/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> {
pub fn needed_headers(&mut self, count: usize) -> Option<(H256, usize)> {
// find subchain to download
let mut download = None;
{
for h in &self.heads {
if ignore_downloading || !self.downloading_headers.contains(h) {
if !self.downloading_headers.contains(h) {
self.downloading_headers.insert(h.clone());
download = Some(h.clone());
break;
Expand Down Expand Up @@ -317,42 +317,40 @@ impl BlockCollection {
return Vec::new();
}

let mut drained = Vec::new();
let mut hashes = Vec::new();
{
let mut blocks = Vec::new();
let mut head = self.head;
while let Some(h) = head {
head = self.parents.get(&h).cloned();
if let Some(head) = head {
match self.blocks.remove(&head) {
Some(block) => {
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
} else {
self.blocks.insert(head, block);
break;
}
},
_ => {
let mut blocks = Vec::new();
let mut head = self.head;
while let Some(h) = head {
head = self.parents.get(&h).cloned();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
if let Some(head) = head {
match self.blocks.remove(&head) {
Some(block) => {
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
blocks.push(block);
hashes.push(head);
self.head = Some(head);
} else {
self.blocks.insert(head, block);
break;
},
}
}
},
_ => {
break;
},
}
}
}

for block in blocks.into_iter() {
let unverified = unverified_from_sync(block.header, block.body);
drained.push(BlockAndReceipts {
block: unverified,
receipts: block.receipts.clone(),
});
}
let mut drained = Vec::with_capacity(blocks.len());
for block in blocks {
let unverified = unverified_from_sync(block.header, block.body);
drained.push(BlockAndReceipts {
block: unverified,
receipts: block.receipts,
});
}

trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
debug!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
drained
}

Expand Down Expand Up @@ -409,7 +407,7 @@ impl BlockCollection {
}
}

fn insert_receipt(&mut self, r: Bytes) -> Result<Vec<H256>, network::Error> {
fn insert_receipt(&mut self, r: &[u8]) -> Result<Vec<H256>, network::Error> {
let receipt_root = {
let receipts = Rlp::new(&r);
ordered_trie_root(receipts.iter().map(|r| r.as_raw()))
Expand All @@ -422,7 +420,7 @@ impl BlockCollection {
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
trace!(target: "sync", "Got receipt {}", h);
block.receipts = Some(r.clone());
block.receipts = Some(r.to_vec());
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
},
None => {
warn!("Got receipt with no header {}", h);
Expand Down Expand Up @@ -581,11 +579,11 @@ mod test {
bc.reset_to(heads);
assert!(!bc.is_empty());
assert_eq!(hashes[0], bc.heads[0]);
assert!(bc.needed_bodies(1, false).is_empty());
assert!(bc.needed_bodies(1).is_empty());
assert!(!bc.contains(&hashes[0]));
assert!(!bc.is_downloading(&hashes[0]));

let (h, n) = bc.needed_headers(6, false).unwrap();
let (h, n) = bc.needed_headers(6).unwrap();
assert!(bc.is_downloading(&hashes[0]));
assert_eq!(hashes[0], h);
assert_eq!(n, 6);
Expand All @@ -608,9 +606,9 @@ mod test {
assert!(!bc.contains(&hashes[0]));
assert_eq!(hashes[5], bc.head.unwrap());

let (h, _) = bc.needed_headers(6, false).unwrap();
let (h, _) = bc.needed_headers(6).unwrap();
assert_eq!(hashes[5], h);
let (h, _) = bc.needed_headers(6, false).unwrap();
let (h, _) = bc.needed_headers(6).unwrap();
assert_eq!(hashes[20], h);
bc.insert_headers(headers[10..16].into_iter().map(Clone::clone).collect());
assert!(bc.drain().is_empty());
Expand Down