diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 5759422..9b7f085 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -26,8 +26,9 @@ jobs: model: - mpsc_send_recv_wrap - mpsc_try_send_recv - - mpsc::rx_close_unconsumed - - mpsc_sync::rx_close_unconsumed + - mpsc_try_recv_ref + - mpsc_async::rx_close_unconsumed + - mpsc_blocking::rx_close_unconsumed name: model '${{ matrix.model }}'' runs-on: ubuntu-latest steps: @@ -54,7 +55,7 @@ jobs: scope: # NOTE: if adding loom models in a new module, that module needs to be # added to this list! - - mpsc_sync + - mpsc_blocking - mpsc_async - thingbuf - util diff --git a/Cargo.toml b/Cargo.toml index 864a0ce..b080abe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ lto = true opt-level = 3 [patch.crates-io] -loom = { git = "https://github.com/tokio-rs/loom", rev = "555b52fdb267964f0950a52be87b0f28c40b054c" } +loom = { git = "https://github.com/tokio-rs/loom", rev = "a93bf2390e0fcfdb7c5899b31db0e4e795ab4aab" } [package.metadata.docs.rs] all-features = true diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index a85415a..2648146 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -34,6 +34,47 @@ fn mpsc_try_send_recv() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_try_recv_ref() { + loom::model(|| { + let (tx, rx) = channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + future::block_on(async move { + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + }) + }) + }; + let p2 = thread::spawn(move || { + future::block_on(async move { + tx.send(3).await.unwrap(); + tx.send(4).await.unwrap(); + }) + }); + + let mut vals = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => vals.push(*val), + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => panic!("channel closed"), + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs index c518e35..89b6f0d 100644 --- a/src/mpsc/tests/mpsc_blocking.rs +++ b/src/mpsc/tests/mpsc_blocking.rs @@ -2,15 +2,7 @@ use super::*; use crate::loom::{self, alloc::Track, thread}; #[test] -// This test currently fails because `loom` implements the wrong semantics for -// `Thread::unpark()`/`thread::park` (see -// https://github.com/tokio-rs/loom/issues/246). -// However, it implements the correct semantics for async `Waker`s (which -// _should_ be the same as park/unpark), so the async version of this test more -// or less verifies that the algorithm here is correct. -// -// TODO(eliza): when tokio-rs/loom#246 is fixed, we can re-enable this test! -#[ignore] +#[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_try_send_recv() { loom::model(|| { let (tx, rx) = blocking::channel(3); @@ -40,6 +32,45 @@ fn mpsc_try_send_recv() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_try_recv_ref() { + loom::model(|| { + let (tx, rx) = blocking::channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.send_ref().unwrap() = 1; + thread::yield_now(); + *tx.send_ref().unwrap() = 2; + }) + }; + let p2 = thread::spawn(move || { + *tx.send_ref().unwrap() = 3; + thread::yield_now(); + *tx.send_ref().unwrap() = 4; + }); + + let mut vals = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => vals.push(*val), + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => panic!("channel closed"), + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; @@ -185,15 +216,7 @@ fn spsc_recv_then_try_send_then_close() { } #[test] -// This test currently fails because `loom` implements the wrong semantics for -// `Thread::unpark()`/`thread::park` (see -// https://github.com/tokio-rs/loom/issues/246). -// However, it implements the correct semantics for async `Waker`s (which -// _should_ be the same as park/unpark), so the async version of this test more -// or less verifies that the algorithm here is correct. -// -// TODO(eliza): when tokio-rs/loom#246 is fixed, we can re-enable this test! -#[ignore] +#[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_send_recv_wrap() { loom::model(|| { let (tx, rx) = blocking::channel::(1); @@ -284,15 +307,6 @@ fn spsc_send_recv_in_order_no_wrap() { } #[test] -// This test currently fails because `loom` implements the wrong semantics for -// `Thread::unpark()`/`thread::park` (see -// https://github.com/tokio-rs/loom/issues/246). -// However, it implements the correct semantics for async `Waker`s (which -// _should_ be the same as park/unpark), so the async version of this test more -// or less verifies that the algorithm here is correct. -// -// TODO(eliza): when tokio-rs/loom#246 is fixed, we can re-enable this test! -#[ignore] fn spsc_send_recv_in_order_wrap() { const N_SENDS: usize = 2; loom::model(|| {