diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 02535fa119d..4022a7251b4 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -18,6 +18,7 @@ //! Sorts them ready for blockchain insertion. use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use verification::*; use error::*; @@ -80,12 +81,12 @@ impl BlockQueueInfo { pub struct BlockQueue { panic_handler: Arc, engine: Arc>, - more_to_verify: Arc, + more_to_verify: Arc, verification: Arc, verifiers: Vec>, deleting: Arc, ready_signal: Arc, - empty: Arc, + empty: Arc, processing: RwLock>, max_queue_size: usize, max_mem_use: usize, @@ -133,6 +134,8 @@ struct Verification { verified: Mutex>, verifying: Mutex>, bad: Mutex>, + more_to_verify: SMutex<()>, + empty: SMutex<()>, } impl BlockQueue { @@ -143,15 +146,18 @@ impl BlockQueue { verified: Mutex::new(VecDeque::new()), verifying: Mutex::new(VecDeque::new()), bad: Mutex::new(HashSet::new()), + more_to_verify: SMutex::new(()), + empty: SMutex::new(()), + }); - let more_to_verify = Arc::new(Condvar::new()); + let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); let ready_signal = Arc::new(QueueSignal { deleting: deleting.clone(), signalled: AtomicBool::new(false), message_channel: message_channel }); - let empty = Arc::new(Condvar::new()); + let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); @@ -190,17 +196,17 @@ impl BlockQueue { } } - fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut unverified = verification.unverified.lock(); + let mut more_to_verify = verification.more_to_verify.lock().unwrap(); - if unverified.is_empty() && verification.verifying.lock().is_empty() { + if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() { empty.notify_all(); } - while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { - wait.wait(&mut unverified); + while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) { + more_to_verify = wait.wait(more_to_verify).unwrap(); } if deleting.load(AtomicOrdering::Acquire) { @@ -276,9 +282,9 @@ impl BlockQueue { /// Wait for unverified queue to be empty pub fn flush(&self) { - let mut unverified = self.verification.unverified.lock(); - while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() { - self.empty.wait(&mut unverified); + let mut lock = self.verification.empty.lock().unwrap(); + while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() { + lock = self.empty.wait(lock).unwrap(); } } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index bfd63b04ca4..6e7dad4bb30 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -25,7 +25,8 @@ use io::{IoError, IoHandler}; use io::worker::{Worker, Work, WorkType}; use panics::*; -use parking_lot::{Condvar, RwLock, Mutex}; +use parking_lot::{RwLock}; +use std::sync::{Condvar as SCondvar, Mutex as SMutex}; /// Timer ID pub type TimerToken = usize; @@ -169,7 +170,7 @@ pub struct IoManager where Message: Send + Sync { handlers: Slab>, HandlerId>, workers: Vec, worker_channel: chase_lev::Worker>, - work_ready: Arc, + work_ready: Arc, } impl IoManager where Message: Send + Sync + Clone + 'static { @@ -177,8 +178,8 @@ impl IoManager where Message: Send + Sync + Clone + 'static { pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), UtilError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; - let work_ready_mutex = Arc::new(Mutex::new(())); - let work_ready = Arc::new(Condvar::new()); + let work_ready_mutex = Arc::new(SMutex::new(())); + let work_ready = Arc::new(SCondvar::new()); let workers = (0..num_workers).map(|i| Worker::new( i, diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index a81cdca08c3..c96bdf007fc 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -23,7 +23,7 @@ use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; use panics::*; -use parking_lot::{Condvar, Mutex}; +use std::sync::{Condvar as SCondvar, Mutex as SMutex}; pub enum WorkType { Readable, @@ -44,9 +44,9 @@ pub struct Work { /// Sorts them ready for blockchain insertion. pub struct Worker { thread: Option>, - wait: Arc, + wait: Arc, deleting: Arc, - wait_mutex: Arc>, + wait_mutex: Arc>, } impl Worker { @@ -54,8 +54,8 @@ impl Worker { pub fn new(index: usize, stealer: chase_lev::Stealer>, channel: IoChannel, - wait: Arc, - wait_mutex: Arc>, + wait: Arc, + wait_mutex: Arc>, panic_handler: Arc ) -> Worker where Message: Send + Sync + Clone + 'static { @@ -77,17 +77,17 @@ impl Worker { } fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, - wait_mutex: Arc>, + channel: IoChannel, wait: Arc, + wait_mutex: Arc>, deleting: Arc) where Message: Send + Sync + Clone + 'static { loop { { - let mut lock = wait_mutex.lock(); + let lock = wait_mutex.lock().unwrap(); if deleting.load(AtomicOrdering::Acquire) { return; } - wait.wait(&mut lock); + let _ = wait.wait(lock); } if deleting.load(AtomicOrdering::Acquire) { @@ -123,7 +123,7 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); - let _ = self.wait_mutex.lock(); + let _ = self.wait_mutex.lock().unwrap(); self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); let thread = mem::replace(&mut self.thread, None).unwrap();