Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

[beta] Fix queue deadlock #4095

Merged
merged 1 commit into from
Jan 9, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>,
processing: RwLock<HashMap<H256, U256>>, // hash to difficulty
max_queue_size: usize,
max_mem_use: usize,
total_difficulty: RwLock<U256>,
verifier_handles: Vec<JoinHandle<()>>,
}

struct QueueSignal {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();

let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let mut verifier_handles: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count {
let verification = verification.clone();
Expand All @@ -196,7 +196,7 @@ impl<K: Kind> VerificationQueue<K> {
let empty = empty.clone();
let deleting = deleting.clone();
let panic_handler = panic_handler.clone();
verifiers.push(
verifier_handles.push(
thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
Expand All @@ -213,13 +213,13 @@ impl<K: Kind> VerificationQueue<K> {
ready_signal: ready_signal.clone(),
more_to_verify: more_to_verify.clone(),
verification: verification.clone(),
verifiers: verifiers,
deleting: deleting.clone(),
processing: RwLock::new(HashMap::new()),
empty: empty.clone(),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
total_difficulty: RwLock::new(0.into()),
verifier_handles: verifier_handles,
}
}

Expand Down Expand Up @@ -531,8 +531,15 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear();
self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) {

// acquire this lock to force threads to reach the waiting point
// if they're in-between the exit check and the more_to_verify wait.
{
let _more = self.verification.more_to_verify.lock().unwrap();
self.more_to_verify.notify_all();
}

for t in self.verifier_handles.drain(..) {
t.join().unwrap();
}
trace!(target: "shutdown", "[VerificationQueue] Closed.");
Expand Down