diff --git a/src/lib.rs b/src/lib.rs index 228ffe8..46531b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ #![doc = include_str!("../README.md")] #![warn(missing_docs)] use core::{cmp, fmt, mem::MaybeUninit, ops, ptr}; + #[macro_use] mod macros; @@ -36,7 +37,7 @@ feature! { use crate::{ loom::{ - atomic::{AtomicUsize, Ordering::*}, + atomic::{AtomicBool, AtomicUsize, Ordering::*}, cell::{MutPtr, UnsafeCell}, }, mpsc::errors::{TryRecvError, TrySendError}, @@ -62,6 +63,7 @@ pub struct Ref<'slot, T> { ptr: MutPtr>, slot: &'slot Slot, new_state: usize, + is_pop: bool, } /// Error indicating that a `push` operation failed because a queue was at @@ -101,6 +103,7 @@ struct Core { struct Slot { value: UnsafeCell>, state: AtomicUsize, + has_reader: AtomicBool, } impl Core { @@ -184,8 +187,7 @@ impl Core { { test_println!("push_ref"); let mut backoff = Backoff::new(); - let mut tail = self.tail.load(Relaxed); - + let mut tail = test_dbg!(self.tail.load(Relaxed)); loop { if test_dbg!(tail & self.closed != 0) { return Err(TrySendError::Closed(())); @@ -210,54 +212,63 @@ impl Core { ); slots.get_unchecked(idx) }; - let state = test_dbg!(slot.state.load(Acquire)); - + let state = test_dbg!(slot.state.load(SeqCst)); + // slot is writable if test_dbg!(state == tail) { - // Move the tail index forward by 1. let next_tail = self.next(idx, gen); + // try to advance the tail match test_dbg!(self .tail .compare_exchange_weak(tail, next_tail, SeqCst, Acquire)) { Ok(_) => { - // We got the slot! It's now okay to write to it - test_println!("claimed tail slot [{}]", idx); - // Claim exclusive ownership over the slot - let ptr = slot.value.get_mut(); - - // Initialize or recycle the element. - unsafe { - // Safety: we have just claimed exclusive ownership over - // this slot. - let ptr = ptr.deref(); - if gen == 0 { - ptr.write(recycle.new_element()); - test_println!("-> initialized"); - } else { - // Safety: if the generation is > 0, then the - // slot has already been initialized. - recycle.recycle(ptr.assume_init_mut()); - test_println!("-> recycled"); + test_println!("advanced tail {} to {}", tail, next_tail); + test_println!("claimed slot [{}]", idx); + let has_reader = test_dbg!(slot.has_reader.load(SeqCst)); + if test_dbg!(!has_reader) { + // We got the slot! It's now okay to write to it + // Claim exclusive ownership over the slot + let ptr = slot.value.get_mut(); + // Initialize or recycle the element. + unsafe { + // Safety: we have just claimed exclusive ownership over + // this slot. + let ptr = ptr.deref(); + if gen == 0 { + ptr.write(recycle.new_element()); + test_println!("-> initialized"); + } else { + // Safety: if the generation is > 0, then the + // slot has already been initialized. + recycle.recycle(ptr.assume_init_mut()); + test_println!("-> recycled"); + } } + return Ok(Ref { + ptr, + new_state: tail + 1, + slot, + is_pop: false, + }); + } else { + test_println!("has an active reader, skipping slot [{}]", idx); + let next_state = tail.wrapping_add(self.gen); + test_dbg!(slot.state.store(test_dbg!(next_state), Release)); + backoff.spin(); + continue; } - - return Ok(Ref { - ptr, - new_state: tail + 1, - slot, - }); } Err(actual) => { // Someone else took this slot and advanced the tail // index. Try to claim the new tail. + test_println!("failed to advance tail {} to {}", tail, next_tail); tail = actual; backoff.spin(); continue; } } - } - - if test_dbg!(state.wrapping_add(self.gen) == tail + 1) { + } else { + // check if we have any available slots // fake RMW op to placate loom. this should be equivalent to // doing a relaxed load after a SeqCst fence (per Godbolt // https://godbolt.org/z/zb15qfEa9), however, loom understands @@ -270,12 +281,9 @@ impl Core { test_println!("channel full"); return Err(TrySendError::Full(())); } - - backoff.spin(); - } else { - backoff.spin_yield(); } + backoff.spin_yield(); tail = test_dbg!(self.tail.load(Acquire)); } } @@ -308,33 +316,39 @@ impl Core { ); slots.get_unchecked(idx) }; + let state = test_dbg!(slot.state.load(Acquire)); + let next_head = self.next(idx, gen); - // If the slot's state is ahead of the head index by one, we can pop - // it. + // If the slot's state is ahead of the head index by one, we can pop it. if test_dbg!(state == head + 1) { - let next_head = self.next(idx, gen); + // try to advance the head index match test_dbg!(self .head .compare_exchange_weak(head, next_head, SeqCst, Acquire)) { Ok(_) => { - test_println!("claimed head slot [{}]", idx); + test_println!("advanced head {} to {}", head, next_head); + test_println!("claimed slot [{}]", idx); + let new_state = head.wrapping_add(self.gen); + test_dbg!(slot.has_reader.store(true, SeqCst)); + test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst)); return Ok(Ref { - new_state: head.wrapping_add(self.gen), + new_state, ptr: slot.value.get_mut(), slot, + is_pop: true, }); } Err(actual) => { + test_println!("failed to advance head, head={}, actual={}", head, actual); head = actual; backoff.spin(); continue; } } - } - - if test_dbg!(state == head) { + } else { + // Maybe we reached the tail index? If so, the buffer is empty. // fake RMW op to placate loom. this should be equivalent to // doing a relaxed load after a SeqCst fence (per Godbolt // https://godbolt.org/z/zb15qfEa9), however, loom understands @@ -342,9 +356,7 @@ impl Core { // SeqCst fence and a load. // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a // load it gets reordered differently in the model checker lmao... - let tail = test_dbg!(self.tail.fetch_or(0, SeqCst)); - if test_dbg!(tail & !self.closed == head) { return if test_dbg!(tail & self.closed != 0) { Err(TryRecvError::Closed) @@ -354,16 +366,38 @@ impl Core { }; } - if test_dbg!(backoff.done_spinning()) { - return Err(TryRecvError::Empty); + // Is anyone writing to the slot from this generation? + if test_dbg!(state == head) { + if test_dbg!(backoff.done_spinning()) { + return Err(TryRecvError::Empty); + } + backoff.spin(); + continue; } - backoff.spin(); - } else { - backoff.spin_yield(); + // The slot is in an invalid state (was skipped). Try to advance the head index. + match test_dbg!(self + .head + .compare_exchange_weak(head, next_head, SeqCst, Acquire)) + { + Ok(_) => { + test_println!("skipped head slot [{}], new head={}", idx, next_head); + backoff.spin(); + continue; + } + Err(actual) => { + test_println!( + "failed to skip head slot [{}], head={}, actual={}", + idx, + head, + actual + ); + head = actual; + backoff.spin(); + continue; + } + } } - - head = test_dbg!(self.head.load(Acquire)); } } @@ -475,8 +509,17 @@ impl ops::DerefMut for Ref<'_, T> { impl Drop for Ref<'_, T> { #[inline] fn drop(&mut self) { - test_println!("drop Ref<{}>", core::any::type_name::()); - test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release)); + if self.is_pop { + test_println!("drop Ref<{}> (pop)", core::any::type_name::()); + test_dbg!(self.slot.has_reader.store(test_dbg!(false), SeqCst)); + } else { + test_println!( + "drop Ref<{}> (push), new_state = {}", + core::any::type_name::(), + self.new_state + ); + test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release)); + } } } @@ -542,6 +585,7 @@ impl Slot { Self { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(idx), + has_reader: AtomicBool::new(false), } } @@ -550,6 +594,7 @@ impl Slot { Self { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(idx), + has_reader: AtomicBool::new(false), } } } diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index 2648146..f853f84 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -75,6 +75,62 @@ fn mpsc_try_recv_ref() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_test_skip_slot() { + loom::model(|| { + let (tx, rx) = channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + future::block_on(async move { + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + }) + }) + }; + + let p2 = { + thread::spawn(move || { + future::block_on(async move { + tx.send(3).await.unwrap(); + tx.send(4).await.unwrap(); + }) + }) + }; + + let mut vals = Vec::new(); + let mut hold: Vec> = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => { + if vals.len() == 2 && !hold.is_empty() { + vals.push(*hold.pop().unwrap()); + vals.push(*val); + } else if vals.len() == 1 && hold.is_empty() { + hold.push(val); + } else { + vals.push(*val); + } + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => { + panic!("channel closed"); + }, + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; @@ -278,6 +334,43 @@ fn spsc_send_recv_in_order_wrap() { }) } +#[test] +fn spsc_send_recv_in_order_skip_wrap() { + const N_SENDS: usize = 5; + loom::model(|| { + let (tx, rx) = channel::((N_SENDS + 1) / 2); + let consumer = thread::spawn(move || { + future::block_on(async move { + let mut hold = Vec::new(); + assert_eq_dbg!(rx.recv().await, Some(1)); + loop { + match rx.try_recv_ref() { + Ok(val) => { + assert_eq_dbg!(*val, 2); + hold.push(val); + break; + }, + Err(TryRecvError::Empty) => { + loom::thread::yield_now(); + }, + Err(TryRecvError::Closed) => panic!("channel closed"), + } + } + for i in 3..=N_SENDS { + assert_eq_dbg!(rx.recv().await, Some(i)); + } + assert_eq_dbg!(rx.recv().await, None); + }); + }); + future::block_on(async move { + for i in 1..=N_SENDS { + tx.send(i).await.unwrap(); + } + }); + consumer.join().unwrap(); + }); +} + #[test] #[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_send_recv_wrap() { diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs index 89b6f0d..a230cab 100644 --- a/src/mpsc/tests/mpsc_blocking.rs +++ b/src/mpsc/tests/mpsc_blocking.rs @@ -1,5 +1,7 @@ use super::*; use crate::loom::{self, alloc::Track, thread}; +use crate::mpsc::blocking; +use crate::mpsc::blocking::RecvRef; #[test] #[cfg_attr(ci_skip_slow_models, ignore)] @@ -71,6 +73,60 @@ fn mpsc_try_recv_ref() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_test_skip_slot() { + loom::model(|| { + let (tx, rx) = blocking::channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.send_ref().unwrap() = 1; + thread::yield_now(); + *tx.send_ref().unwrap() = 2; + }) + }; + + let p2 = { + thread::spawn(move || { + *tx.send_ref().unwrap() = 3; + thread::yield_now(); + *tx.send_ref().unwrap() = 4; + }) + }; + + let mut vals = Vec::new(); + let mut hold: Vec> = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => { + if vals.len() == 2 && !hold.is_empty() { + vals.push(*hold.pop().unwrap()); + vals.push(*val); + } else if vals.len() == 1 && hold.is_empty() { + hold.push(val); + } else { + vals.push(*val); + } + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => { + panic!("channel closed"); + }, + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; @@ -326,6 +382,41 @@ fn spsc_send_recv_in_order_wrap() { }) } +#[test] +fn spsc_send_recv_in_order_skip_wrap() { + const N_SENDS: usize = 5; + loom::model(|| { + let (tx, rx) = blocking::channel::((N_SENDS + 1) / 2); + let consumer = thread::spawn(move || { + let mut hold = Vec::new(); + assert_eq_dbg!(rx.recv(), Some(1)); + loop { + match rx.try_recv_ref() { + Ok(val) => { + assert_eq_dbg!(*val, 2); + hold.push(val); + break; + }, + Err(TryRecvError::Empty) => { + loom::thread::yield_now(); + }, + Err(TryRecvError::Closed) => panic!("channel closed"), + } + } + for i in 3..=N_SENDS { + assert_eq_dbg!(rx.recv(), Some(i)); + } + assert_eq_dbg!(rx.recv(), None); + + }); + for i in 1..=N_SENDS { + tx.send(i).unwrap(); + } + drop(tx); + consumer.join().unwrap(); + }); +} + #[test] fn tx_close_wakes() { loom::model(|| { diff --git a/tests/mpsc_blocking.rs b/tests/mpsc_blocking.rs index 4d2fb96..d4a4e70 100644 --- a/tests/mpsc_blocking.rs +++ b/tests/mpsc_blocking.rs @@ -1,5 +1,6 @@ use std::thread; use thingbuf::mpsc::blocking; +use thingbuf::mpsc::errors::{TryRecvError, TrySendError}; #[test] fn basically_works() { @@ -70,3 +71,54 @@ fn tx_close_drains_queue() { producer.join().unwrap(); } } + +#[test] +fn spsc_skip_slot() { + let (tx, rx) = blocking::channel::(3); + // 0 lap + tx.send(0).unwrap(); + assert_eq!(rx.recv(), Some(0)); + tx.send(1).unwrap(); + let msg_ref = rx.try_recv_ref().unwrap(); + tx.send(2).unwrap(); + assert_eq!(rx.recv(), Some(2)); + // 1 lap + tx.send(3).unwrap(); + assert_eq!(rx.recv(), Some(3)); + tx.send(4).unwrap(); + assert_eq!(rx.recv(), Some(4)); + drop(msg_ref); + // 2 lap + tx.send(5).unwrap(); + tx.send(6).unwrap(); + tx.send(7).unwrap(); + assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_)))); + assert_eq!(rx.recv(), Some(5)); + assert_eq!(rx.recv(), Some(6)); + assert_eq!(rx.recv(), Some(7)); +} + +#[test] +fn spsc_full_after_skipped() { + let (tx, rx) = blocking::channel::(3); + // 0 lap + tx.send(0).unwrap(); + assert_eq!(rx.recv(), Some(0)); + tx.send(1).unwrap(); + let _msg_ref = rx.try_recv_ref().unwrap(); + tx.send(2).unwrap(); + // lap 1 + tx.send(3).unwrap(); + assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_)))); +} + +#[test] +fn spsc_empty_after_skipped() { + let (tx, rx) = blocking::channel::(2); + // 0 lap + tx.send(0).unwrap(); + tx.send(1).unwrap(); + let _msg_ref = rx.try_recv_ref().unwrap(); + assert_eq!(rx.recv(), Some(1)); + assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); +}