diff --git a/Cargo.toml b/Cargo.toml index af220ae..25262f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] async-channel = "1.7.1" +# TODO: we could eventually use `futures_util::future::CatchUnwind` +# futures-util = { version = "0.3.24", default-features = false } tokio = { version = "1.20.1", features = ["rt", "sync"] } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 40734cd..95784d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,29 +3,42 @@ use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use async_channel::{Receiver, Sender, TrySendError}; +use async_channel::{Receiver, Sender}; use tokio::runtime::Handle; use tokio::sync::oneshot; use tokio::task::JoinHandle; +pub use async_channel::TrySendError; +pub use oneshot::Receiver as QueueReceiver; + type BoxFuture = Pin + Send + 'static>>; +/// A task queue with a bounded number of workers. pub struct Queue { workers: Vec>, sender: Sender, tasks_running: Arc, } +/// Statistics about the current queue state. #[non_exhaustive] #[derive(Debug)] pub struct Stats { + /// This is the total queue capacity that it was configured with. pub queue_capacity: Option, + /// The current length of the queue. pub queue_len: usize, + /// The number of workers the queue was configured with. pub workers: usize, + /// The number currently active workers processing tasks. pub active_workers: usize, } impl Queue { + /// Creates a new Task Queue. + /// + /// This immediately spawns `workers` onto `runtime`, and optionally allows + /// specifying a bound for its task queue. pub fn new(runtime: Handle, workers: usize, queue_capacity: Option) -> Self { let tasks_running = Arc::new(AtomicUsize::new(0)); let (sender, receiver) = match queue_capacity { @@ -44,21 +57,27 @@ impl Queue { } } + /// Consumes and closes the task queue. + /// + /// This waits for all the remaining tasks to be processed. pub async fn close(self) { let Self { workers, sender, .. } = self; + sender.close(); + // dropping the sender would also implicitly close it drop(sender); for worker in workers { // NOTE: // `JoinError` has two variants internally: // 1) a panic 2) cancellation. - // We never cancel the worker tasks ourselves, though they might - // have panic-ed. + // We never cancel the worker tasks ourselves. + // But the tasks themselves could have panicked. worker.await.unwrap(); } } + /// Returns statistics about the current state of the task queue. pub fn stats(&self) -> Stats { let active_workers = self.tasks_running.load(Ordering::Relaxed); Stats { @@ -69,13 +88,18 @@ impl Queue { } } - pub fn enqueue(&self, future: F) -> Result, TrySendError<()>> + /// Enqueue a new `future` onto this queue. + /// + /// Returns either a [`QueueReceiver`] if a task was successfully + /// enqueued, or a [`TrySendError`] if the queue is full. + pub fn enqueue(&self, future: F) -> Result, TrySendError<()>> where F: Future + Send + 'static, T: Send + 'static, { let (send, receiver) = oneshot::channel(); match self.sender.try_send(Box::pin(async move { + // XXX: `oneshot::Sender` is not `UnwindSafe` itself. send.send(future.await).ok(); })) { Ok(_) => Ok(receiver), @@ -93,6 +117,9 @@ impl Queue { tasks_running.fetch_add(1, Ordering::Relaxed); + // TODO: once our `BoxFuture` are `UnwindSafe`, we could use the + // `futures_util::future::CatchUnwind` utility to catch panics in + // this worker. future.await; tasks_running.fetch_sub(1, Ordering::Relaxed);