Skip to content

Commit

Permalink
wait at boundary
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay committed Feb 28, 2021
1 parent f28ea86 commit f0e414d
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,9 +564,23 @@ impl<T> Channel<T> {
///
/// This method should only be called when all receivers are dropped.
fn discard_all_messages(&self) {
let mut head = self.head.index.load(Ordering::Relaxed);
let tail = self.tail.index.load(Ordering::Relaxed);
let mut block = self.head.block.load(Ordering::Relaxed);
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
let offset = (tail >> SHIFT) % LAP;
if offset != BLOCK_CAP {
break;
}

// New updates to tail will be rejected by MARK_BIT and aborted unless it's
// at boundary. We need to wait for the updates take affect otherwise there
// can be memory leaks.
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
}

let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);

unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
Expand All @@ -582,7 +596,7 @@ impl<T> Channel<T> {
} else {
(*block).wait_next();
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
let next = (*block).next.load(Ordering::Acquire);
drop(Box::from_raw(block));
block = next;
}
Expand Down

0 comments on commit f0e414d

Please sign in to comment.