Skip to content

Commit

Permalink
feat(blocking::mpsc): add Sender::send(_ref)_timeout methods (#79)
Browse files Browse the repository at this point in the history
Follow up to #75.

Co-authored-by: Eliza Weisman <[email protected]>
  • Loading branch information
utkarshgupta137 and hawkw authored Apr 28, 2023
1 parent 8181ea0 commit 979ed6e
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 4 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ some cases where you might be better off considering other options:
prefer a channel implementation that only allocates memory for messages as
it's needed (such as [`tokio::sync::mpsc`]).

- **You need a blocking channel with `send_timeout`** or **a blocking channel
with a `select` operation**. I'm probably not going to implement these things.
The blocking channel isn't particularly important to me compared to the async
channel, and I _probably_ won't add a bunch of additional APIs to it.
- **You need a blocking channel with a `select` operation**.
I'm probably not going to implement it. I _may_ accept a PR if you raise it.

If you need a synchronous channel with this kind of functionality,
[`crossbeam-channel`] is probably a good choice.
Expand Down
299 changes: 299 additions & 0 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,124 @@ feature! {
}
}

/// Reserves a slot in the channel to mutate in place, blocking until
/// there is a free slot to write to, waiting for at most `timeout`.
///
/// This is similar to the [`send_timeout`] method, but, rather than taking a
/// message by value to write to the channel, this method reserves a
/// writable slot in the channel, and returns a [`SendRef`] that allows
/// mutating the slot in place. If the [`StaticReceiver`] end of the
/// channel uses the [`StaticReceiver::recv_ref`] method for receiving
/// from the channel, this allows allocations for channel messages to be
/// reused in place.
///
/// # Errors
///
/// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// Sending formatted strings by writing them directly to channel slots,
/// in place:
///
/// ```
/// use thingbuf::mpsc::{blocking::StaticChannel, errors::SendTimeoutError};
/// use std::{fmt::Write, time::Duration, thread};
///
/// static CHANNEL: StaticChannel<String, 1> = StaticChannel::new();
/// let (tx, rx) = CHANNEL.split();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// let msg = rx.recv_ref().unwrap();
/// println!("{}", msg);
/// thread::sleep(Duration::from_millis(500));
/// });
///
/// thread::spawn(move || {
/// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap();
/// write!(value, "hello").expect("writing to a `String` should never fail");
/// thread::sleep(Duration::from_millis(400));
///
/// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap();
/// write!(value, "world").expect("writing to a `String` should never fail");
/// thread::sleep(Duration::from_millis(400));
///
/// assert_eq!(
/// Err(&SendTimeoutError::Timeout(())),
/// tx.send_ref_timeout(Duration::from_millis(200)).as_deref().map(String::as_str)
/// );
/// });
/// ```
///
/// [`send_timeout`]: Self::send_timeout
#[cfg(not(all(test, loom)))]
pub fn send_ref_timeout(&self, timeout: Duration) -> Result<SendRef<'_, T>, SendTimeoutError> {
send_ref_timeout(self.core, self.slots, self.recycle, timeout)
}

/// Sends a message by value, blocking until there is a free slot to
/// write to, waiting for at most `timeout`.
///
/// This method takes the message by value, and replaces any previous
/// value in the slot. This means that the channel will *not* function
/// as an object pool while sending messages with `send_timeout`. This method is
/// most appropriate when messages don't own reusable heap allocations,
/// or when the [`StaticReceiver`] end of the channel must receive messages
/// by moving them out of the channel by value (using the
/// [`StaticReceiver::recv`] method). When messages in the channel own
/// reusable heap allocations (such as `String`s or `Vec`s), and the
/// [`StaticReceiver`] doesn't need to receive them by value, consider using
/// [`send_ref_timeout`] instead, to enable allocation reuse.
///
/// # Errors
///
/// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking::StaticChannel, errors::SendTimeoutError};
/// use std::{time::Duration, thread};
///
/// static CHANNEL: StaticChannel<i32, 1> = StaticChannel::new();
/// let (tx, rx) = CHANNEL.split();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// let msg = rx.recv().unwrap();
/// println!("{}", msg);
/// thread::sleep(Duration::from_millis(500));
/// });
///
/// thread::spawn(move || {
/// tx.send_timeout(1, Duration::from_millis(200)).unwrap();
/// thread::sleep(Duration::from_millis(400));
///
/// tx.send_timeout(2, Duration::from_millis(200)).unwrap();
/// thread::sleep(Duration::from_millis(400));
///
/// assert_eq!(
/// Err(SendTimeoutError::Timeout(3)),
/// tx.send_timeout(3, Duration::from_millis(200))
/// );
/// });
/// ```
///
/// [`send_ref_timeout`]: Self::send_ref_timeout
#[cfg(not(all(test, loom)))]
pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match self.send_ref_timeout(timeout) {
Err(e) => Err(e.with_value(val)),
Ok(mut slot) => {
*slot = val;
Ok(())
}
}
}

/// Attempts to reserve a slot in the channel to mutate in place,
/// without blocking until capacity is available.
///
Expand Down Expand Up @@ -586,6 +704,7 @@ feature! {
/// - [`Ok`]`(`[`RecvRef`]`<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
Expand Down Expand Up @@ -643,6 +762,7 @@ feature! {
/// - [`Ok`]`(<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
Expand Down Expand Up @@ -927,6 +1047,127 @@ where
}
}

/// Reserves a slot in the channel to mutate in place, blocking until
/// there is a free slot to write to, waiting for at most `timeout`.
///
/// This is similar to the [`send_timeout`] method, but, rather than taking a
/// message by value to write to the channel, this method reserves a
/// writable slot in the channel, and returns a [`SendRef`] that allows
/// mutating the slot in place. If the [`Receiver`] end of the channel
/// uses the [`Receiver::recv_ref`] method for receiving from the channel,
/// this allows allocations for channel messages to be reused in place.
///
/// # Errors
///
/// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// Sending formatted strings by writing them directly to channel slots,
/// in place:
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::SendTimeoutError};
/// use std::{fmt::Write, time::Duration, thread};
///
/// let (tx, rx) = blocking::channel::<String>(1);
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// let msg = rx.recv_ref().unwrap();
/// println!("{}", msg);
/// thread::sleep(Duration::from_millis(500));
/// });
///
/// thread::spawn(move || {
/// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap();
/// write!(value, "hello").expect("writing to a `String` should never fail");
/// thread::sleep(Duration::from_millis(400));
///
/// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap();
/// write!(value, "world").expect("writing to a `String` should never fail");
/// thread::sleep(Duration::from_millis(400));
///
/// assert_eq!(
/// Err(&SendTimeoutError::Timeout(())),
/// tx.send_ref_timeout(Duration::from_millis(200)).as_deref().map(String::as_str)
/// );
/// });
/// ```
///
/// [`send_timeout`]: Self::send_timeout
#[cfg(not(all(test, loom)))]
pub fn send_ref_timeout(&self, timeout: Duration) -> Result<SendRef<'_, T>, SendTimeoutError> {
send_ref_timeout(
&self.inner.core,
self.inner.slots.as_ref(),
&self.inner.recycle,
timeout,
)
}

/// Sends a message by value, blocking until there is a free slot to
/// write to, for at most `timeout`.
///
/// This method takes the message by value, and replaces any previous
/// value in the slot. This means that the channel will *not* function
/// as an object pool while sending messages with `send_timeout`. This method is
/// most appropriate when messages don't own reusable heap allocations,
/// or when the [`Receiver`] end of the channel must receive messages by
/// moving them out of the channel by value (using the
/// [`Receiver::recv`] method). When messages in the channel own
/// reusable heap allocations (such as `String`s or `Vec`s), and the
/// [`Receiver`] doesn't need to receive them by value, consider using
/// [`send_ref_timeout`] instead, to enable allocation reuse.
///
///
/// # Errors
///
/// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::SendTimeoutError};
/// use std::{time::Duration, thread};
///
/// let (tx, rx) = blocking::channel(1);
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// let msg = rx.recv().unwrap();
/// println!("{}", msg);
/// thread::sleep(Duration::from_millis(500));
/// });
///
/// thread::spawn(move || {
/// tx.send_timeout(1, Duration::from_millis(200)).unwrap();
/// thread::sleep(Duration::from_millis(400));
///
/// tx.send_timeout(2, Duration::from_millis(200)).unwrap();
/// thread::sleep(Duration::from_millis(400));
///
/// assert_eq!(
/// Err(SendTimeoutError::Timeout(3)),
/// tx.send_timeout(3, Duration::from_millis(200))
/// );
/// });
/// ```
///
/// [`send_ref_timeout`]: Self::send_ref_timeout
#[cfg(not(all(test, loom)))]
pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match self.send_ref_timeout(timeout) {
Err(e) => Err(e.with_value(val)),
Ok(mut slot) => {
*slot = val;
Ok(())
}
}
}

/// Attempts to reserve a slot in the channel to mutate in place,
/// without blocking until capacity is available.
///
Expand Down Expand Up @@ -1149,6 +1390,7 @@ impl<T, R> Receiver<T, R> {
/// - [`Ok`]`(`[`RecvRef`]`<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
Expand Down Expand Up @@ -1454,3 +1696,60 @@ fn send_ref<'a, T, R: Recycle<T>>(
}
}
}

#[cfg(not(all(test, loom)))]
#[inline]
fn send_ref_timeout<'a, T, R: Recycle<T>>(
core: &'a ChannelCore<Thread>,
slots: &'a [Slot<T>],
recycle: &'a R,
timeout: Duration,
) -> Result<SendRef<'a, T>, SendTimeoutError> {
// fast path: avoid getting the thread and constructing the node if the
// slot is immediately ready.
match core.try_send_ref(slots, recycle) {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())),
_ => {}
}

let mut waiter = queue::Waiter::new();
let mut unqueued = true;
let thread = thread::current();
let mut boff = Backoff::new();
let beginning_park = Instant::now();
loop {
let node = unsafe {
// Safety: in this case, it's totally safe to pin the waiter, as
// it is owned uniquely by this function, and it cannot possibly
// be moved while this thread is parked.
Pin::new_unchecked(&mut waiter)
};

let wait = if unqueued {
test_dbg!(core.tx_wait.start_wait(node, &thread))
} else {
test_dbg!(core.tx_wait.continue_wait(node, &thread))
};

match wait {
WaitResult::Closed => return Err(SendTimeoutError::Closed(())),
WaitResult::Notified => {
boff.spin_yield();
match core.try_send_ref(slots.as_ref(), recycle) {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())),
_ => {}
}
}
WaitResult::Wait => {
unqueued = false;
thread::park_timeout(timeout);
let elapsed = beginning_park.elapsed();
if elapsed >= timeout {
return Err(SendTimeoutError::Timeout(()));
}
}
}
}
}
Loading

0 comments on commit 979ed6e

Please sign in to comment.