This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Use std::sync::Condvar #1732

Merged
merged 1 commit on Jul 27, 2016
ethcore/src/block_queue.rs (30 changes: 18 additions & 12 deletions)
@@ -18,6 +18,7 @@
//! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
+use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*;
use verification::*;
use error::*;
@@ -80,12 +81,12 @@ impl BlockQueueInfo {
pub struct BlockQueue {
panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>,
-more_to_verify: Arc<Condvar>,
+more_to_verify: Arc<SCondvar>,
verification: Arc<Verification>,
verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
-empty: Arc<Condvar>,
+empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>,
max_queue_size: usize,
max_mem_use: usize,
@@ -133,6 +134,8 @@ struct Verification {
verified: Mutex<VecDeque<PreverifiedBlock>>,
verifying: Mutex<VecDeque<VerifyingBlock>>,
bad: Mutex<HashSet<H256>>,
+more_to_verify: SMutex<()>,
+empty: SMutex<()>,
}

impl BlockQueue {
@@ -143,15 +146,18 @@ impl BlockQueue {
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
+more_to_verify: SMutex::new(()),
+empty: SMutex::new(()),

});
-let more_to_verify = Arc::new(Condvar::new());
+let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false));
let ready_signal = Arc::new(QueueSignal {
deleting: deleting.clone(),
signalled: AtomicBool::new(false),
message_channel: message_channel
});
-let empty = Arc::new(Condvar::new());
+let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();

let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
@@ -190,17 +196,17 @@ impl BlockQueue {
}
}

-fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
+fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
while !deleting.load(AtomicOrdering::Acquire) {
{
-let mut unverified = verification.unverified.lock();
+let mut more_to_verify = verification.more_to_verify.lock().unwrap();

tomusdrw (Collaborator) commented on Jul 27, 2016:

There are also some logical changes here (and in flush). I'm just making sure it's intended, is it?

Collaborator (Author) replied:

I've used a different mutex for the Condvar so that the verification structures keep using the parking_lot mutex.


-if unverified.is_empty() && verification.verifying.lock().is_empty() {
+if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() {
empty.notify_all();
}

-while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
-wait.wait(&mut unverified);
+while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
+more_to_verify = wait.wait(more_to_verify).unwrap();
}

if deleting.load(AtomicOrdering::Acquire) {
@@ -276,9 +282,9 @@ impl BlockQueue {

/// Wait for unverified queue to be empty
pub fn flush(&self) {
-let mut unverified = self.verification.unverified.lock();
-while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() {
-self.empty.wait(&mut unverified);
+let mut lock = self.verification.empty.lock().unwrap();
+while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() {
+lock = self.empty.wait(lock).unwrap();
}
}
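
The shape of the block_queue.rs change above: the queue data stays behind parking_lot mutexes (as the author's reply notes), while each std::sync::Condvar is paired with its own std::sync::Mutex<()> that exists purely as a wait guard. A minimal sketch of that pattern, using illustrative names rather than the actual BlockQueue/Verification fields:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar as SCondvar, Mutex as SMutex};
use std::thread;

use parking_lot::Mutex;

struct Queue {
    // Shared data keeps the non-poisoning parking_lot mutex.
    unverified: Mutex<VecDeque<u64>>,
    // A std Condvar must be paired with a std Mutex guard, so a separate
    // unit mutex exists purely for waiting.
    more_to_verify_mutex: SMutex<()>,
    more_to_verify: SCondvar,
}

impl Queue {
    /// Block until there is something to verify.
    fn wait_for_work(&self) {
        let mut guard = self.more_to_verify_mutex.lock().unwrap();
        // Re-check the parking_lot-protected state on every wakeup;
        // std condvars are allowed to wake spuriously.
        while self.unverified.lock().is_empty() {
            guard = self.more_to_verify.wait(guard).unwrap();
        }
    }

    /// Queue an item and wake any waiting verifier.
    fn push(&self, block: u64) {
        self.unverified.lock().push_back(block);
        // Take the condvar's guard before notifying so the signal cannot
        // fall between a waiter's emptiness check and its wait().
        let _guard = self.more_to_verify_mutex.lock().unwrap();
        self.more_to_verify.notify_all();
    }
}

fn main() {
    let queue = Arc::new(Queue {
        unverified: Mutex::new(VecDeque::new()),
        more_to_verify_mutex: SMutex::new(()),
        more_to_verify: SCondvar::new(),
    });
    let waiter = {
        let queue = queue.clone();
        thread::spawn(move || queue.wait_for_work())
    };
    queue.push(1);
    waiter.join().unwrap();
}
```

Two details of this sketch are worth flagging: the wait loop re-checks the parking_lot-protected state on every wakeup because std condvars may wake spuriously, and push takes the condvar's guard before notifying to close the lost-wakeup window. The second point is a choice made in the sketch, not something the diff above asserts.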

util/src/io/service.rs (9 changes: 5 additions & 4 deletions)
@@ -25,7 +25,8 @@ use io::{IoError, IoHandler};
use io::worker::{Worker, Work, WorkType};
use panics::*;

-use parking_lot::{Condvar, RwLock, Mutex};
+use parking_lot::{RwLock};
+use std::sync::{Condvar as SCondvar, Mutex as SMutex};

/// Timer ID
pub type TimerToken = usize;
@@ -169,16 +170,16 @@ pub struct IoManager<Message> where Message: Send + Sync {
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
-work_ready: Arc<Condvar>,
+work_ready: Arc<SCondvar>,
}

impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
/// Creates a new instance and registers it with the event loop.
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
let (worker, stealer) = chase_lev::deque();
let num_workers = 4;
-let work_ready_mutex = Arc::new(Mutex::new(()));
-let work_ready = Arc::new(Condvar::new());
+let work_ready_mutex = Arc::new(SMutex::new(()));
+let work_ready = Arc::new(SCondvar::new());
let workers = (0..num_workers).map(|i|
Worker::new(
i,
util/src/io/worker.rs (20 changes: 10 additions & 10 deletions)
Expand Up @@ -23,7 +23,7 @@ use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler};
use panics::*;

-use parking_lot::{Condvar, Mutex};
+use std::sync::{Condvar as SCondvar, Mutex as SMutex};

pub enum WorkType<Message> {
Readable,
@@ -44,18 +44,18 @@ pub struct Work<Message> {
/// Sorts them ready for blockchain insertion.
pub struct Worker {
thread: Option<JoinHandle<()>>,
-wait: Arc<Condvar>,
+wait: Arc<SCondvar>,
deleting: Arc<AtomicBool>,
-wait_mutex: Arc<Mutex<()>>,
+wait_mutex: Arc<SMutex<()>>,
}

impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>,
-wait: Arc<Condvar>,
-wait_mutex: Arc<Mutex<()>>,
+wait: Arc<SCondvar>,
+wait_mutex: Arc<SMutex<()>>,
panic_handler: Arc<PanicHandler>
) -> Worker
where Message: Send + Sync + Clone + 'static {
@@ -77,17 +77,17 @@ impl Worker {
}

fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
-channel: IoChannel<Message>, wait: Arc<Condvar>,
-wait_mutex: Arc<Mutex<()>>,
+channel: IoChannel<Message>, wait: Arc<SCondvar>,
+wait_mutex: Arc<SMutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static {
loop {
{
-let mut lock = wait_mutex.lock();
+let lock = wait_mutex.lock().unwrap();
if deleting.load(AtomicOrdering::Acquire) {
return;
}
-wait.wait(&mut lock);
+let _ = wait.wait(lock);
}

if deleting.load(AtomicOrdering::Acquire) {
@@ -123,7 +123,7 @@ impl Worker {
impl Drop for Worker {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing...");
-let _ = self.wait_mutex.lock();
+let _ = self.wait_mutex.lock().unwrap();
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all();

Contributor commented:

I wonder if the issue is that this call happens before the atomic store due to compiler or CPU instruction reordering. System condvars can have spurious wakeups, but parking_lot ones can't. I don't have a Windows machine right now, but could you test using parking_lot and adding a call to atomic::fence(AtomicOrdering::SeqCst) between the store to deleting and the condvar notification?

Collaborator (Author) replied:

It has nothing to do with deleting, as it does not change when the deadlock happens.

Contributor replied:

If deleting hasn't changed, then the deadlock is in the mutex, isn't it?

Collaborator (Author) replied:

It seems to be related to mutex unlocking on wait. It is hard to tell because the logic inside is quite complex. I'll see if I can create an example and file an issue.

let thread = mem::replace(&mut self.thread, None).unwrap();
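
As a rough illustration of the reviewer's suggestion in the thread above, and not code from this PR, keeping parking_lot and inserting a SeqCst fence between the store to deleting and the condvar notification might look like the following; the struct and method names are simplified stand-ins for the IoWorker shutdown path:

```rust
use std::sync::Arc;
use std::sync::atomic::{fence, AtomicBool, Ordering as AtomicOrdering};

use parking_lot::{Condvar, Mutex};

// Simplified stand-in for the IoWorker shutdown path discussed above.
struct Worker {
    deleting: Arc<AtomicBool>,
    wait: Arc<Condvar>,
    wait_mutex: Arc<Mutex<()>>,
}

impl Worker {
    fn signal_shutdown(&self) {
        // Hold the condvar's own mutex while signalling, so a worker cannot
        // be caught between its `deleting` check and its wait().
        let _guard = self.wait_mutex.lock();
        self.deleting.store(true, AtomicOrdering::Release);
        // The reviewer's suggestion: a SeqCst fence between the store and
        // the notification, ruling out reordering of the two.
        fence(AtomicOrdering::SeqCst);
        self.wait.notify_all();
    }
}

fn main() {
    let worker = Worker {
        deleting: Arc::new(AtomicBool::new(false)),
        wait: Arc::new(Condvar::new()),
        wait_mutex: Arc::new(Mutex::new(())),
    };
    worker.signal_shutdown();
    assert!(worker.deleting.load(AtomicOrdering::Acquire));
}
```

As the replies note, the author found the hang unrelated to deleting, and the merged change switched these paths to std::sync primitives instead.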