Fix issue #83: thingbuf hangs when buffer is full #85

Merged · 7 commits · Apr 18, 2024
Changes from 3 commits
80 changes: 80 additions & 0 deletions src/lib.rs
@@ -249,6 +249,7 @@ impl Core {
Some(state & HAS_READER | next_state)
})
.unwrap_or_else(|_| unreachable!()));
tail = next_tail;
backoff.spin();
continue;
}
@@ -308,6 +309,16 @@ impl Core {
test_println!("channel full");
return Err(TrySendError::Full(()));
}
// If the current state is marked as data written by the current (or even a future)
// generation, we can spin, since our tail is probably behind the actual tail.
// But if the state is marked as data written by the previous generation and nobody
// is reading it, then we don't have any available slots.
let (tail_idx, tail_gen) = self.idx_gen(tail);
let (state_ix, state_gen) = self.idx_gen(state);
if test_dbg!(state_ix == tail_idx + 1 && state_gen < tail_gen) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin_yield();
tail = test_dbg!(self.tail.load(Acquire));
@@ -405,6 +416,7 @@ impl Core {
match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
Ok(_) => {
test_println!("skipped head slot [{}], new head={}", idx, next_head);
head = next_head;
}
Err(actual) => {
test_println!(
@@ -696,4 +708,72 @@ mod tests {
        // don't panic in drop impl.
        core.has_dropped_slots = true;
    }

hawkw (Owner) commented:
It would be nice to also add a loom model exercising what happens when a buffer fills up due to concurrent pushes from multiple threads. We could spawn multiple threads and have each one try to push in a loop until the buffer is full, which would check that all of those threads eventually complete.

We could also exercise slot skipping by adding a thread that calls pop_ref and either mem::forgets the guards or stuffs them somewhere to hang onto them.
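
For reference, a minimal sketch of what such a loom model could look like is below. This is not code from this PR; the blocking::channel, try_send, and loom thread usage are assumed to mirror the existing tests in this repository, and the names are illustrative.

```rust
// Sketch only: spawn several producers that each push until they observe a
// full buffer, then check that every producer thread terminates (i.e. nobody
// hangs once the buffer fills up, which is the behavior fixed by this PR).
#[test]
fn mpsc_fill_from_concurrent_pushes() {
    loom::model(|| {
        let (tx, rx) = blocking::channel::<usize>(2);

        let producers: Vec<_> = (0..2)
            .map(|i| {
                let tx = tx.clone();
                thread::spawn(move || {
                    // Push in a loop until the buffer reports it is full.
                    while tx.try_send(i).is_ok() {}
                })
            })
            .collect();

        // If the "hang when full" bug were present, one of these joins could
        // never complete under some interleavings.
        for p in producers {
            p.join().unwrap();
        }

        drop(rx);
    });
}
```

The slot-skipping variant described above could additionally spawn a consumer that obtains guards (via pop_ref or, at the channel level, try_recv_ref) and mem::forgets or stashes them.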

tukan (Contributor, PR author) replied:

@hawkw I need your advice here. I added a simple loom test, but I find it difficult to construct a good test with reads and writes under loom. Imagine we have a thread that reads from the buffer (for example, exactly three times) and two threads that write to the buffer until three elements have been read and the buffer is full. Under loom this will fail because we reach the maximum number of iterations in interleavings where we keep attempting to write without reading, so we just spin on "buffer is full" and make no progress.
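
To illustrate the shape of the test being described here (a rough sketch, not code from this PR; the channel constructor, try_send/try_recv_ref, and loom thread APIs are assumed to match the other loom tests in this repository):

```rust
// Sketch of the problematic pattern: writers retry in a spin loop while the
// reader only pops a fixed number of elements. Under loom there are schedules
// where a writer keeps retrying against a full buffer without the reader ever
// running, so the model exceeds its iteration budget instead of completing.
loom::model(|| {
    let (tx, rx) = blocking::channel::<usize>(2);

    let writers: Vec<_> = (1..=2)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                for v in [i * 10, i * 10 + 1] {
                    // Spin until this send succeeds; this is the part that can
                    // starve under loom's bounded exploration.
                    while tx.try_send(v).is_err() {
                        thread::yield_now();
                    }
                }
            })
        })
        .collect();

    // The reader pops exactly three elements and then stops.
    let mut read = 0;
    while read < 3 {
        if rx.try_recv_ref().is_ok() {
            read += 1;
        } else {
            thread::yield_now();
        }
    }

    for w in writers {
        w.join().unwrap();
    }
});
```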

    #[test]
    fn full_simple() {
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();

        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));

        // don't panic in drop impl.
        core.has_dropped_slots = true;
    }

    #[test]
    fn full_read_and_write() {
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();

        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));

        // don't panic in drop impl.
        core.has_dropped_slots = true;
    }

    #[test]
    fn full_with_skip() {
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();

        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        let _hold = core.pop_ref(&slots).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));

        // don't panic in drop impl.
        core.has_dropped_slots = true;
    }
}
24 changes: 13 additions & 11 deletions src/mpsc/tests/mpsc_async.rs
@@ -38,7 +38,7 @@ fn mpsc_try_send_recv() {
#[cfg_attr(ci_skip_slow_models, ignore)]
fn mpsc_try_recv_ref() {
loom::model(|| {
let (tx, rx) = channel(2);
let (tx, rx) = channel(3);

let p1 = {
let tx = tx.clone();
@@ -80,44 +80,46 @@ fn mpsc_try_recv_ref() {
fn mpsc_test_skip_slot() {
// This test emulates a situation where we might need to skip a slot. The setup includes two writing
// threads that write elements to the channel and one reading thread that maintains a RecvRef to the
// third element until the end of the test, necessitating the skip:
// first element until the end of the test, necessitating the skip:
// Given that the channel capacity is 2, here's the sequence of operations:
// Thread 1 writes: 1, 2
// Thread 2 writes: 3, 4
// The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
// Thread 1 writes: 1, 2, 3
// Thread 2 writes: 4, 5, 6
// The main thread reads from slots in this order: 0 (holds ref), 1, 2, 1, 2, 1.
// As a result, the third slot is skipped during this process.
loom::model(|| {
let (tx, rx) = channel(2);
let (tx, rx) = channel(3);

let p1 = {
let tx = tx.clone();
thread::spawn(move || {
future::block_on(async move {
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
})
})
};

let p2 = {
thread::spawn(move || {
future::block_on(async move {
tx.send(3).await.unwrap();
tx.send(4).await.unwrap();
tx.send(5).await.unwrap();
tx.send(6).await.unwrap();
})
})
};

let mut vals = Vec::new();
let mut hold: Vec<RecvRef<usize>> = Vec::new();

while vals.len() < 4 {
while vals.len() < 6 {
match rx.try_recv_ref() {
Ok(val) => {
if vals.len() == 2 && !hold.is_empty() {
if vals.len() == 4 && !hold.is_empty() {
vals.push(*hold.pop().unwrap());
vals.push(*val);
} else if vals.len() == 1 && hold.is_empty() {
} else if vals.is_empty() && hold.is_empty() {
hold.push(val);
} else {
vals.push(*val);
@@ -132,7 +134,7 @@
}

vals.sort_unstable();
assert_eq_dbg!(vals, vec![1, 2, 3, 4]);
assert_eq_dbg!(vals, vec![1, 2, 3, 4, 5, 6]);

p1.join().unwrap();
p2.join().unwrap();
16 changes: 7 additions & 9 deletions src/mpsc/tests/mpsc_blocking.rs
@@ -78,14 +78,14 @@ fn mpsc_try_recv_ref() {
fn mpsc_test_skip_slot() {
// This test emulates a situation where we might need to skip a slot. The setup includes two writing
// threads that write elements to the channel and one reading thread that maintains a RecvRef to the
// third element until the end of the test, necessitating the skip:
// first element until the end of the test, necessitating the skip:
// Given that the channel capacity is 2, here's the sequence of operations:
// Thread 1 writes: 1, 2
// Thread 2 writes: 3, 4
// The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
// As a result, the third slot is skipped during this process.
// The main thread reads from slots in this order: 0 (holds ref), 1, 1, 1.
// As a result, the first slot is skipped during this process.
loom::model(|| {
let (tx, rx) = blocking::channel(2);
let (tx, rx) = blocking::channel(3);

let p1 = {
let tx = tx.clone();
@@ -107,13 +107,10 @@
let mut vals = Vec::new();
let mut hold: Vec<RecvRef<usize>> = Vec::new();

while vals.len() < 4 {
while vals.len() + hold.len() < 4 {
match rx.try_recv_ref() {
Ok(val) => {
if vals.len() == 2 && !hold.is_empty() {
vals.push(*hold.pop().unwrap());
vals.push(*val);
} else if vals.len() == 1 && hold.is_empty() {
if vals.is_empty() && hold.is_empty() {
hold.push(val);
} else {
vals.push(*val);
@@ -126,6 +123,7 @@
}
thread::yield_now();
}
vals.push(*hold.pop().unwrap());

vals.sort_unstable();
assert_eq_dbg!(vals, vec![1, 2, 3, 4]);