diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 42a1e056..9ab1f0ac 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -4,6 +4,8 @@ * Check service readiness once per decoded item +* Run un-readiness check in separate task + ## [2.8.2] - 2024-11-05 * Do not rely on not_ready(), always check service readiness diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 6f5faacf..965a9029 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -19,7 +19,8 @@ path = "src/lib.rs" ntex-codec = "0.6" ntex-bytes = "0.1" ntex-util = "2.5" -ntex-service = "3.3" +ntex-service = "3.3.3" +ntex-rt = "0.4" bitflags = "2" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index aa0d61f0..e2c1de36 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1,7 +1,7 @@ //! Framed transport dispatcher #![allow(clippy::let_underscore_future)] use std::task::{ready, Context, Poll}; -use std::{cell::Cell, future::Future, pin::Pin, rc::Rc}; +use std::{cell::Cell, future::poll_fn, future::Future, pin::Pin, rc::Rc}; use ntex_codec::{Decoder, Encoder}; use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service}; @@ -126,12 +126,12 @@ pin_project_lite::pin_project! { bitflags::bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] struct Flags: u8 { - const READY_ERR = 0b000001; - const IO_ERR = 0b000010; - const KA_ENABLED = 0b000100; - const KA_TIMEOUT = 0b001000; - const READ_TIMEOUT = 0b010000; - const READY = 0b100000; + const READY_ERR = 0b0000001; + const IO_ERR = 0b0000010; + const KA_ENABLED = 0b0000100; + const KA_TIMEOUT = 0b0001000; + const READ_TIMEOUT = 0b0010000; + const READY_TASK = 0b1000000; } } @@ -160,7 +160,8 @@ where codec: U, service: PipelineBinding>, error: Cell::Error>>>, - inflight: Cell, + inflight: Cell, + ready: Cell, } #[derive(Copy, Clone, Debug)] @@ -222,6 +223,7 @@ where codec, error: Cell::new(None), inflight: Cell::new(0), + ready: Cell::new(false), service: Pipeline::new(service.into_service()).bind(), }); @@ -282,6 +284,12 @@ where } } + // ready task + if slf.flags.contains(Flags::READY_TASK) { + slf.flags.insert(Flags::READY_TASK); + ntex_rt::spawn(not_ready(slf.shared.clone())); + } + loop { match slf.st { DispatcherState::Processing => { @@ -342,7 +350,7 @@ where PollService::Continue => continue, }; - slf.flags.remove(Flags::READY); + slf.shared.ready.set(false); slf.call_service(cx, item); } // handle write back-pressure @@ -472,19 +480,14 @@ where } fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.flags.contains(Flags::READY) { - if self.shared.service.poll_not_ready(cx).is_ready() { - self.flags.remove(Flags::READY); - } else { - return Poll::Ready(self.check_error()); - } + if self.shared.ready.get() { + return Poll::Ready(self.check_error()); } // wait until service becomes ready match self.shared.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { - self.flags.insert(Flags::READY); - let _ = self.shared.service.poll_not_ready(cx); + self.shared.ready.set(true); Poll::Ready(self.check_error()) } // pause io read task @@ -625,6 +628,30 @@ where } } +async fn not_ready(slf: Rc>) +where + S: Service, Response = Option>> + 'static, + U: Encoder + Decoder + 'static, +{ + let pl = slf.service.clone(); + loop { + if !pl.is_shutdown() { + if let Err(err) = poll_fn(|cx| pl.poll_ready(cx)).await { + log::trace!("{}: Service readiness check failed, stopping", slf.io.tag()); + slf.error.set(Some(DispatcherError::Service(err))); + break; + } + if !pl.is_shutdown() { + poll_fn(|cx| pl.poll_not_ready(cx)).await; + slf.ready.set(false); + slf.io.wake(); + continue; + } + } + break; + } +} + #[cfg(test)] mod tests { use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex}; @@ -724,6 +751,7 @@ mod tests { io: state.into(), error: Cell::new(None), inflight: Cell::new(0), + ready: Cell::new(false), service: Pipeline::new(service).bind(), }); diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 7087f392..53d3b3cf 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.3.3] - 2024-11-10 + +* Add Pipeline::is_shutdown() helper + ## [3.3.2] - 2024-11-10 * Fix un-needed wakeups for unready future diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 3713a98a..f23fccd9 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "3.3.2" +version = "3.3.3" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 8dedbbe5..ed4de4ee 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -12,6 +12,7 @@ pub struct ServiceCtx<'a, S: ?Sized> { #[derive(Debug)] pub(crate) struct WaitersRef { cur: cell::Cell, + shutdown: cell::Cell, wakers: cell::UnsafeCell>, indexes: cell::UnsafeCell>>, } @@ -19,10 +20,15 @@ pub(crate) struct WaitersRef { impl WaitersRef { pub(crate) fn new() -> (u32, Self) { let mut waiters = slab::Slab::new(); + + // first insert for wake ups from services + let _ = waiters.insert(None); + ( waiters.insert(Default::default()) as u32, WaitersRef { cur: cell::Cell::new(u32::MAX), + shutdown: cell::Cell::new(false), indexes: cell::UnsafeCell::new(waiters), wakers: cell::UnsafeCell::new(Vec::default()), }, @@ -62,6 +68,18 @@ impl WaitersRef { self.get()[idx as usize] = Some(cx.waker().clone()); } + pub(crate) fn register_unready(&self, cx: &mut Context<'_>) { + self.get()[0] = Some(cx.waker().clone()); + } + + pub(crate) fn notify_unready(&self) { + if let Some(item) = self.get().get_mut(0) { + if let Some(waker) = item.take() { + waker.wake(); + } + } + } + pub(crate) fn notify(&self) { let wakers = self.get_wakers(); if !wakers.is_empty() { @@ -90,6 +108,14 @@ impl WaitersRef { false } } + + pub(crate) fn shutdown(&self) { + self.shutdown.set(true); + } + + pub(crate) fn is_shutdown(&self) -> bool { + self.shutdown.get() + } } impl<'a, S> ServiceCtx<'a, S> { diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 791f67b2..243ce885 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -110,6 +110,12 @@ impl Pipeline { } } + #[inline] + /// Check if shutdown is initiated. + pub fn is_shutdown(&self) -> bool { + self.state.waiters.is_shutdown() + } + #[inline] /// Shutdown enclosed service. pub async fn shutdown(&self) @@ -202,6 +208,12 @@ where &self.pl.state.svc } + #[inline] + /// Get pipeline + pub fn pipeline(&self) -> Pipeline { + self.pl.clone() + } + #[inline] /// Returns `Ready` when the pipeline is able to process requests. /// @@ -263,6 +275,8 @@ where // `self` is alive let pl: &'static Pipeline = unsafe { std::mem::transmute(&self.pl) }; *st = State::Shutdown(Box::pin(async move { pl.shutdown().await })); + pl.state.waiters.shutdown(); + pl.state.waiters.notify_unready(); self.poll_shutdown(cx) } State::Shutdown(ref mut fut) => Pin::new(fut).poll(cx), @@ -300,6 +314,12 @@ where } } + #[inline] + /// Check if shutdown is initiated. + pub fn is_shutdown(&self) -> bool { + self.pl.state.waiters.is_shutdown() + } + #[inline] /// Shutdown enclosed service. pub async fn shutdown(&self) { @@ -449,14 +469,14 @@ struct CheckUnReadiness { impl Unpin for CheckUnReadiness {} -impl Future for CheckUnReadiness +impl Future for CheckUnReadiness where F: Fn(&'static Pipeline) -> Fut, - Fut: Future, + Fut: Future, { - type Output = T; + type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut slf = self.as_mut(); if slf.fut.is_none() { @@ -464,11 +484,18 @@ where } let fut = slf.fut.as_mut().unwrap(); match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(res) => { + Poll::Pending => { + if slf.pl.state.waiters.is_shutdown() { + Poll::Ready(()) + } else { + slf.pl.state.waiters.register_unready(cx); + Poll::Pending + } + } + Poll::Ready(()) => { let _ = slf.fut.take(); slf.pl.state.waiters.notify(); - Poll::Ready(res) + Poll::Ready(()) } } }