ConcurrentStream usage with tokio leads to ACCESS_VIOLATION #182
Interesting, thanks for reporting it
From rust-lang/futures-rs#2851 (comment):
IIRC, the tokio timer is
I also noticed the unsoundness when looking through the code just now. I have put together a minimal test repro for the pin unsoundness in this ConcurrentStream vec collect impl.

use std::{future::Future, marker::PhantomPinned, pin::pin, task::Poll};

use futures_concurrency::{
    concurrent_stream::{ConcurrentStream, Consumer, ConsumerState},
    future::Race,
};
use futures_executor::block_on;
use pin_project::pin_project;

#[pin_project]
struct PinCheck {
    addr: usize,
    #[pin]
    _pinned: PhantomPinned,
}

impl PinCheck {
    fn new() -> Self {
        Self {
            addr: 0,
            _pinned: PhantomPinned,
        }
    }
}

impl Future for PinCheck {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.project();
        let addr = this.addr as *mut usize as usize;
        if *this.addr == 0 {
            cx.waker().wake_by_ref();
            *this.addr = addr;
            Poll::Pending
        } else {
            assert_eq!(*this.addr, addr, "pinned value was moved.");
            Poll::Ready(())
        }
    }
}

struct Tricky;

impl ConcurrentStream for Tricky {
    type Item = ();
    type Future = PinCheck;

    async fn drive<C>(self, consumer: C) -> C::Output
    where
        C: Consumer<Self::Item, Self::Future>,
    {
        let mut consumer = pin!(consumer);
        for _ in 0..64 {
            match consumer.as_mut().send(PinCheck::new()).await {
                ConsumerState::Break => return consumer.flush().await,
                ConsumerState::Continue => continue,
                ConsumerState::Empty => unreachable!(),
            }
        }

        let progress = async { Some(consumer.as_mut().progress().await) };
        let noop = async { None };

        // poll progress once.
        assert!((progress, noop).race().await.is_none());

        // push new entry, reallocs internal futures slab.
        // this moves the futures and violates pinning.
        consumer.as_mut().send(PinCheck::new()).await;

        consumer.flush().await
    }

    fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
        todo!()
    }
}

#[test]
fn it_works() {
    block_on(async { Tricky.collect::<Vec<()>>().await });
}

This currently outputs:
I've been out on sick leave for the past several days. Just wanted to quickly acknowledge this is indeed an issue we should fix. I want to thank @inklesspen1rus for reporting this, and I wanted to thank everyone else in this thread helping narrow this issue down.
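To illustrate why reallocating a slab that stores futures inline violates pinning, as the comments in the repro describe, here is a minimal std-only sketch. It is not code from futures-concurrency: `grow`, `addr_of`, `inline_entry_moves` and `boxed_entry_moves` are hypothetical names, and `grow` models reallocation explicitly (allocate a larger buffer, then move the entries) so the effect is deterministic. Boxing each entry is shown as one common way to keep addresses stable; it is an assumption here, not necessarily how the crate does or should fix it.

```rust
/// Hypothetical helper: models a growable buffer's reallocation explicitly by
/// allocating a larger buffer and moving every element into it.
fn grow<T>(old: Vec<T>) -> Vec<T> {
    // The new buffer is allocated while the old one is still alive,
    // so the two buffers cannot share an address.
    let mut new = Vec::with_capacity(old.capacity() * 2 + 1);
    new.extend(old);
    new
}

/// Hypothetical helper: the address of a value as an integer.
fn addr_of<T>(value: &T) -> usize {
    value as *const T as usize
}

/// Entries stored inline in the buffer: returns true if the first entry's
/// address changed when the buffer grew.
fn inline_entry_moves() -> bool {
    let mut slab: Vec<u64> = Vec::with_capacity(1);
    slab.push(7);
    let before = addr_of(&slab[0]);
    let slab = grow(slab);
    let after = addr_of(&slab[0]);
    before != after
}

/// Entries stored behind a Box: only the pointer moves when the buffer grows,
/// the heap allocation holding the value stays where it is.
fn boxed_entry_moves() -> bool {
    let mut slab: Vec<Box<u64>> = Vec::with_capacity(1);
    slab.push(Box::new(7));
    let before = addr_of(&*slab[0]);
    let slab = grow(slab);
    let after = addr_of(&*slab[0]);
    before != after
}

fn main() {
    println!("inline entry moved: {}", inline_entry_moves());
    println!("boxed entry moved: {}", boxed_entry_moves());
}
```

A future that has already been polled through a `Pin<&mut Self>` relies on its address staying fixed, which is what `PinCheck` asserts in the repro. In this sketch the inline entry changes address on growth while the boxed entry does not, which is the difference that matters for pinned futures.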
I tried to use concurrent streams to sleep in parallel with tokio:
But sometimes I get a crash:
Without the "current_thread" flavor, the program just freezes.
Other runtimes work fine:
async_std
smol
Also, the tokio runtime with the smol Timer works fine:
Is this a Tokio issue?