Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Block import optimization (#2748)
Browse files Browse the repository at this point in the history
* Block import optimization

* whitespace

[ci:none]
  • Loading branch information
arkpar committed Oct 20, 2016
1 parent b9e4644 commit fc0c186
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 86 deletions.
61 changes: 42 additions & 19 deletions ethcore/src/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,21 @@ struct QueueSignal {

impl QueueSignal {
#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set(&self) {
fn set_sync(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
}

#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set_async(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
Expand All @@ -131,8 +145,8 @@ impl QueueSignal {
struct Verification {
// All locks must be captured in the order declared here.
unverified: Mutex<VecDeque<UnverifiedBlock>>,
verified: Mutex<VecDeque<PreverifiedBlock>>,
verifying: Mutex<VecDeque<VerifyingBlock>>,
verified: Mutex<VecDeque<PreverifiedBlock>>,
bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
Expand All @@ -143,8 +157,8 @@ impl BlockQueue {
pub fn new(config: BlockQueueConfig, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> BlockQueue {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
Expand Down Expand Up @@ -226,7 +240,7 @@ impl BlockQueue {
};

let block_hash = block.header.hash();
match verify_block_unordered(block.header, block.bytes, &*engine) {
let is_ready = match verify_block_unordered(block.header, block.bytes, &*engine) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
for e in verifying.iter_mut() {
Expand All @@ -240,7 +254,9 @@ impl BlockQueue {
let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock();
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
},
Err(err) => {
Expand All @@ -250,9 +266,17 @@ impl BlockQueue {
warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err);
bad.insert(block_hash.clone());
verifying.retain(|e| e.hash != block_hash);
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash {
BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
true
} else {
false
}
}
};
if is_ready {
// Import the block immediately
ready.set_sync();
}
}
}
Expand Down Expand Up @@ -361,15 +385,17 @@ impl BlockQueue {
*verified = new_verified;
}

/// Mark given block as processed
pub fn mark_as_good(&self, block_hashes: &[H256]) {
if block_hashes.is_empty() {
return;
/// Mark given item as processed.
/// Returns true if the queue becomes empty.
pub fn mark_as_good(&self, hashes: &[H256]) -> bool {
if hashes.is_empty() {
return self.processing.read().is_empty();
}
let mut processing = self.processing.write();
for hash in block_hashes {
for hash in hashes {
processing.remove(hash);
}
processing.is_empty()
}

/// Removes up to `max` verified blocks from the queue
Expand All @@ -383,7 +409,7 @@ impl BlockQueue {
}
self.ready_signal.reset();
if !verified.is_empty() {
self.ready_signal.set();
self.ready_signal.set_async();
}
result
}
Expand All @@ -408,12 +434,9 @@ impl BlockQueue {
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().heap_size_of_children(),
mem_used: unverified_bytes
+ verifying_bytes
+ verified_bytes
}
}

Expand Down
25 changes: 12 additions & 13 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,16 +355,19 @@ impl Client {

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
let max_blocks_to_import = 4;
let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import);
if blocks.is_empty() {
return 0;
}
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

for block in blocks {
let header = &block.header;
Expand Down Expand Up @@ -393,23 +396,19 @@ impl Client {
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
}
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
};

{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);

if self.queue_info().is_empty() {
if is_empty {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

Expand Down
1 change: 1 addition & 0 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ impl Miner {
trace!(target: "miner", "done recalibration.");
}

let _timer = PerfTimer::new("prepare_block");
let (transactions, mut open_block, original_work_hash) = {
let transactions = {self.transaction_queue.lock().top_transactions()};
let mut sealing_work = self.sealing_work.lock();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600);

for _ in 0..40 {
for _ in 0..400 {
client.import_verified_blocks();
}
assert_eq!(2000, client.chain_info().best_block_number);
Expand Down
Loading

0 comments on commit fc0c186

Please sign in to comment.