-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Use std::sync::Condvar #1732
Use std::sync::Condvar #1732
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,7 @@ use io::service::{HandlerId, IoChannel, IoContext}; | |
use io::{IoHandler}; | ||
use panics::*; | ||
|
||
use parking_lot::{Condvar, Mutex}; | ||
use std::sync::{Condvar as SCondvar, Mutex as SMutex}; | ||
|
||
pub enum WorkType<Message> { | ||
Readable, | ||
|
@@ -44,18 +44,18 @@ pub struct Work<Message> { | |
/// Sorts them ready for blockchain insertion. | ||
pub struct Worker { | ||
thread: Option<JoinHandle<()>>, | ||
wait: Arc<Condvar>, | ||
wait: Arc<SCondvar>, | ||
deleting: Arc<AtomicBool>, | ||
wait_mutex: Arc<Mutex<()>>, | ||
wait_mutex: Arc<SMutex<()>>, | ||
} | ||
|
||
impl Worker { | ||
/// Creates a new worker instance. | ||
pub fn new<Message>(index: usize, | ||
stealer: chase_lev::Stealer<Work<Message>>, | ||
channel: IoChannel<Message>, | ||
wait: Arc<Condvar>, | ||
wait_mutex: Arc<Mutex<()>>, | ||
wait: Arc<SCondvar>, | ||
wait_mutex: Arc<SMutex<()>>, | ||
panic_handler: Arc<PanicHandler> | ||
) -> Worker | ||
where Message: Send + Sync + Clone + 'static { | ||
|
@@ -77,17 +77,17 @@ impl Worker { | |
} | ||
|
||
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>, | ||
channel: IoChannel<Message>, wait: Arc<Condvar>, | ||
wait_mutex: Arc<Mutex<()>>, | ||
channel: IoChannel<Message>, wait: Arc<SCondvar>, | ||
wait_mutex: Arc<SMutex<()>>, | ||
deleting: Arc<AtomicBool>) | ||
where Message: Send + Sync + Clone + 'static { | ||
loop { | ||
{ | ||
let mut lock = wait_mutex.lock(); | ||
let lock = wait_mutex.lock().unwrap(); | ||
if deleting.load(AtomicOrdering::Acquire) { | ||
return; | ||
} | ||
wait.wait(&mut lock); | ||
let _ = wait.wait(lock); | ||
} | ||
|
||
if deleting.load(AtomicOrdering::Acquire) { | ||
|
@@ -123,7 +123,7 @@ impl Worker { | |
impl Drop for Worker { | ||
fn drop(&mut self) { | ||
trace!(target: "shutdown", "[IoWorker] Closing..."); | ||
let _ = self.wait_mutex.lock(); | ||
let _ = self.wait_mutex.lock().unwrap(); | ||
self.deleting.store(true, AtomicOrdering::Release); | ||
self.wait.notify_all(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if the issue is that this call happens before the atomic store due to compiler or CPU isntruction reordering. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It has nothing to do with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to be related to mutex unlocking on wait. It is hard to tell cause the logic inside is quite complex. I'll see if I can create an example and file an issue. |
||
let thread = mem::replace(&mut self.thread, None).unwrap(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are also some logical changes here (and in
flush
). I'm just making sure it's intended, is it?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used a different mutex for a
Condvar
so that verification structures would continue using parking_lot mutex.