Skip to content

Commit

Permalink
sync: support mpsc send with &self
Browse files Browse the repository at this point in the history
Updates the mpsc channel to use the intrusive waker based sempahore.
This enables using `Sender` with `&self`.

Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.

Fixes: #2637
Refs: #2718 (intrusive waiters)
  • Loading branch information
carllerche committed Sep 22, 2020
1 parent e7091fd commit 2b50675
Show file tree
Hide file tree
Showing 14 changed files with 362 additions and 2,498 deletions.
4 changes: 3 additions & 1 deletion tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ impl Inner {
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
self.rx.poll_recv(cx)
use futures_core::stream::Stream;

Pin::new(&mut self.rx).poll_next(cx)
}

fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,11 @@ impl Signal {
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
/*
let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
*/
unimplemented!();
}
}

Expand Down
8 changes: 6 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
Expand All @@ -185,6 +184,11 @@ impl Semaphore {
}
}

/// Returns true if the semaphore is closed
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
}

pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
Expand All @@ -194,7 +198,7 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
// Has the semaphore closed?
if curr & Self::CLOSED > 0 {
return Err(TryAcquireError::Closed);
}
Expand Down
1 change: 0 additions & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

Expand Down
Loading

0 comments on commit 2b50675

Please sign in to comment.