diff --git a/src/lib.rs b/src/lib.rs index f702ef2..c6c0869 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -249,6 +249,7 @@ impl Core { Some(state & HAS_READER | next_state) }) .unwrap_or_else(|_| unreachable!())); + tail = next_tail; backoff.spin(); continue; } @@ -308,6 +309,16 @@ impl Core { test_println!("channel full"); return Err(TrySendError::Full(())); } + // if the current state is marked as data written by the current (or even future) + // generation, we can spin, as our tail is probably behind the actual tail. + // however, if the state is marked as data written by the previous generation and nobody + // is reading it, then we don't have any available slots. + let (tail_idx, tail_gen) = self.idx_gen(tail); + let (state_idx, state_gen) = self.idx_gen(state); + if test_dbg!(state_idx == tail_idx + 1) && test_dbg!(state_gen < tail_gen) { + test_println!("channel full"); + return Err(TrySendError::Full(())); + } backoff.spin_yield(); tail = test_dbg!(self.tail.load(Acquire)); @@ -405,6 +416,7 @@ impl Core { match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) { Ok(_) => { test_println!("skipped head slot [{}], new head={}", idx, next_head); + head = next_head; } Err(actual) => { test_println!( @@ -696,4 +708,72 @@ mod tests { // don't panic in drop impl. core.has_dropped_slots = true; } + + #[test] + fn full_simple() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } + + #[test] + fn full_read_and_write() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } + + #[test] + fn full_with_skip() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + let _hold = core.pop_ref(&slots).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } } diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index b720540..d7e9d2a 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -38,7 +38,7 @@ fn mpsc_try_send_recv() { #[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_try_recv_ref() { loom::model(|| { - let (tx, rx) = channel(2); + let (tx, rx) = channel(3); let p1 = { let tx = tx.clone(); @@ -80,14 +80,14 @@ fn mpsc_try_recv_ref() { fn mpsc_test_skip_slot() { // This test emulates a situation where we might need to skip a slot. The setup includes two writing // threads that write elements to the channel and one reading thread that maintains a RecvRef to the - // third element until the end of the test, necessitating the skip: + // first element until the end of the test, necessitating the skip: // Given that the channel capacity is 2, here's the sequence of operations: - // Thread 1 writes: 1, 2 - // Thread 2 writes: 3, 4 - // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. + // Thread 1 writes: 1, 2, 3 + // Thread 2 writes: 4, 5, 6 + // The main thread reads from slots in this order: 0 (holds ref), 1, 2, 1, 2, 1. // As a result, the third slot is skipped during this process. loom::model(|| { - let (tx, rx) = channel(2); + let (tx, rx) = channel(3); let p1 = { let tx = tx.clone(); @@ -95,6 +95,7 @@ fn mpsc_test_skip_slot() { future::block_on(async move { tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); }) }) }; @@ -102,8 +103,9 @@ fn mpsc_test_skip_slot() { let p2 = { thread::spawn(move || { future::block_on(async move { - tx.send(3).await.unwrap(); tx.send(4).await.unwrap(); + tx.send(5).await.unwrap(); + tx.send(6).await.unwrap(); }) }) }; @@ -111,13 +113,13 @@ fn mpsc_test_skip_slot() { let mut vals = Vec::new(); let mut hold: Vec> = Vec::new(); - while vals.len() < 4 { + while vals.len() < 6 { match rx.try_recv_ref() { Ok(val) => { - if vals.len() == 2 && !hold.is_empty() { + if vals.len() == 4 && !hold.is_empty() { vals.push(*hold.pop().unwrap()); vals.push(*val); - } else if vals.len() == 1 && hold.is_empty() { + } else if vals.is_empty() && hold.is_empty() { hold.push(val); } else { vals.push(*val); @@ -132,7 +134,7 @@ fn mpsc_test_skip_slot() { } vals.sort_unstable(); - assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + assert_eq_dbg!(vals, vec![1, 2, 3, 4, 5, 6]); p1.join().unwrap(); p2.join().unwrap(); diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs index 533f583..d6a40cb 100644 --- a/src/mpsc/tests/mpsc_blocking.rs +++ b/src/mpsc/tests/mpsc_blocking.rs @@ -1,7 +1,7 @@ use super::*; use crate::loom::{self, alloc::Track, thread}; -use crate::mpsc::blocking; use crate::mpsc::blocking::RecvRef; +use crate::mpsc::{blocking, errors}; #[test] #[cfg_attr(ci_skip_slow_models, ignore)] @@ -78,14 +78,14 @@ fn mpsc_try_recv_ref() { fn mpsc_test_skip_slot() { // This test emulates a situation where we might need to skip a slot. The setup includes two writing // threads that write elements to the channel and one reading thread that maintains a RecvRef to the - // third element until the end of the test, necessitating the skip: + // first element until the end of the test, necessitating the skip: // Given that the channel capacity is 2, here's the sequence of operations: // Thread 1 writes: 1, 2 // Thread 2 writes: 3, 4 - // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. - // As a result, the third slot is skipped during this process. + // The main thread reads from slots in this order: 0 (holds ref), 1, 1, 1. + // As a result, the first slot is skipped during this process. loom::model(|| { - let (tx, rx) = blocking::channel(2); + let (tx, rx) = blocking::channel(3); let p1 = { let tx = tx.clone(); @@ -107,13 +107,10 @@ fn mpsc_test_skip_slot() { let mut vals = Vec::new(); let mut hold: Vec> = Vec::new(); - while vals.len() < 4 { + while vals.len() + hold.len() < 4 { match rx.try_recv_ref() { Ok(val) => { - if vals.len() == 2 && !hold.is_empty() { - vals.push(*hold.pop().unwrap()); - vals.push(*val); - } else if vals.len() == 1 && hold.is_empty() { + if vals.is_empty() && hold.is_empty() { hold.push(val); } else { vals.push(*val); @@ -126,6 +123,7 @@ fn mpsc_test_skip_slot() { } thread::yield_now(); } + vals.push(*hold.pop().unwrap()); vals.sort_unstable(); assert_eq_dbg!(vals, vec![1, 2, 3, 4]); @@ -454,3 +452,38 @@ fn tx_close_drains_queue() { producer.join().unwrap(); }); } + +// Reproduces https://github.com/hawkw/thingbuf/issues/83 +// Pushing to a thingbuf should not hang when the buffer is full. +#[test] +fn test_full() { + loom::model(|| { + let (tx, rx) = channel(4); + let p1 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(1) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + let p2 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(2) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + p1.join().unwrap(); + p2.join().unwrap(); + }); +}