-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TryJoin
on tuple trying to free a null pointer
#155
Comments
even stranger, if you use a slight modified version, you trigger an infinite recursion: use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};
async fn process(
iter: impl IntoIterator<Item = &LoudDrop<i32>>,
fail: bool,
) -> Result<Vec<LoudDrop<i32>>, Vec<()>> {
if fail {
return Err(vec![]);
} else {
sleep(Duration::from_secs(5)).await;
}
Ok(iter
.into_iter()
.map(|i| ready(i.clone()))
.collect::<Vec<_>>()
.join()
.await)
}
#[derive(Clone)]
struct LoudDrop<T>(T);
impl<T> Drop for LoudDrop<T> {
fn drop(&mut self) {
println!("DROP");
}
}
#[tokio::main]
async fn main() -> Result<(), Vec<()>> {
let v = (0..4).map(LoudDrop).collect::<Vec<_>>();
//
// COMMENT THIS LINE AND YOU GET A SEGFAULT
// UNCOMMENT THIS LINE AND YOU GET AN INFINITE RECURSION
//
println!("{}", v.len());
(
process(v.iter().take(2), true),
process(v.iter().take(0), false),
)
.try_join()
.await?;
Ok(())
} |
miri is detecting UB in the
miri outputerror: Undefined Behavior: using uninitialized data, but this operation requires initialized memory
--> /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9
|
223 | self.ptr.as_ptr()
| ^^^^^^^^ using uninitialized data, but this operation requires initialized memory
|
= help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
= help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
= note: BACKTRACE:
= note: inside `alloc::raw_vec::RawVec::<LoudDrop<i32>>::ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9: 223:17
= note: inside `std::vec::Vec::<LoudDrop<i32>>::as_mut_ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:1324:9: 1324:23
= note: inside `<std::vec::Vec<LoudDrop<i32>> as std::ops::Drop>::drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:3064:62: 3064:79
= note: inside `std::ptr::drop_in_place::<std::vec::Vec<LoudDrop<i32>>> - shim(Some(std::vec::Vec<LoudDrop<i32>>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
= note: inside `std::mem::MaybeUninit::<std::vec::Vec<LoudDrop<i32>>>::assume_init_drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/maybe_uninit.rs:728:18: 728:55
= note: inside `<futures_concurrency::future::try_join::tuple::TryJoin2<A, ResA, B, ResB, Err> as pin_project::__private::PinnedDrop>::drop::__drop_inner::<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:72:22: 72:48
= note: inside `<futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>> as pin_project::__private::PinnedDrop>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:263:9: 263:23
= note: inside `futures_concurrency::future::try_join::tuple::_::<impl std::ops::Drop for futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:174:23: 174:33
= note: inside `std::ptr::drop_in_place::<futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>> - shim(Some(futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
note: inside closure
--> src/main.rs:42:14
|
42 | .await?;
| ^
= note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63: 282:87
= note: inside `tokio::runtime::coop::with_budget::<std::task::Poll<std::result::Result<(), std::vec::Vec<()>>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5: 107:8
= note: inside `tokio::runtime::coop::budget::<std::task::Poll<std::result::Result<(), std::vec::Vec<()>>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5: 73:38
= note: inside `tokio::runtime::park::CachedParkThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31: 282:88
= note: inside `tokio::runtime::context::blocking::BlockingRegionGuard::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9: 66:25
= note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13: 87:38
= note: inside `tokio::runtime::scheduler::multi_thread::MultiThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9: 88:11
note: inside `main`
--> src/main.rs:44:5
|
44 | Ok(())
| ^^^^^^
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
error: aborting due to previous error```
</details> |
Also, worth to note that these errors are only happening with Tokio for some reason, just tested my example with async_std and worked just fine. Edit: Also tested with smol and futures crate executor, also crashes with them. The |
Hey, thanks for reporting this! Im not behind a computer right now, but from reading the code it seems we may be setting the wrong state on line 36 of try_join.rs. We're tagging the entry as "ready" rather than "completed", and then dropping the data. This leads to a double-drop, which is what I believe Miri is finding. We do want to drop in-line, so I think what we should do is to instead mark it as "completed" and the drop it inline. If anyone wants to try that out and see if it still fails that would be helpful. Otherwise I can give this a try when I'm back at work next week. |
Tested here, calling I can help it further, just need some guidance on how to proceed. I'm putting here the tests output. Most of them were just expecting a "ready" state and should be simple to fix for a "none" state or for a new "completed" state, the ones that worried me more are the ones for leaking stuff, namely the assertion at futures-concurrency/src/future/try_join/tuple.rs Lines 395 to 397 in a2a7f6a
Unit testsrunning 59 tests
test future::join::array::test::debug ... ok
test future::join::array::test::empty ... ok
test future::join::vec::test::debug ... ok
test future::join::array::test::smoke ... ok
test future::join::tuple::test::join_0 ... ok
test future::future_group::test::smoke ... ok
test future::join::vec::test::empty ... ok
test future::join::vec::test::smoke ... ok
test future::race::array::test::no_fairness ... ok
test future::join::tuple::test::does_not_leak_memory ... FAILED
test future::race::tuple::test::race_1 ... ok
test future::join::tuple::test::join_3 ... FAILED
test future::join::tuple::test::join_1 ... FAILED
test future::join::tuple::test::join_2 ... FAILED
test future::race::tuple::test::race_2 ... ok
test future::race::tuple::test::race_3 ... ok
test future::race::vec::test::no_fairness ... ok
test future::race_ok::array::test::all_err ... ok
test future::race_ok::array::test::all_ok ... ok
test future::race_ok::array::test::one_err ... ok
test future::race_ok::tuple::test::race_ok_1 ... ok
test future::race_ok::tuple::test::race_ok_2 ... ok
test future::race_ok::tuple::test::race_ok_3 ... ok
test future::race_ok::tuple::test::race_ok_err ... ok
test future::race_ok::vec::test::all_err ... ok
test future::race_ok::vec::test::all_ok ... ok
test future::race_ok::vec::test::one_err ... ok
test future::try_join::array::test::all_ok ... ok
test future::try_join::array::test::empty ... ok
test future::try_join::array::test::one_err ... ok
test future::try_join::tuple::test::all_ok ... FAILED
test future::try_join::tuple::test::does_not_leak_memory ... FAILED
test future::try_join::tuple::test::issue_135_resume_after_completion ... FAILED
test future::try_join::tuple::test::one_err ... ok
test future::try_join::vec::test::all_ok ... ok
test future::try_join::vec::test::empty ... ok
test future::try_join::vec::test::one_err ... ok
test stream::chain::array::tests::chain_3 ... ok
test stream::chain::tuple::tests::chain_3 ... ok
test stream::chain::vec::tests::chain_3 ... ok
test stream::merge::array::tests::merge_array_2x2 ... ok
test stream::merge::array::tests::merge_array_4 ... ok
test stream::merge::tuple::tests::merge_tuple_0 ... ok
test stream::merge::array::tests::merge_channels ... FAILED
test stream::merge::tuple::tests::merge_channels ... FAILED
test stream::merge::tuple::tests::merge_tuple_1 ... ok
test stream::merge::tuple::tests::merge_tuple_2 ... ok
test stream::merge::tuple::tests::merge_tuple_3 ... ok
test stream::merge::tuple::tests::merge_tuple_4 ... ok
test stream::merge::vec::tests::merge_vec_2x2 ... ok
test stream::merge::vec::tests::merge_vec_4 ... ok
test stream::stream_group::test::smoke ... ok
test stream::merge::vec::tests::merge_channels ... FAILED
test stream::zip::array::tests::zip_array_3 ... ok
test stream::zip::tuple::tests::zip_tuple_3 ... ok
test utils::poll_state::vec::tests::boxed_does_not_allocate_twice ... ok
test stream::zip::vec::tests::zip_array_3 ... ok
test utils::poll_state::vec::tests::type_size ... ok
test utils::wakers::vec::readiness_vec::test::resize ... ok
failures:
---- future::join::tuple::test::does_not_leak_memory stdout ----
thread 'future::join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/join/tuple.rs:367:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---- future::join::tuple::test::join_3 stdout ----
thread 'future::join::tuple::test::join_3' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- future::join::tuple::test::join_1 stdout ----
thread 'future::join::tuple::test::join_1' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- future::join::tuple::test::join_2 stdout ----
thread 'future::join::tuple::test::join_2' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- future::try_join::tuple::test::all_ok stdout ----
thread 'future::try_join::tuple::test::all_ok' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- future::try_join::tuple::test::does_not_leak_memory stdout ----
thread 'future::try_join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/try_join/tuple.rs:398:13
---- future::try_join::tuple::test::issue_135_resume_after_completion stdout ----
thread 'future::try_join::tuple::test::issue_135_resume_after_completion' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- stream::merge::array::tests::merge_channels stdout ----
thread 'stream::merge::array::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- stream::merge::tuple::tests::merge_channels stdout ----
thread 'stream::merge::tuple::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
---- stream::merge::vec::tests::merge_channels stdout ----
thread 'stream::merge::vec::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13
failures:
future::join::tuple::test::does_not_leak_memory
future::join::tuple::test::join_1
future::join::tuple::test::join_2
future::join::tuple::test::join_3
future::try_join::tuple::test::all_ok
future::try_join::tuple::test::does_not_leak_memory
future::try_join::tuple::test::issue_135_resume_after_completion
stream::merge::array::tests::merge_channels
stream::merge::tuple::tests::merge_channels
stream::merge::vec::tests::merge_channels
test result: FAILED. 49 passed; 10 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s
error: test failed, to rerun pass `--lib` |
I'm trying to reproduce this bug today. And I believe the bug may actually be in use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};
async fn process_not_fail() -> Result<Vec<i32>, ()> {
sleep(Duration::from_millis(100)).await;
Ok(vec![ready(1), ready(2)].join().await)
}
async fn process_fail() -> Result<Vec<i32>, ()> {
Err(())
}
#[tokio::test]
async fn test() {
let res = (process_fail(), process_not_fail()).try_join().await;
assert!(res.is_err());
} But if we replace use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};
async fn process_not_fail() -> Result<[i32; 2], ()> {
sleep(Duration::from_millis(100)).await;
Ok([ready(1), ready(2)].join().await) // <- changed this line
}
async fn process_fail() -> Result<[i32; 2], ()> {
Err(())
}
#[tokio::test]
async fn test() {
let res = (process_fail(), process_not_fail()).try_join().await;
assert!(res.is_err());
} I want to see if I can further reduce this to see whether I can trigger it without using |
If we replace the try join of the tuple with a try join of an array, we trigger the same error. This should make this easier to debug since we're no longer needing to track down the source of the error through macro frames in the backtrace. use futures_concurrency::future::{Join, TryJoin};
use futures_core::Future;
use std::{future::ready, pin::Pin};
use tokio::time::{sleep, Duration};
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
async fn process_not_fail() -> Result<Vec<i32>, ()> {
sleep(Duration::from_millis(100)).await;
Ok(vec![ready(1), ready(2)].join().await)
}
async fn process_fail() -> Result<Vec<i32>, ()> {
Err(())
}
#[tokio::test]
async fn test() {
let a: BoxFuture<'static, _> = Box::pin(process_fail());
let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
let res = [a, b].try_join().await;
assert!(res.is_err());
} |
Ha, I found the bug. It turns out the error was in all of the implementations of |
I've been using
futures_concurrency
crate as much as I can at Spacedrive, I prefer its functions and traits approach better than the macros approach offutures
crate. But I was having a weird crash problem with aTryJoin
. I was able to reproduce a minimal example as follows.On MacOS it fires a SIGABRT and on Linux a SIGSEGV (saying that the faulty address is
0x0
)The text was updated successfully, but these errors were encountered: