From f0e414d1c0b7f6a7b2058e708dd149f8c45f8db5 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sun, 28 Feb 2021 22:25:07 +0800 Subject: [PATCH] wait at boundary Signed-off-by: Jay Lee --- crossbeam-channel/src/flavors/list.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 4e3e90ba9..5056aa431 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -564,9 +564,23 @@ impl Channel { /// /// This method should only be called when all receivers are dropped. fn discard_all_messages(&self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + loop { + let offset = (tail >> SHIFT) % LAP; + if offset != BLOCK_CAP { + break; + } + + // New updates to tail will be rejected by MARK_BIT and aborted unless it's + // at boundary. We need to wait for the updates take affect otherwise there + // can be memory leaks. + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + } + + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); unsafe { // Drop all messages between head and tail and deallocate the heap-allocated blocks. @@ -582,7 +596,7 @@ impl Channel { } else { (*block).wait_next(); // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = (*block).next.load(Ordering::Acquire); drop(Box::from_raw(block)); block = next; }