Skip to content

Commit

Permalink
feat(mpsc): add support for statically-allocated MPSC channels (#23)
Browse files Browse the repository at this point in the history
This branch adds new `StaticChannel` types that can be used to construct
statically allocated MPSC channel variants. The static channels can be
used without requiring *any* heap allocations. This is intended
primarily for embedded systems and bare metal programming where
allocators may not be available or heap memory is constrained.

Closes #17

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Dec 25, 2021
1 parent b6dbfde commit 5b17c18
Show file tree
Hide file tree
Showing 18 changed files with 1,040 additions and 305 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
std = ["alloc"]
std = ["alloc", "parking_lot"]
alloc = []
default = ["std"]

[dependencies]
pin-project = "1"
parking_lot = { version = "0.11", optional = true }
parking_lot = { version = "0.11", optional = true, default-features = false }

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
Expand Down
8 changes: 4 additions & 4 deletions bench/benches/async_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ aaaaaaaaaaaaaa";
&senders,
|b, &senders| {
b.to_async(rt()).iter(|| async {
use thingbuf::{mpsc, ThingBuf};
let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(CAPACITY));
use thingbuf::mpsc;
let (tx, rx) = mpsc::channel::<String>(CAPACITY);
for _ in 0..senders {
let tx = tx.clone();
task::spawn(async move {
Expand Down Expand Up @@ -149,8 +149,8 @@ fn bench_mpsc_integer(c: &mut Criterion) {
&senders,
|b, &senders| {
b.to_async(rt()).iter(|| async {
use thingbuf::{mpsc, ThingBuf};
let (tx, rx) = mpsc::channel(ThingBuf::new(CAPACITY));
use thingbuf::mpsc;
let (tx, rx) = mpsc::channel::<i32>(CAPACITY);
for i in 0..senders {
let tx = tx.clone();
task::spawn(async move {
Expand Down
19 changes: 7 additions & 12 deletions bench/benches/async_spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ aaaaaaaaaaaaaa";
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
b.to_async(rt).iter(|| async {
use thingbuf::{
mpsc::{self, TrySendError},
ThingBuf,
};
let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
use thingbuf::mpsc::{self, TrySendError};

let (tx, rx) = mpsc::channel::<String>(100);
task::spawn(async move {
loop {
match tx.try_send_ref() {
Expand Down Expand Up @@ -158,8 +156,8 @@ aaaaaaaaaaaaaa";
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
b.to_async(rt).iter(|| async {
use thingbuf::{mpsc, ThingBuf};
let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
use thingbuf::mpsc;
let (tx, rx) = mpsc::channel::<String>(100);
task::spawn(async move {
while let Ok(mut slot) = tx.send_ref().await {
slot.clear();
Expand Down Expand Up @@ -267,11 +265,8 @@ fn bench_spsc_try_send_integer(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
b.to_async(rt).iter(|| async {
use thingbuf::{
mpsc::{self, TrySendError},
ThingBuf,
};
let (tx, rx) = mpsc::channel(ThingBuf::new(100));
use thingbuf::mpsc::{self, TrySendError};
let (tx, rx) = mpsc::channel(100);
task::spawn(async move {
let mut i = 0;
loop {
Expand Down
11 changes: 4 additions & 7 deletions bench/benches/sync_spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ aaaaaaaaaaaaaa";
group.throughput(Throughput::Elements(size));
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
b.iter(|| {
use thingbuf::{
mpsc::{sync, TrySendError},
ThingBuf,
};
let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
use thingbuf::mpsc::{sync, TrySendError};
let (tx, rx) = sync::channel::<String>(100);
let producer = thread::spawn(move || loop {
match tx.try_send_ref() {
Ok(mut slot) => {
Expand Down Expand Up @@ -108,8 +105,8 @@ aaaaaaaaaaaaaa";
group.throughput(Throughput::Elements(size));
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
b.iter(|| {
use thingbuf::{mpsc::sync, ThingBuf};
let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
use thingbuf::mpsc::sync;
let (tx, rx) = sync::channel::<String>(100);
let producer = thread::spawn(move || {
while let Ok(mut slot) = tx.send_ref() {
slot.clear();
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ feature! {

mod stringbuf;
pub use stringbuf::{StaticStringBuf, StringBuf};

pub mod mpsc;
}

pub mod mpsc;

mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;

Expand Down
81 changes: 57 additions & 24 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
use crate::{
loom::{atomic::AtomicUsize, hint},
wait::{Notify, WaitCell, WaitQueue, WaitResult},
Ref, ThingBuf,
Core, Ref, Slot,
};
use core::fmt;
use core::pin::Pin;
use core::task::Poll;
use core::{fmt, ops::Index, task::Poll};

#[derive(Debug)]
#[non_exhaustive]
Expand All @@ -30,8 +28,8 @@ pub enum TrySendError<T = ()> {
pub struct Closed<T = ()>(T);

#[derive(Debug)]
struct Inner<T, N: Notify> {
thingbuf: ThingBuf<T>,
struct ChannelCore<N> {
core: Core,
rx_wait: WaitCell<N>,
tx_count: AtomicUsize,
tx_wait: WaitQueue<N>,
Expand Down Expand Up @@ -72,38 +70,63 @@ impl TrySendError {
}

// ==== impl Inner ====
impl<T, N: Notify + Unpin> Inner<T, N> {
fn new(thingbuf: ThingBuf<T>) -> Self {
impl<N> ChannelCore<N> {
#[cfg(not(loom))]
const fn new(capacity: usize) -> Self {
Self {
thingbuf,
core: Core::new(capacity),
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
tx_wait: WaitQueue::new(),
}
}

#[cfg(loom)]
fn new(capacity: usize) -> Self {
Self {
core: Core::new(capacity),
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
tx_wait: WaitQueue::new(),
}
}
}

impl<N> ChannelCore<N>
where
N: Notify + Unpin,
{
fn close_rx(&self) {
if self.thingbuf.core.close() {
if self.core.close() {
crate::loom::hint::spin_loop();
test_println!("draining_queue");
self.tx_wait.close();
}
}
}

impl<T: Default, N: Notify + Unpin> Inner<T, N> {
fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
self.thingbuf
.core
.push_ref(self.thingbuf.slots.as_ref())
.map(|slot| SendRefInner {
_notify: NotifyRx(&self.rx_wait),
slot,
})
impl<N> ChannelCore<N>
where
N: Notify + Unpin,
{
fn try_send_ref<'a, T>(
&'a self,
slots: &'a [Slot<T>],
) -> Result<SendRefInner<'a, T, N>, TrySendError>
where
T: Default,
{
self.core.push_ref(slots).map(|slot| SendRefInner {
_notify: NotifyRx(&self.rx_wait),
slot,
})
}

fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
match self.try_send_ref() {
fn try_send<T>(&self, slots: &[Slot<T>], val: T) -> Result<(), TrySendError<T>>
where
T: Default,
{
match self.try_send_ref(slots) {
Ok(mut slot) => {
slot.with_mut(|slot| *slot = val);
Ok(())
Expand All @@ -117,11 +140,19 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
/// The loop itself has to be written in the actual `send` method's
/// implementation, rather than on `inner`, because it might be async and
/// may yield, or might park the thread.
fn poll_recv_ref(&self, mk_waiter: impl Fn() -> N) -> Poll<Option<Ref<'_, T>>> {
fn poll_recv_ref<'a, T, S>(
&'a self,
slots: &'a S,
mk_waiter: impl Fn() -> N,
) -> Poll<Option<Ref<'a, T>>>
where
S: Index<usize, Output = Slot<T>> + ?Sized,
T: Default,
{
macro_rules! try_poll_recv {
() => {
// If we got a value, return it!
match self.thingbuf.core.pop_ref(self.thingbuf.slots.as_ref()) {
match self.core.pop_ref(slots) {
Ok(slot) => return Poll::Ready(Some(slot)),
Err(TrySendError::Closed(_)) => return Poll::Ready(None),
_ => {}
Expand Down Expand Up @@ -151,7 +182,7 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
// the channel is closed (all the receivers are dropped).
// however, there may be messages left in the queue. try
// popping from the queue until it's empty.
return Poll::Ready(self.thingbuf.pop_ref());
return Poll::Ready(self.core.pop_ref(slots).ok());
}
WaitResult::Notified => {
// we were notified while we were trying to register the
Expand All @@ -163,6 +194,8 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
}
}

// === impl SendRefInner ===

impl<T, N: Notify> core::ops::Deref for SendRefInner<'_, T, N> {
type Target = T;

Expand Down
Loading

0 comments on commit 5b17c18

Please sign in to comment.