diff --git a/ethcore/client-traits/src/lib.rs b/ethcore/client-traits/src/lib.rs index 79d050ae2c5..ed0ad0dedc4 100644 --- a/ethcore/client-traits/src/lib.rs +++ b/ethcore/client-traits/src/lib.rs @@ -197,8 +197,8 @@ pub trait IoClient: Sync + Send { /// Queue transactions for importing. fn queue_transactions(&self, transactions: Vec, peer_id: usize); - /// Queue block import with transaction receipts. Does no sealing and transaction validation. - fn queue_ancient_block(&self, block_bytes: Unverified, receipts_bytes: Bytes) -> EthcoreResult; + /// Queue block import with transaction receipts. Does no sealing or transaction validation. + fn queue_ancient_block(&self, unverified: Unverified, receipts_bytes: Bytes) -> EthcoreResult; /// Queue consensus engine message. fn queue_consensus_message(&self, message: Bytes); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c51a13b09b7..4e6c24c45dc 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -178,7 +178,7 @@ struct Importer { } /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. -/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue. +/// Call `import_block()` to import a block asynchronously. pub struct Client { /// Flag used to disable the client forever. Not to be confused with `liveness`. /// @@ -870,7 +870,7 @@ impl Client { *self.on_user_defaults_change.lock() = Some(Box::new(f)); } - /// Flush the block import queue. + /// Flush the block import queue. Used mostly for tests. pub fn flush_queue(&self) { self.importer.block_queue.flush(); while !self.importer.block_queue.is_empty() { @@ -1444,6 +1444,7 @@ impl ImportBlock for Client { return Err(EthcoreError::Block(BlockError::UnknownParent(unverified.parent_hash()))); } + // If the queue is empty we propagate the block in a `PriorityTask`. let raw = if self.importer.block_queue.is_empty() { Some((unverified.bytes.clone(), *unverified.header.difficulty())) } else { @@ -2729,6 +2730,7 @@ impl ImportExportBlocks for Client { } }; self.flush_queue(); + Ok(()) } } diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 81f55aac880..2a65e7ba265 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -407,9 +407,9 @@ impl BlockDownloader { trace_sync!(self, "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; - receipts.push(receipt.as_raw().to_vec()); + receipts.push(receipt.as_raw()); } - let hashes = self.blocks.insert_receipts(receipts); + let hashes = self.blocks.insert_receipts(&receipts); if hashes.len() != item_count { trace_sync!(self, "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); @@ -501,7 +501,7 @@ impl BlockDownloader { MAX_BODIES_TO_REQUEST_SMALL }; - let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false); + let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request); if !needed_bodies.is_empty() { return Some(BlockRequest::Bodies { hashes: needed_bodies, @@ -509,7 +509,7 @@ impl BlockDownloader { } if self.download_receipts { - let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST, false); + let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST); if !needed_receipts.is_empty() { return Some(BlockRequest::Receipts { hashes: needed_receipts, @@ -518,7 +518,7 @@ impl BlockDownloader { } // find subchain to download - if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) { + if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST) { return Some(BlockRequest::Headers { start: h, count: count as u64, diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 44c1697557b..c9ec3b513cc 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -19,7 +19,7 @@ use std::collections::{HashSet, HashMap, hash_map}; use bytes::Bytes; use ethereum_types::H256; use keccak_hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP}; -use log::{trace, warn}; +use log::{debug, trace, warn}; use parity_util_mem::MallocSizeOf; use rlp::{Rlp, RlpStream, DecoderError}; use triehash_ethereum::ordered_trie_root; @@ -103,7 +103,7 @@ fn unverified_from_sync(header: SyncHeader, body: Option) -> Unverifie header: header.header, transactions: body.transactions, uncles: body.uncles, - bytes: stream.out().to_vec(), + bytes: stream.out(), } } @@ -196,11 +196,11 @@ impl BlockCollection { } /// Insert a collection of block receipts for previously downloaded headers. - pub fn insert_receipts(&mut self, receipts: Vec) -> Vec> { + pub fn insert_receipts(&mut self, receipts: &[&[u8]]) -> Vec> { if !self.need_receipts { return Vec::new(); } - receipts.into_iter() + receipts.iter() .filter_map(|r| { self.insert_receipt(r) .map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e)) @@ -210,24 +210,28 @@ impl BlockCollection { } /// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded. - pub fn needed_bodies(&mut self, count: usize, _ignore_downloading: bool) -> Vec { + pub fn needed_bodies(&mut self, count: usize) -> Vec { if self.head.is_none() { return Vec::new(); } - let mut needed_bodies: Vec = Vec::new(); + let mut needed_bodies: Vec = Vec::with_capacity(count); let mut head = self.head; - while head.is_some() && needed_bodies.len() < count { - head = self.parents.get(&head.unwrap()).cloned(); - if let Some(head) = head { - match self.blocks.get(&head) { - Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => { - self.downloading_bodies.insert(head.clone()); - needed_bodies.push(head.clone()); + while needed_bodies.len() < count { + head = match head { + Some(head) => { + match self.blocks.get(&head) { + Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => { + self.downloading_bodies.insert(head.clone()); + needed_bodies.push(head.clone()); + } + _ => (), } - _ => (), - } - } + self.parents.get(&head).copied() + }, + None => break + }; } + for h in self.header_ids.values() { if needed_bodies.len() >= count { break; @@ -241,25 +245,28 @@ impl BlockCollection { } /// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded. - pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec { + pub fn needed_receipts(&mut self, count: usize) -> Vec { if self.head.is_none() || !self.need_receipts { return Vec::new(); } - let mut needed_receipts: Vec = Vec::new(); + let mut needed_receipts: Vec = Vec::with_capacity(count); let mut head = self.head; - while head.is_some() && needed_receipts.len() < count { - head = self.parents.get(&head.unwrap()).cloned(); - if let Some(head) = head { - match self.blocks.get(&head) { - Some(block) => { - if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) { - self.downloading_receipts.insert(block.receipts_root); - needed_receipts.push(head.clone()); + while needed_receipts.len() < count { + head = match head { + Some(head) => { + match self.blocks.get(&head) { + Some(block) => { + if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) { + self.downloading_receipts.insert(block.receipts_root); + needed_receipts.push(head); + } } + _ => (), } - _ => (), - } - } + self.parents.get(&head).copied() + }, + None => break + }; } // If there are multiple blocks per receipt, only request one of them. for (root, h) in self.receipt_ids.iter().map(|(root, hashes)| (root, hashes[0])) { @@ -275,12 +282,12 @@ impl BlockCollection { } /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. - pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> { + pub fn needed_headers(&mut self, count: usize) -> Option<(H256, usize)> { // find subchain to download let mut download = None; { for h in &self.heads { - if ignore_downloading || !self.downloading_headers.contains(h) { + if !self.downloading_headers.contains(h) { self.downloading_headers.insert(h.clone()); download = Some(h.clone()); break; @@ -317,42 +324,40 @@ impl BlockCollection { return Vec::new(); } - let mut drained = Vec::new(); let mut hashes = Vec::new(); - { - let mut blocks = Vec::new(); - let mut head = self.head; - while let Some(h) = head { - head = self.parents.get(&h).cloned(); - if let Some(head) = head { - match self.blocks.remove(&head) { - Some(block) => { - if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) { - blocks.push(block); - hashes.push(head); - self.head = Some(head); - } else { - self.blocks.insert(head, block); - break; - } - }, - _ => { + let mut blocks = Vec::new(); + let mut head = self.head; + while let Some(h) = head { + head = self.parents.get(&h).copied(); + if let Some(head) = head { + match self.blocks.remove(&head) { + Some(block) => { + if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) { + blocks.push(block); + hashes.push(head); + self.head = Some(head); + } else { + self.blocks.insert(head, block); break; - }, - } + } + }, + _ => { + break; + }, } } + } - for block in blocks.into_iter() { - let unverified = unverified_from_sync(block.header, block.body); - drained.push(BlockAndReceipts { - block: unverified, - receipts: block.receipts.clone(), - }); - } + let mut drained = Vec::with_capacity(blocks.len()); + for block in blocks { + let unverified = unverified_from_sync(block.header, block.body); + drained.push(BlockAndReceipts { + block: unverified, + receipts: block.receipts, + }); } - trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); + debug!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); drained } @@ -409,7 +414,7 @@ impl BlockCollection { } } - fn insert_receipt(&mut self, r: Bytes) -> Result, network::Error> { + fn insert_receipt(&mut self, r: &[u8]) -> Result, network::Error> { let receipt_root = { let receipts = Rlp::new(&r); ordered_trie_root(receipts.iter().map(|r| r.as_raw())) @@ -422,7 +427,7 @@ impl BlockCollection { match self.blocks.get_mut(&h) { Some(ref mut block) => { trace!(target: "sync", "Got receipt {}", h); - block.receipts = Some(r.clone()); + block.receipts = Some(r.to_vec()); }, None => { warn!("Got receipt with no header {}", h); @@ -581,11 +586,11 @@ mod test { bc.reset_to(heads); assert!(!bc.is_empty()); assert_eq!(hashes[0], bc.heads[0]); - assert!(bc.needed_bodies(1, false).is_empty()); + assert!(bc.needed_bodies(1).is_empty()); assert!(!bc.contains(&hashes[0])); assert!(!bc.is_downloading(&hashes[0])); - let (h, n) = bc.needed_headers(6, false).unwrap(); + let (h, n) = bc.needed_headers(6).unwrap(); assert!(bc.is_downloading(&hashes[0])); assert_eq!(hashes[0], h); assert_eq!(n, 6); @@ -608,9 +613,9 @@ mod test { assert!(!bc.contains(&hashes[0])); assert_eq!(hashes[5], bc.head.unwrap()); - let (h, _) = bc.needed_headers(6, false).unwrap(); + let (h, _) = bc.needed_headers(6).unwrap(); assert_eq!(hashes[5], h); - let (h, _) = bc.needed_headers(6, false).unwrap(); + let (h, _) = bc.needed_headers(6).unwrap(); assert_eq!(hashes[20], h); bc.insert_headers(headers[10..16].into_iter().map(Clone::clone).collect()); assert!(bc.drain().is_empty());