Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

ref: Write some doc comments #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition = "2021"

[dependencies]
async-channel = "1.7.1"
# TODO: we could eventually use `futures_util::future::CatchUnwind`
# futures-util = { version = "0.3.24", default-features = false }
tokio = { version = "1.20.1", features = ["rt", "sync"] }

[dev-dependencies]
Expand Down
35 changes: 31 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,42 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use async_channel::{Receiver, Sender, TrySendError};
use async_channel::{Receiver, Sender};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub use async_channel::TrySendError;
pub use oneshot::Receiver as QueueReceiver;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// A task queue with a bounded number of workers.
pub struct Queue {
workers: Vec<JoinHandle<()>>,
sender: Sender<BoxFuture>,
tasks_running: Arc<AtomicUsize>,
}

/// Statistics about the current queue state.
#[non_exhaustive]
#[derive(Debug)]
pub struct Stats {
/// This is the total queue capacity that it was configured with.
pub queue_capacity: Option<usize>,
/// The current length of the queue.
pub queue_len: usize,
/// The number of workers the queue was configured with.
pub workers: usize,
/// The number currently active workers processing tasks.
pub active_workers: usize,
}

impl Queue {
/// Creates a new Task Queue.
///
/// This immediately spawns `workers` onto `runtime`, and optionally allows
/// specifying a bound for its task queue.
pub fn new(runtime: Handle, workers: usize, queue_capacity: Option<usize>) -> Self {
let tasks_running = Arc::new(AtomicUsize::new(0));
let (sender, receiver) = match queue_capacity {
Expand All @@ -44,21 +57,27 @@ impl Queue {
}
}

/// Consumes and closes the task queue.
///
/// This waits for all the remaining tasks to be processed.
pub async fn close(self) {
let Self {
workers, sender, ..
} = self;
sender.close();
// dropping the sender would also implicitly close it
drop(sender);
for worker in workers {
// NOTE:
// `JoinError` has two variants internally:
// 1) a panic 2) cancellation.
// We never cancel the worker tasks ourselves, though they might
// have panic-ed.
// We never cancel the worker tasks ourselves.
// But the tasks themselves could have panic-d.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
worker.await.unwrap();
}
}

/// Returns statistics about the current state of the task queue.
pub fn stats(&self) -> Stats {
let active_workers = self.tasks_running.load(Ordering::Relaxed);
Stats {
Expand All @@ -69,13 +88,18 @@ impl Queue {
}
}

pub fn enqueue<F, T>(&self, future: F) -> Result<oneshot::Receiver<T>, TrySendError<()>>
/// Enqueue a new `future` onto this queue.
///
/// Returns either a [`QueueReceiver`] when a task was successfully
/// enqueued, or a [`TrySendError`] when the queue is full.
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
pub fn enqueue<F, T>(&self, future: F) -> Result<QueueReceiver<T>, TrySendError<()>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (send, receiver) = oneshot::channel();
match self.sender.try_send(Box::pin(async move {
// XXX: `oneshot::Sender` is not `UnwindSafe` itself.
send.send(future.await).ok();
})) {
Ok(_) => Ok(receiver),
Expand All @@ -93,6 +117,9 @@ impl Queue {

tasks_running.fetch_add(1, Ordering::Relaxed);

// TODO: once our `BoxFuture` are `UnwindSafe`, we could use the
// `futures_util::future::CatchUnwind` utility to catch panics in
// this worker.
future.await;

tasks_running.fetch_sub(1, Ordering::Relaxed);
Expand Down