Skip to content

Commit

Permalink
fix: fix senders hanging when the buffer is full (#85)
Browse files Browse the repository at this point in the history
Fixes #83

Previously, to determine if the buffer was full, we checked whether the
head and tail were pointing to the same slot with the head one
generation behind. However, this check fails if we skip slots, leading
to scenarios where the `head` and `tail` point to different slots even
though the buffer is full.

For example, consider a buffer with 3 slots. Initially, we write to the
buffer three times (gen + 0). Then, we read from slot 0 and slot 1,
holding the reference from slot 1, and read from slot 2 (gen + 0). Next,
we write to slot 0 (gen + 1) and read from slot 0 (gen + 1), which moves
our `head` to slot 1 (gen + 1). Then we try to write to slot 1 (gen + 1)
and skip it, so we write to slot 2 (gen + 1). Then again we write to
slot 0 (gen + 2). And then we attempt to write to slot 1 but we skip and
attempt to write to slot 2 (gen + 2). However, we can’t write into it
because it still contains data from the previous generation (gen + 1),
and our `head` points to slot 1 instead of slot 2.

This fix ensures the buffer full condition accurately reflects the
actual status of the slots, particularly when writes are skipped.
  • Loading branch information
tukan authored Apr 18, 2024
1 parent c8e2a73 commit 723c44a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 21 deletions.
80 changes: 80 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl Core {
Some(state & HAS_READER | next_state)
})
.unwrap_or_else(|_| unreachable!()));
tail = next_tail;
backoff.spin();
continue;
}
Expand Down Expand Up @@ -308,6 +309,16 @@ impl Core {
test_println!("channel full");
return Err(TrySendError::Full(()));
}
// if the current state is marked as data written by the current (or even future)
// generation, we can spin, as our tail is probably behind the actual tail.
// however, if the state is marked as data written by the previous generation and nobody
// is reading it, then we don't have any available slots.
let (tail_idx, tail_gen) = self.idx_gen(tail);
let (state_idx, state_gen) = self.idx_gen(state);
if test_dbg!(state_idx == tail_idx + 1) && test_dbg!(state_gen < tail_gen) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin_yield();
tail = test_dbg!(self.tail.load(Acquire));
Expand Down Expand Up @@ -405,6 +416,7 @@ impl Core {
match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
Ok(_) => {
test_println!("skipped head slot [{}], new head={}", idx, next_head);
head = next_head;
}
Err(actual) => {
test_println!(
Expand Down Expand Up @@ -696,4 +708,72 @@ mod tests {
// don't panic in drop impl.
core.has_dropped_slots = true;
}

#[test]
fn full_simple() {
    // A freshly created channel of capacity `CAP` accepts exactly `CAP`
    // writes; the very next push must report `Full` instead of spinning
    // forever (regression guard for issue #83).
    const CAP: usize = 3;
    let mut core = Core::new(CAP);
    let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
    let recycle = recycling::DefaultRecycle::new();

    // Fill every slot in the buffer.
    for _ in 0..CAP {
        core.push_ref(&slots, &recycle).unwrap();
    }

    // With all slots occupied, one more push must fail with `Full`.
    assert!(matches!(
        core.push_ref(&slots, &recycle),
        Err(TrySendError::Full(()))
    ));

    // don't panic in drop impl.
    core.has_dropped_slots = true;
}

#[test]
fn full_read_and_write() {
    // After filling the buffer, draining part of it, and refilling the
    // freed slots, the buffer is at capacity again and the next push
    // must report `Full`.
    const CAP: usize = 3;
    let mut core = Core::new(CAP);
    let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
    let recycle = recycling::DefaultRecycle::new();

    // Fill all three slots.
    for _ in 0..CAP {
        core.push_ref(&slots, &recycle).unwrap();
    }
    // Free up two slots...
    for _ in 0..2 {
        core.pop_ref(&slots).unwrap();
    }
    // ...and occupy them again.
    for _ in 0..2 {
        core.push_ref(&slots, &recycle).unwrap();
    }

    // Buffer is full once more; the next push must fail.
    assert!(matches!(
        core.push_ref(&slots, &recycle),
        Err(TrySendError::Full(()))
    ));

    // don't panic in drop impl.
    core.has_dropped_slots = true;
}

#[test]
fn full_with_skip() {
// Reproduces the skipped-slot scenario from issue #83: holding a
// `pop_ref` forces a writer to skip a slot, after which the old
// head/tail full-check no longer detected a full buffer and senders
// hung. The exact ordering of operations below is essential — it
// walks the buffer through the generations described in the fix.
const CAP: usize = 3;
let mut core = Core::new(CAP);
let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
let recycle = recycling::DefaultRecycle::new();

// Write to slots 0, 1, 2 (gen + 0).
core.push_ref(&slots, &recycle).unwrap();
core.push_ref(&slots, &recycle).unwrap();
core.push_ref(&slots, &recycle).unwrap();
// Read slot 0, then read slot 1 but KEEP the reference alive so the
// slot cannot be reused, then read slot 2 (all gen + 0).
core.pop_ref(&slots).unwrap();
let _hold = core.pop_ref(&slots).unwrap();
core.pop_ref(&slots).unwrap();
// Write slot 0 (gen + 1); slot 1 is still held, so the second write
// skips it and lands in slot 2 (gen + 1).
core.push_ref(&slots, &recycle).unwrap();
core.push_ref(&slots, &recycle).unwrap();
// Read slot 0 (gen + 1), moving `head` to the still-held slot 1.
core.pop_ref(&slots).unwrap();
// Write slot 0 (gen + 2).
core.push_ref(&slots, &recycle).unwrap();
// The next write skips held slot 1 and targets slot 2, which still
// holds gen + 1 data: the buffer is genuinely full even though `head`
// and `tail` point at different slots. This must return `Full` rather
// than hang.
assert!(matches!(
core.push_ref(&slots, &recycle),
Err(TrySendError::Full(()))
));

// don't panic in drop impl.
core.has_dropped_slots = true;
}
}
24 changes: 13 additions & 11 deletions src/mpsc/tests/mpsc_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn mpsc_try_send_recv() {
#[cfg_attr(ci_skip_slow_models, ignore)]
fn mpsc_try_recv_ref() {
loom::model(|| {
let (tx, rx) = channel(2);
let (tx, rx) = channel(3);

let p1 = {
let tx = tx.clone();
Expand Down Expand Up @@ -80,44 +80,46 @@ fn mpsc_try_recv_ref() {
fn mpsc_test_skip_slot() {
// This test emulates a situation where we might need to skip a slot. The setup includes two writing
// threads that write elements to the channel and one reading thread that maintains a RecvRef to the
// third element until the end of the test, necessitating the skip:
// first element until the end of the test, necessitating the skip:
// Given that the channel capacity is 3, here's the sequence of operations:
// Thread 1 writes: 1, 2
// Thread 2 writes: 3, 4
// The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
// Thread 1 writes: 1, 2, 3
// Thread 2 writes: 4, 5, 6
// The main thread reads from slots in this order: 0 (holds ref), 1, 2, 1, 2, 1.
// As a result, the first slot is skipped during this process.
loom::model(|| {
let (tx, rx) = channel(2);
let (tx, rx) = channel(3);

let p1 = {
let tx = tx.clone();
thread::spawn(move || {
future::block_on(async move {
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
})
})
};

let p2 = {
thread::spawn(move || {
future::block_on(async move {
tx.send(3).await.unwrap();
tx.send(4).await.unwrap();
tx.send(5).await.unwrap();
tx.send(6).await.unwrap();
})
})
};

let mut vals = Vec::new();
let mut hold: Vec<RecvRef<usize>> = Vec::new();

while vals.len() < 4 {
while vals.len() < 6 {
match rx.try_recv_ref() {
Ok(val) => {
if vals.len() == 2 && !hold.is_empty() {
if vals.len() == 4 && !hold.is_empty() {
vals.push(*hold.pop().unwrap());
vals.push(*val);
} else if vals.len() == 1 && hold.is_empty() {
} else if vals.is_empty() && hold.is_empty() {
hold.push(val);
} else {
vals.push(*val);
Expand All @@ -132,7 +134,7 @@ fn mpsc_test_skip_slot() {
}

vals.sort_unstable();
assert_eq_dbg!(vals, vec![1, 2, 3, 4]);
assert_eq_dbg!(vals, vec![1, 2, 3, 4, 5, 6]);

p1.join().unwrap();
p2.join().unwrap();
Expand Down
53 changes: 43 additions & 10 deletions src/mpsc/tests/mpsc_blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::loom::{self, alloc::Track, thread};
use crate::mpsc::blocking;
use crate::mpsc::blocking::RecvRef;
use crate::mpsc::{blocking, errors};

#[test]
#[cfg_attr(ci_skip_slow_models, ignore)]
Expand Down Expand Up @@ -78,14 +78,14 @@ fn mpsc_try_recv_ref() {
fn mpsc_test_skip_slot() {
// This test emulates a situation where we might need to skip a slot. The setup includes two writing
// threads that write elements to the channel and one reading thread that maintains a RecvRef to the
// third element until the end of the test, necessitating the skip:
// first element until the end of the test, necessitating the skip:
// Given that the channel capacity is 3, here's the sequence of operations:
// Thread 1 writes: 1, 2
// Thread 2 writes: 3, 4
// The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
// As a result, the third slot is skipped during this process.
// The main thread reads from slots in this order: 0 (holds ref), 1, 1, 1.
// As a result, the first slot is skipped during this process.
loom::model(|| {
let (tx, rx) = blocking::channel(2);
let (tx, rx) = blocking::channel(3);

let p1 = {
let tx = tx.clone();
Expand All @@ -107,13 +107,10 @@ fn mpsc_test_skip_slot() {
let mut vals = Vec::new();
let mut hold: Vec<RecvRef<usize>> = Vec::new();

while vals.len() < 4 {
while vals.len() + hold.len() < 4 {
match rx.try_recv_ref() {
Ok(val) => {
if vals.len() == 2 && !hold.is_empty() {
vals.push(*hold.pop().unwrap());
vals.push(*val);
} else if vals.len() == 1 && hold.is_empty() {
if vals.is_empty() && hold.is_empty() {
hold.push(val);
} else {
vals.push(*val);
Expand All @@ -126,6 +123,7 @@ fn mpsc_test_skip_slot() {
}
thread::yield_now();
}
vals.push(*hold.pop().unwrap());

vals.sort_unstable();
assert_eq_dbg!(vals, vec![1, 2, 3, 4]);
Expand Down Expand Up @@ -454,3 +452,38 @@ fn tx_close_drains_queue() {
producer.join().unwrap();
});
}

// Reproduces https://github.com/hawkw/thingbuf/issues/83
// Pushing to a thingbuf should not hang when the buffer is full.
// Reproduces https://github.com/hawkw/thingbuf/issues/83
// Pushing to a thingbuf should not hang when the buffer is full.
#[test]
fn test_full() {
    loom::model(|| {
        let (tx, rx) = channel(4);
        // Two producers race to fill the buffer; each keeps sending until
        // it observes `Full`. Before the fix, a skipped slot could make a
        // sender spin forever instead of ever seeing `Full`.
        let p1 = {
            let tx = tx.clone();
            thread::spawn(move || loop {
                match tx.try_send(1) {
                    Ok(_) => {}
                    Err(errors::TrySendError::Full(_)) => break,
                    // `assert!(false, ...)` is the anti-idiom here
                    // (clippy::assertions_on_constants); panic directly on
                    // any unexpected error.
                    Err(err) => panic!("unexpected error: {:?}", err),
                }
                thread::yield_now();
            })
        };

        let p2 = {
            let tx = tx.clone();
            thread::spawn(move || loop {
                match tx.try_send(2) {
                    Ok(_) => {}
                    Err(errors::TrySendError::Full(_)) => break,
                    Err(err) => panic!("unexpected error: {:?}", err),
                }
                thread::yield_now();
            })
        };

        // Both producers must terminate (i.e. observe `Full`) for the
        // model to pass; `rx` is kept alive so the channel never closes.
        p1.join().unwrap();
        p2.join().unwrap();
        drop(rx);
    });
}

0 comments on commit 723c44a

Please sign in to comment.