Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Block import optimization #2748

Merged
merged 2 commits into from
Oct 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/res/ethereum/tests
Submodule tests updated 0 files
25 changes: 12 additions & 13 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,16 +361,19 @@ impl Client {

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
let max_blocks_to_import = 4;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents flushing buffer from growing too large.

let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import);
if blocks.is_empty() {
return 0;
}
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

for block in blocks {
let header = &block.header;
Expand All @@ -394,23 +397,19 @@ impl Client {
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
}
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
};

{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);

if self.queue_info().is_empty() {
if is_empty {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

Expand Down
1 change: 1 addition & 0 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl Miner {
trace!(target: "miner", "prepare_block: done recalibration.");
}

let _timer = PerfTimer::new("prepare_block");
let (transactions, mut open_block, original_work_hash) = {
let transactions = {self.transaction_queue.lock().top_transactions()};
let mut sealing_work = self.sealing_work.lock();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600);

for _ in 0..40 {
for _ in 0..400 {
client.import_verified_blocks();
}
assert_eq!(2000, client.chain_info().best_block_number);
Expand Down
53 changes: 37 additions & 16 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,21 @@ struct QueueSignal {

impl QueueSignal {
#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set(&self) {
fn set_sync(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
}

#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set_async(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
Expand All @@ -128,8 +142,8 @@ impl QueueSignal {
struct Verification<K: Kind> {
// All locks must be captured in the order declared here.
unverified: Mutex<VecDeque<K::Unverified>>,
verified: Mutex<VecDeque<K::Verified>>,
verifying: Mutex<VecDeque<Verifying<K>>>,
verified: Mutex<VecDeque<K::Verified>>,
bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
Expand All @@ -140,8 +154,8 @@ impl<K: Kind> VerificationQueue<K> {
pub fn new(config: Config, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> Self {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
Expand Down Expand Up @@ -226,7 +240,7 @@ impl<K: Kind> VerificationQueue<K> {
};

let hash = item.hash();
match K::verify(item, &*engine) {
let is_ready = match K::verify(item, &*engine) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
let mut idx = None;
Expand All @@ -243,7 +257,9 @@ impl<K: Kind> VerificationQueue<K> {
let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock();
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
},
Err(_) => {
Expand All @@ -256,9 +272,15 @@ impl<K: Kind> VerificationQueue<K> {

if verifying.front().map_or(false, |x| x.output.is_some()) {
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
}
};
if is_ready {
// Import the block immediately
ready.set_sync();
}
}
}
Expand Down Expand Up @@ -366,15 +388,17 @@ impl<K: Kind> VerificationQueue<K> {
*verified = new_verified;
}

/// Mark given item as processed
pub fn mark_as_good(&self, hashes: &[H256]) {
/// Mark given item as processed.
/// Returns true if the queue becomes empty.
pub fn mark_as_good(&self, hashes: &[H256]) -> bool {
if hashes.is_empty() {
return;
return self.processing.read().is_empty();
}
let mut processing = self.processing.write();
for hash in hashes {
processing.remove(hash);
}
processing.is_empty()
}

/// Removes up to `max` verified items from the queue
Expand All @@ -385,7 +409,7 @@ impl<K: Kind> VerificationQueue<K> {

self.ready_signal.reset();
if !verified.is_empty() {
self.ready_signal.set();
self.ready_signal.set_async();
}
result
}
Expand All @@ -411,12 +435,9 @@ impl<K: Kind> VerificationQueue<K> {
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().heap_size_of_children(),
mem_used: unverified_bytes
+ verifying_bytes
+ verified_bytes
}
}

Expand Down
Loading