Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: support mpsc send with &self #2861

Merged
merged 14 commits into from
Sep 25, 2020
4 changes: 3 additions & 1 deletion tokio-test/src/io.rs
Original file line number Diff line number Diff line change
@@ -201,7 +201,9 @@ impl Inner {
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
self.rx.poll_recv(cx)
use futures_core::stream::Stream;

Pin::new(&mut self.rx).poll_next(cx)
}

fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
30 changes: 1 addition & 29 deletions tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
@@ -391,35 +391,7 @@ impl Signal {
poll_fn(|cx| self.poll_recv(cx)).await
}

/// Polls to receive the next signal notification event, outside of an
/// `async` context.
///
/// `None` is returned if no more events can be received by this stream.
///
/// # Examples
///
/// Polling from a manually implemented future
///
/// ```rust,no_run
/// use std::pin::Pin;
/// use std::future::Future;
/// use std::task::{Context, Poll};
/// use tokio::signal::unix::Signal;
///
/// struct MyFuture {
/// signal: Signal,
/// }
///
/// impl Future for MyFuture {
/// type Output = Option<()>;
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// println!("polling MyFuture");
/// self.signal.poll_recv(cx)
/// }
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
}
4 changes: 2 additions & 2 deletions tokio/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -269,8 +269,8 @@ pub trait StreamExt: Stream {
/// # #[tokio::main(basic_scheduler)]
/// async fn main() {
/// # time::pause();
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// let mut rx = rx1.merge(rx2);
///
4 changes: 2 additions & 2 deletions tokio/src/stream/stream_map.rs
Original file line number Diff line number Diff line change
@@ -57,8 +57,8 @@ use std::task::{Context, Poll};
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
8 changes: 6 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
@@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
@@ -185,6 +184,11 @@ impl Semaphore {
}
}

/// Returns true if the semaphore is closed
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
@@ -194,7 +198,7 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
// Has the semaphore closed?
if curr & Self::CLOSED > 0 {
return Err(TryAcquireError::Closed);
}
9 changes: 4 additions & 5 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let (mut tx, mut rx) = mpsc::channel(100);
//! let (tx, mut rx) = mpsc::channel(100);
//!
//! tokio::spawn(async move {
//! for i in 0..10 {
@@ -150,7 +150,7 @@
//! for _ in 0..10 {
//! // Each task needs its own `tx` handle. This is done by cloning the
//! // original handle.
//! let mut tx = tx.clone();
//! let tx = tx.clone();
//!
//! tokio::spawn(async move {
//! tx.send(&b"data to write"[..]).await.unwrap();
@@ -213,7 +213,7 @@
//!
//! // Spawn tasks that will send the increment command.
//! for _ in 0..10 {
//! let mut cmd_tx = cmd_tx.clone();
//! let cmd_tx = cmd_tx.clone();
//!
//! join_handles.push(tokio::spawn(async move {
//! let (resp_tx, resp_rx) = oneshot::channel();
@@ -443,7 +443,6 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

@@ -470,7 +469,7 @@ cfg_not_sync! {

cfg_signal! {
pub(crate) mod mpsc;
pub(crate) mod semaphore_ll;
pub(crate) mod batch_semaphore;
}
}

Loading