TryJoin on tuple trying to free a null pointer #155

Closed
fogodev opened this issue Sep 13, 2023 · 8 comments · Fixed by #157
Labels
bug Something isn't working


@fogodev

fogodev commented Sep 13, 2023

I've been using the futures_concurrency crate as much as I can at Spacedrive; I prefer its function- and trait-based approach to the macro-based approach of the futures crate. However, I ran into a strange crash with TryJoin. Here is a minimal reproduction:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process(iter: impl IntoIterator<Item = &i32>, fail: bool) -> Result<Vec<i32>, Vec<()>> {
    if fail {
        return Err(vec![]);
    } else {
        sleep(Duration::from_secs(5)).await;
    }

    Ok(iter
        .into_iter()
        .map(|i| ready(*i))
        .collect::<Vec<_>>()
        .join()
        .await)
}

#[tokio::main]
async fn main() -> Result<(), Vec<()>> {
    let v = (0..10).collect::<Vec<_>>();

    (
        process(v.iter().take(5), true),
        process(v.iter().take(0), false),
    )
        .try_join()
        .await?;

    Ok(())
}

On macOS it raises SIGABRT, and on Linux SIGSEGV (reporting the faulting address as 0x0).

@matheus-consoli
Collaborator

Even stranger, with a slightly modified version you trigger an infinite recursion:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process(
    iter: impl IntoIterator<Item = &LoudDrop<i32>>,
    fail: bool,
) -> Result<Vec<LoudDrop<i32>>, Vec<()>> {
    if fail {
        return Err(vec![]);
    } else {
        sleep(Duration::from_secs(5)).await;
    }

    Ok(iter
        .into_iter()
        .map(|i| ready(i.clone()))
        .collect::<Vec<_>>()
        .join()
        .await)
}

#[derive(Clone)]
struct LoudDrop<T>(T);
impl<T> Drop for LoudDrop<T> {
    fn drop(&mut self) {
        println!("DROP");
    }
}

#[tokio::main]
async fn main() -> Result<(), Vec<()>> {
    let v = (0..4).map(LoudDrop).collect::<Vec<_>>();

    // 
    // COMMENT THIS LINE AND YOU GET A SEGFAULT
    // UNCOMMENT THIS LINE AND YOU GET AN INFINITE RECURSION
    //
    println!("{}", v.len());

    (
        process(v.iter().take(2), true),
        process(v.iter().take(0), false),
    )
        .try_join()
        .await?;

    Ok(())
}

@matheus-consoli
Collaborator

Miri detects UB in drop_initialized_values, at this line:

unsafe { $output.assume_init_drop() };

Miri output:
error: Undefined Behavior: using uninitialized data, but this operation requires initialized memory
   --> /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9
    |
223 |         self.ptr.as_ptr()
    |         ^^^^^^^^ using uninitialized data, but this operation requires initialized memory
    |
    = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
    = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
    = note: BACKTRACE:
    = note: inside `alloc::raw_vec::RawVec::<LoudDrop<i32>>::ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9: 223:17
    = note: inside `std::vec::Vec::<LoudDrop<i32>>::as_mut_ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:1324:9: 1324:23
    = note: inside `<std::vec::Vec<LoudDrop<i32>> as std::ops::Drop>::drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:3064:62: 3064:79
    = note: inside `std::ptr::drop_in_place::<std::vec::Vec<LoudDrop<i32>>> - shim(Some(std::vec::Vec<LoudDrop<i32>>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
    = note: inside `std::mem::MaybeUninit::<std::vec::Vec<LoudDrop<i32>>>::assume_init_drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/maybe_uninit.rs:728:18: 728:55
    = note: inside `<futures_concurrency::future::try_join::tuple::TryJoin2<A, ResA, B, ResB, Err> as pin_project::__private::PinnedDrop>::drop::__drop_inner::<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:72:22: 72:48
    = note: inside `<futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>> as pin_project::__private::PinnedDrop>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:263:9: 263:23
    = note: inside `futures_concurrency::future::try_join::tuple::_::<impl std::ops::Drop for futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:174:23: 174:33
    = note: inside `std::ptr::drop_in_place::<futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>> - shim(Some(futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec<LoudDrop<i32>>, std::vec::Vec<()>>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56
note: inside closure
   --> src/main.rs:42:14
    |
42  |         .await?;
    |              ^
    = note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63: 282:87
    = note: inside `tokio::runtime::coop::with_budget::<std::task::Poll<std::result::Result<(), std::vec::Vec<()>>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5: 107:8
    = note: inside `tokio::runtime::coop::budget::<std::task::Poll<std::result::Result<(), std::vec::Vec<()>>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5: 73:38
    = note: inside `tokio::runtime::park::CachedParkThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31: 282:88
    = note: inside `tokio::runtime::context::blocking::BlockingRegionGuard::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9: 66:25
    = note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13: 87:38
    = note: inside `tokio::runtime::scheduler::multi_thread::MultiThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9: 88:11
note: inside `main`
   --> src/main.rs:44:5
    |
44  |     Ok(())
    |     ^^^^^^

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to previous error
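The invariant Miri is enforcing here is that assume_init_drop is only sound when the slot really holds a live value, so every path that reads or drops the value must also record that it is gone. The sketch below is a hypothetical, simplified slot type (OutputSlot, set, take and clear are illustrative names, not futures-concurrency APIs) that pairs a MaybeUninit<T> with an initialized flag and makes Drop consult that flag before touching the memory.

```rust
use std::mem::MaybeUninit;

/// Hypothetical output slot (not the crate's real type) that records
/// whether its `MaybeUninit` currently holds a live value.
struct OutputSlot<T> {
    value: MaybeUninit<T>,
    initialized: bool,
}

impl<T> OutputSlot<T> {
    fn new() -> Self {
        Self { value: MaybeUninit::uninit(), initialized: false }
    }

    /// Store a value; any previously stored value is dropped first.
    fn set(&mut self, v: T) {
        self.clear();
        self.value.write(v);
        self.initialized = true;
    }

    /// Move the value out, leaving the slot logically uninitialized.
    fn take(&mut self) -> Option<T> {
        if self.initialized {
            self.initialized = false;
            // SAFETY: the flag was set, so `value` holds a live `T`; clearing
            // the flag first ensures it is never read or dropped again.
            Some(unsafe { self.value.assume_init_read() })
        } else {
            None
        }
    }

    /// Drop the stored value in place, if there is one.
    fn clear(&mut self) {
        if self.initialized {
            self.initialized = false;
            // SAFETY: same invariant as in `take`.
            unsafe { self.value.assume_init_drop() };
        }
    }
}

impl<T> Drop for OutputSlot<T> {
    fn drop(&mut self) {
        // Only drops memory that the flag says is initialized.
        self.clear();
    }
}
```

Clearing the flag before calling assume_init_drop means that even a panicking destructor cannot cause the same value to be dropped a second time.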

@fogodev
Author

fogodev commented Sep 13, 2023

Also worth noting: for some reason these errors only happen with Tokio. I just tested my example with async_std and it worked fine.

Edit: I also tested with smol and with the futures crate's executor, and it crashes with both. The unsafe usage @matheus-consoli pointed out suggests that the outputs are only being initialized properly under async_std; with the other executors it tries to drop invalid data.

@yoshuawuyts
Owner

yoshuawuyts commented Sep 16, 2023

Hey, thanks for reporting this!

I'm not at a computer right now, but from reading the code it seems we may be setting the wrong state on line 36 of try_join.rs. We tag the entry as "ready" rather than "completed" and then drop the data. This leads to a double drop, which I believe is what Miri is finding.

We do want to drop inline, so I think we should instead mark it as "completed" and then drop it inline. If anyone wants to try that and see whether it still fails, that would be helpful. Otherwise I can give it a try when I'm back at work next week.
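A minimal sketch of the ordering proposed here, assuming a simplified two-slot state machine rather than the crate's actual TryJoin2 and PollState types (State, Outputs, store and drop_ready_outputs are hypothetical names). On the error path each stored output is marked Completed before it is dropped inline, so the Drop impl, which only drops Ready slots, never drops it again. This only illustrates the idea in this comment; it is not necessarily how the eventual fix was written.

```rust
use std::mem::MaybeUninit;

/// Hypothetical per-future state: `Ready` means "output stored and still
/// owned by us", `Completed` means "output already moved out or dropped".
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Pending,
    Ready,
    Completed,
}

/// Hypothetical two-slot stand-in for a tuple `TryJoin`.
struct Outputs<T> {
    states: [State; 2],
    values: [MaybeUninit<T>; 2],
}

impl<T> Outputs<T> {
    fn new() -> Self {
        Self {
            states: [State::Pending; 2],
            values: [MaybeUninit::uninit(), MaybeUninit::uninit()],
        }
    }

    /// Record that future `i` finished successfully with output `v`.
    fn store(&mut self, i: usize, v: T) {
        assert_eq!(self.states[i], State::Pending);
        self.values[i].write(v);
        self.states[i] = State::Ready;
    }

    /// Error path: drop every stored output inline and mark it completed,
    /// so the `Drop` impl below cannot drop it a second time.
    fn drop_ready_outputs(&mut self) {
        for i in 0..2 {
            if self.states[i] == State::Ready {
                // Flip the state first; leaving it as `Ready` after the drop
                // is the suspected bug.
                self.states[i] = State::Completed;
                // SAFETY: the state was `Ready`, so the slot is initialized.
                unsafe { self.values[i].assume_init_drop() };
            }
        }
    }
}

impl<T> Drop for Outputs<T> {
    fn drop(&mut self) {
        // Plays the role of `drop_initialized_values`: only `Ready` slots
        // still hold data.
        self.drop_ready_outputs();
    }
}
```

If drop_ready_outputs dropped the value but left the state as Ready, the Drop impl would call assume_init_drop on the same slot again, which is the double drop described above.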

@fogodev
Author

fogodev commented Sep 16, 2023

Tested it here: calling $this.state[$fut_idx].set_none(); right after the ManuallyDrop::drop line fixes this issue. Setting the PollState to none was the closest thing I could find to marking it as "completed" instead of "ready". I added this line to both join and try_join for tuples.

I can help further; I just need some guidance on how to proceed. Here is the test output. Most of the failing tests just expect a "ready" state and should be simple to update for a "none" state or a new "completed" state. The ones that worry me more are the memory-leak tests, namely this assertion:

NOT_LEAKING.with(|flag| {
    assert!(*flag.borrow());
})
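For context, a leak test built on such an assertion usually relies on a value whose destructor flips a thread-local flag: if the value is leaked (forgotten instead of dropped), the flag never flips and the assertion fails. The sketch below assumes that shape; only the NOT_LEAKING name and the RefCell<bool> borrow mirror the snippet above, while FlagOnDrop and was_dropped are hypothetical.

```rust
use std::cell::RefCell;

thread_local! {
    // Assumed shape of the test flag: becomes `true` once the tracked
    // value has actually been dropped on this thread.
    static NOT_LEAKING: RefCell<bool> = RefCell::new(false);
}

/// Hypothetical value whose destructor records that it ran.
struct FlagOnDrop;

impl Drop for FlagOnDrop {
    fn drop(&mut self) {
        NOT_LEAKING.with(|flag| *flag.borrow_mut() = true);
    }
}

/// Hypothetical helper: has a `FlagOnDrop` been dropped on this thread?
fn was_dropped() -> bool {
    NOT_LEAKING.with(|flag| *flag.borrow())
}
```

Under that assumption, any change that causes an output to be forgotten rather than dropped leaves the flag false and trips the assertion.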

Unit test output:
running 59 tests
test future::join::array::test::debug ... ok
test future::join::array::test::empty ... ok
test future::join::vec::test::debug ... ok
test future::join::array::test::smoke ... ok
test future::join::tuple::test::join_0 ... ok
test future::future_group::test::smoke ... ok
test future::join::vec::test::empty ... ok
test future::join::vec::test::smoke ... ok
test future::race::array::test::no_fairness ... ok
test future::join::tuple::test::does_not_leak_memory ... FAILED
test future::race::tuple::test::race_1 ... ok
test future::join::tuple::test::join_3 ... FAILED
test future::join::tuple::test::join_1 ... FAILED
test future::join::tuple::test::join_2 ... FAILED
test future::race::tuple::test::race_2 ... ok
test future::race::tuple::test::race_3 ... ok
test future::race::vec::test::no_fairness ... ok
test future::race_ok::array::test::all_err ... ok
test future::race_ok::array::test::all_ok ... ok
test future::race_ok::array::test::one_err ... ok
test future::race_ok::tuple::test::race_ok_1 ... ok
test future::race_ok::tuple::test::race_ok_2 ... ok
test future::race_ok::tuple::test::race_ok_3 ... ok
test future::race_ok::tuple::test::race_ok_err ... ok
test future::race_ok::vec::test::all_err ... ok
test future::race_ok::vec::test::all_ok ... ok
test future::race_ok::vec::test::one_err ... ok
test future::try_join::array::test::all_ok ... ok
test future::try_join::array::test::empty ... ok
test future::try_join::array::test::one_err ... ok
test future::try_join::tuple::test::all_ok ... FAILED
test future::try_join::tuple::test::does_not_leak_memory ... FAILED
test future::try_join::tuple::test::issue_135_resume_after_completion ... FAILED
test future::try_join::tuple::test::one_err ... ok
test future::try_join::vec::test::all_ok ... ok
test future::try_join::vec::test::empty ... ok
test future::try_join::vec::test::one_err ... ok
test stream::chain::array::tests::chain_3 ... ok
test stream::chain::tuple::tests::chain_3 ... ok
test stream::chain::vec::tests::chain_3 ... ok
test stream::merge::array::tests::merge_array_2x2 ... ok
test stream::merge::array::tests::merge_array_4 ... ok
test stream::merge::tuple::tests::merge_tuple_0 ... ok
test stream::merge::array::tests::merge_channels ... FAILED
test stream::merge::tuple::tests::merge_channels ... FAILED
test stream::merge::tuple::tests::merge_tuple_1 ... ok
test stream::merge::tuple::tests::merge_tuple_2 ... ok
test stream::merge::tuple::tests::merge_tuple_3 ... ok
test stream::merge::tuple::tests::merge_tuple_4 ... ok
test stream::merge::vec::tests::merge_vec_2x2 ... ok
test stream::merge::vec::tests::merge_vec_4 ... ok
test stream::stream_group::test::smoke ... ok
test stream::merge::vec::tests::merge_channels ... FAILED
test stream::zip::array::tests::zip_array_3 ... ok
test stream::zip::tuple::tests::zip_tuple_3 ... ok
test utils::poll_state::vec::tests::boxed_does_not_allocate_twice ... ok
test stream::zip::vec::tests::zip_array_3 ... ok
test utils::poll_state::vec::tests::type_size ... ok
test utils::wakers::vec::readiness_vec::test::resize ... ok

failures:

---- future::join::tuple::test::does_not_leak_memory stdout ----
thread 'future::join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/join/tuple.rs:367:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- future::join::tuple::test::join_3 stdout ----
thread 'future::join::tuple::test::join_3' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- future::join::tuple::test::join_1 stdout ----
thread 'future::join::tuple::test::join_1' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- future::join::tuple::test::join_2 stdout ----
thread 'future::join::tuple::test::join_2' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- future::try_join::tuple::test::all_ok stdout ----
thread 'future::try_join::tuple::test::all_ok' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- future::try_join::tuple::test::does_not_leak_memory stdout ----
thread 'future::try_join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/try_join/tuple.rs:398:13

---- future::try_join::tuple::test::issue_135_resume_after_completion stdout ----
thread 'future::try_join::tuple::test::issue_135_resume_after_completion' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- stream::merge::array::tests::merge_channels stdout ----
thread 'stream::merge::array::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- stream::merge::tuple::tests::merge_channels stdout ----
thread 'stream::merge::tuple::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13

---- stream::merge::vec::tests::merge_channels stdout ----
thread 'stream::merge::vec::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13


failures:
    future::join::tuple::test::does_not_leak_memory
    future::join::tuple::test::join_1
    future::join::tuple::test::join_2
    future::join::tuple::test::join_3
    future::try_join::tuple::test::all_ok
    future::try_join::tuple::test::does_not_leak_memory
    future::try_join::tuple::test::issue_135_resume_after_completion
    stream::merge::array::tests::merge_channels
    stream::merge::tuple::tests::merge_channels
    stream::merge::vec::tests::merge_channels

test result: FAILED. 49 passed; 10 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s

error: test failed, to rerun pass `--lib`

yoshuawuyts added the bug label on Sep 19, 2023
@yoshuawuyts
Owner

I'm trying to reproduce this bug today, and I believe the bug may actually be in Join, not TryJoin. The following test case fails:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process_not_fail() -> Result<Vec<i32>, ()> {
    sleep(Duration::from_millis(100)).await;
    Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let res = (process_fail(), process_not_fail()).try_join().await;
    assert!(res.is_err());
}

But if we replace the Vec join with an array join, the error stops triggering:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process_not_fail() -> Result<[i32; 2], ()> {
    sleep(Duration::from_millis(100)).await;
    Ok([ready(1), ready(2)].join().await) // <- changed this line
}

async fn process_fail() -> Result<[i32; 2], ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let res = (process_fail(), process_not_fail()).try_join().await;
    assert!(res.is_err());
}

Next I want to see whether I can reduce this further and trigger it without using try_join.

@yoshuawuyts
Owner

yoshuawuyts commented Sep 22, 2023

If we replace the tuple try_join with an array try_join, we trigger the same error. This should make it easier to debug, since we no longer need to trace the source of the error through macro frames in the backtrace.

use futures_concurrency::future::{Join, TryJoin};
use futures_core::Future;
use std::{future::ready, pin::Pin};
use tokio::time::{sleep, Duration};

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

async fn process_not_fail() -> Result<Vec<i32>, ()> {
    sleep(Duration::from_millis(100)).await;
    Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let a: BoxFuture<'static, _> = Box::pin(process_fail());
    let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
    let res = [a, b].try_join().await;
    assert!(res.is_err());
}

@yoshuawuyts
Owner

Ha, I found the bug. It turns out the error was in all of the implementations of TryJoin, not just the tuple one. Fixing it up for all impls and filing a patch now!

3 participants