diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 5ae4f7c8fcc..f5afcfc8c07 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -18,8 +18,8 @@ //! Sorts them ready for blockchain insertion. use std::thread::{self, JoinHandle}; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; -use std::sync::{Condvar as SCondvar, Mutex as SMutex, Arc}; use std::cmp; use std::collections::{VecDeque, HashSet, HashMap}; use heapsize::HeapSizeOf; @@ -141,11 +141,11 @@ struct Sizes { /// Keeps them in the same order as inserted, minus invalid items. pub struct VerificationQueue { engine: Arc, - more_to_verify: Arc, + more_to_verify: Arc, verification: Arc>, deleting: Arc, ready_signal: Arc, - empty: Arc, + empty: Arc, processing: RwLock>, // hash to difficulty ticks_since_adjustment: AtomicUsize, max_queue_size: usize, @@ -202,8 +202,6 @@ struct Verification { verifying: Mutex>>, verified: Mutex>, bad: Mutex>, - more_to_verify: SMutex<()>, - empty: SMutex<()>, sizes: Sizes, check_seal: bool, } @@ -216,8 +214,6 @@ impl VerificationQueue { verifying: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()), bad: Mutex::new(HashSet::new()), - more_to_verify: SMutex::new(()), - empty: SMutex::new(()), sizes: Sizes { unverified: AtomicUsize::new(0), verifying: AtomicUsize::new(0), @@ -225,14 +221,14 @@ impl VerificationQueue { }, check_seal: check_seal, }); - let more_to_verify = Arc::new(SCondvar::new()); + let more_to_verify = Arc::new(Condvar::new()); let deleting = Arc::new(AtomicBool::new(false)); let ready_signal = Arc::new(QueueSignal { deleting: deleting.clone(), signalled: AtomicBool::new(false), message_channel: Mutex::new(message_channel), }); - let empty = Arc::new(SCondvar::new()); + let empty = Arc::new(Condvar::new()); let scale_verifiers = config.verifier_settings.scale_verifiers; let num_cpus = ::num_cpus::get(); @@ -292,9 +288,9 @@ impl VerificationQueue { fn verify( verification: Arc>, engine: Arc, - wait: Arc, + wait: Arc, ready: Arc, - empty: Arc, + empty: Arc, state: Arc<(Mutex, Condvar)>, id: usize, ) { @@ -319,19 +315,19 @@ impl VerificationQueue { // wait for work if empty. { - let mut more_to_verify = verification.more_to_verify.lock().unwrap(); + let mut unverified = verification.unverified.lock(); - if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() { + if unverified.is_empty() && verification.verifying.lock().is_empty() { empty.notify_all(); } - while verification.unverified.lock().is_empty() { + while unverified.is_empty() { if let State::Exit = *state.0.lock() { debug!(target: "verification", "verifier {} exiting", id); return; } - more_to_verify = wait.wait(more_to_verify).unwrap(); + wait.wait(&mut unverified); } if let State::Exit = *state.0.lock() { @@ -450,9 +446,9 @@ impl VerificationQueue { /// Wait for unverified queue to be empty pub fn flush(&self) { - let mut lock = self.verification.empty.lock().unwrap(); - while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() { - lock = self.empty.wait(lock).unwrap(); + let mut unverified = self.verification.unverified.lock(); + while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() { + self.empty.wait(&mut unverified); } } @@ -712,7 +708,7 @@ impl Drop for VerificationQueue { // acquire this lock to force threads to reach the waiting point // if they're in-between the exit check and the more_to_verify wait. { - let _more = self.verification.more_to_verify.lock().unwrap(); + let _more = self.verification.unverified.lock(); self.more_to_verify.notify_all(); } diff --git a/util/io/src/service_mio.rs b/util/io/src/service_mio.rs index 089d54cc458..5687fe6b008 100644 --- a/util/io/src/service_mio.rs +++ b/util/io/src/service_mio.rs @@ -24,8 +24,7 @@ use crossbeam::sync::chase_lev; use slab::Slab; use {IoError, IoHandler}; use worker::{Worker, Work, WorkType}; -use parking_lot::{RwLock, Mutex}; -use std::sync::{Condvar as SCondvar, Mutex as SMutex}; +use parking_lot::{Condvar, RwLock, Mutex}; use std::time::Duration; /// Timer ID @@ -186,7 +185,7 @@ pub struct IoManager where Message: Send + Sync { handlers: Arc>>>>, workers: Vec, worker_channel: chase_lev::Worker>, - work_ready: Arc, + work_ready: Arc, } impl IoManager where Message: Send + Sync + 'static { @@ -197,8 +196,8 @@ impl IoManager where Message: Send + Sync + 'static { ) -> Result<(), IoError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; - let work_ready_mutex = Arc::new(SMutex::new(())); - let work_ready = Arc::new(SCondvar::new()); + let work_ready_mutex = Arc::new(Mutex::new(())); + let work_ready = Arc::new(Condvar::new()); let workers = (0..num_workers).map(|i| Worker::new( i, diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index da144afea49..2520608483d 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -22,7 +22,7 @@ use service_mio::{HandlerId, IoChannel, IoContext}; use IoHandler; use LOCAL_STACK_SIZE; -use std::sync::{Condvar as SCondvar, Mutex as SMutex}; +use parking_lot::{Condvar, Mutex}; const STACK_SIZE: usize = 16*1024*1024; @@ -45,9 +45,9 @@ pub struct Work { /// Sorts them ready for blockchain insertion. pub struct Worker { thread: Option>, - wait: Arc, + wait: Arc, deleting: Arc, - wait_mutex: Arc>, + wait_mutex: Arc>, } impl Worker { @@ -55,8 +55,8 @@ impl Worker { pub fn new(index: usize, stealer: chase_lev::Stealer>, channel: IoChannel, - wait: Arc, - wait_mutex: Arc>, + wait: Arc, + wait_mutex: Arc>, ) -> Worker where Message: Send + Sync + 'static { let deleting = Arc::new(AtomicBool::new(false)); @@ -76,17 +76,17 @@ impl Worker { } fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, - wait_mutex: Arc>, + channel: IoChannel, wait: Arc, + wait_mutex: Arc>, deleting: Arc) where Message: Send + Sync + 'static { loop { { - let lock = wait_mutex.lock().expect("Poisoned work_loop mutex"); + let mut lock = wait_mutex.lock(); if deleting.load(AtomicOrdering::Acquire) { return; } - let _ = wait.wait(lock); + wait.wait(&mut lock); } while !deleting.load(AtomicOrdering::Acquire) { @@ -122,7 +122,7 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); - let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex"); + let _ = self.wait_mutex.lock(); self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); if let Some(thread) = self.thread.take() {