From 723c44ae77f76869d5607cbb5a7919477dba23c3 Mon Sep 17 00:00:00 2001 From: "Timothy N. Tsvetkov" <33668+tukan@users.noreply.github.com> Date: Thu, 18 Apr 2024 20:04:28 +0300 Subject: [PATCH] fix: fix senders hanging when the buffer is full (#85) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #83 Previously, to determine if the buffer was full, we checked whether the head and tail were pointing to the same slot with the head one generation behind. However, this check fails if we skip slots, leading to scenarios where the `head` and `tail` point to different slots even though the buffer is full. For example, consider a buffer with 3 slots. Initially, we write to the buffer three times (gen + 0). Then, we read from slot 0 and slot 1, holding the reference from slot 1, and read from slot 2 (gen + 0). Next, we write to slot 0 (gen + 1) and read from slot 0 (gen + 1), which moves our `head` to slot 1 (gen + 1). Then we try to write to slot 1 (gen + 1) and skip it, so we write to slot 2 (gen + 1). Then again we write to slot 0 (gen + 2). And then we attempt to write to slot 1 but we skip and attempt to write to slot 2 (gen + 2). However, we can’t write into it because it still contains data from the previous generation (gen + 1), and our `head` points to slot 1 instead of slot 2. This fix ensures the buffer full condition accurately reflects the actual status of the slots, particularly when writes are skipped. --- src/lib.rs | 80 +++++++++++++++++++++++++++++++++ src/mpsc/tests/mpsc_async.rs | 24 +++++----- src/mpsc/tests/mpsc_blocking.rs | 53 +++++++++++++++++----- 3 files changed, 136 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f702ef2..c6c0869 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -249,6 +249,7 @@ impl Core { Some(state & HAS_READER | next_state) }) .unwrap_or_else(|_| unreachable!())); + tail = next_tail; backoff.spin(); continue; } @@ -308,6 +309,16 @@ impl Core { test_println!("channel full"); return Err(TrySendError::Full(())); } + // if the current state is marked as data written by the current (or even future) + // generation, we can spin, as our tail is probably behind the actual tail. + // however, if the state is marked as data written by the previous generation and nobody + // is reading it, then we don't have any available slots. + let (tail_idx, tail_gen) = self.idx_gen(tail); + let (state_idx, state_gen) = self.idx_gen(state); + if test_dbg!(state_idx == tail_idx + 1) && test_dbg!(state_gen < tail_gen) { + test_println!("channel full"); + return Err(TrySendError::Full(())); + } backoff.spin_yield(); tail = test_dbg!(self.tail.load(Acquire)); @@ -405,6 +416,7 @@ impl Core { match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) { Ok(_) => { test_println!("skipped head slot [{}], new head={}", idx, next_head); + head = next_head; } Err(actual) => { test_println!( @@ -696,4 +708,72 @@ mod tests { // don't panic in drop impl. core.has_dropped_slots = true; } + + #[test] + fn full_simple() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } + + #[test] + fn full_read_and_write() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } + + #[test] + fn full_with_skip() { + const CAP: usize = 3; + let mut core = Core::new(CAP); + let slots: Box<[Slot]> = Slot::::make_boxed_array(CAP); + let recycle = recycling::DefaultRecycle::new(); + + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + let _hold = core.pop_ref(&slots).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + core.pop_ref(&slots).unwrap(); + core.push_ref(&slots, &recycle).unwrap(); + assert!(matches!( + core.push_ref(&slots, &recycle), + Err(TrySendError::Full(())) + )); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } } diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index b720540..d7e9d2a 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -38,7 +38,7 @@ fn mpsc_try_send_recv() { #[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_try_recv_ref() { loom::model(|| { - let (tx, rx) = channel(2); + let (tx, rx) = channel(3); let p1 = { let tx = tx.clone(); @@ -80,14 +80,14 @@ fn mpsc_try_recv_ref() { fn mpsc_test_skip_slot() { // This test emulates a situation where we might need to skip a slot. The setup includes two writing // threads that write elements to the channel and one reading thread that maintains a RecvRef to the - // third element until the end of the test, necessitating the skip: + // first element until the end of the test, necessitating the skip: // Given that the channel capacity is 2, here's the sequence of operations: - // Thread 1 writes: 1, 2 - // Thread 2 writes: 3, 4 - // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. + // Thread 1 writes: 1, 2, 3 + // Thread 2 writes: 4, 5, 6 + // The main thread reads from slots in this order: 0 (holds ref), 1, 2, 1, 2, 1. // As a result, the third slot is skipped during this process. loom::model(|| { - let (tx, rx) = channel(2); + let (tx, rx) = channel(3); let p1 = { let tx = tx.clone(); @@ -95,6 +95,7 @@ fn mpsc_test_skip_slot() { future::block_on(async move { tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); }) }) }; @@ -102,8 +103,9 @@ fn mpsc_test_skip_slot() { let p2 = { thread::spawn(move || { future::block_on(async move { - tx.send(3).await.unwrap(); tx.send(4).await.unwrap(); + tx.send(5).await.unwrap(); + tx.send(6).await.unwrap(); }) }) }; @@ -111,13 +113,13 @@ fn mpsc_test_skip_slot() { let mut vals = Vec::new(); let mut hold: Vec> = Vec::new(); - while vals.len() < 4 { + while vals.len() < 6 { match rx.try_recv_ref() { Ok(val) => { - if vals.len() == 2 && !hold.is_empty() { + if vals.len() == 4 && !hold.is_empty() { vals.push(*hold.pop().unwrap()); vals.push(*val); - } else if vals.len() == 1 && hold.is_empty() { + } else if vals.is_empty() && hold.is_empty() { hold.push(val); } else { vals.push(*val); @@ -132,7 +134,7 @@ fn mpsc_test_skip_slot() { } vals.sort_unstable(); - assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + assert_eq_dbg!(vals, vec![1, 2, 3, 4, 5, 6]); p1.join().unwrap(); p2.join().unwrap(); diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs index 533f583..d6a40cb 100644 --- a/src/mpsc/tests/mpsc_blocking.rs +++ b/src/mpsc/tests/mpsc_blocking.rs @@ -1,7 +1,7 @@ use super::*; use crate::loom::{self, alloc::Track, thread}; -use crate::mpsc::blocking; use crate::mpsc::blocking::RecvRef; +use crate::mpsc::{blocking, errors}; #[test] #[cfg_attr(ci_skip_slow_models, ignore)] @@ -78,14 +78,14 @@ fn mpsc_try_recv_ref() { fn mpsc_test_skip_slot() { // This test emulates a situation where we might need to skip a slot. The setup includes two writing // threads that write elements to the channel and one reading thread that maintains a RecvRef to the - // third element until the end of the test, necessitating the skip: + // first element until the end of the test, necessitating the skip: // Given that the channel capacity is 2, here's the sequence of operations: // Thread 1 writes: 1, 2 // Thread 2 writes: 3, 4 - // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. - // As a result, the third slot is skipped during this process. + // The main thread reads from slots in this order: 0 (holds ref), 1, 1, 1. + // As a result, the first slot is skipped during this process. loom::model(|| { - let (tx, rx) = blocking::channel(2); + let (tx, rx) = blocking::channel(3); let p1 = { let tx = tx.clone(); @@ -107,13 +107,10 @@ fn mpsc_test_skip_slot() { let mut vals = Vec::new(); let mut hold: Vec> = Vec::new(); - while vals.len() < 4 { + while vals.len() + hold.len() < 4 { match rx.try_recv_ref() { Ok(val) => { - if vals.len() == 2 && !hold.is_empty() { - vals.push(*hold.pop().unwrap()); - vals.push(*val); - } else if vals.len() == 1 && hold.is_empty() { + if vals.is_empty() && hold.is_empty() { hold.push(val); } else { vals.push(*val); @@ -126,6 +123,7 @@ fn mpsc_test_skip_slot() { } thread::yield_now(); } + vals.push(*hold.pop().unwrap()); vals.sort_unstable(); assert_eq_dbg!(vals, vec![1, 2, 3, 4]); @@ -454,3 +452,38 @@ fn tx_close_drains_queue() { producer.join().unwrap(); }); } + +// Reproduces https://github.com/hawkw/thingbuf/issues/83 +// Pushing to a thingbuf should not hang when the buffer is full. +#[test] +fn test_full() { + loom::model(|| { + let (tx, rx) = channel(4); + let p1 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(1) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + let p2 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(2) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + p1.join().unwrap(); + p2.join().unwrap(); + }); +}