From 3c7bef3b6f6b6c3ec780e5e2db12c9d5795c1b80 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 1 Aug 2022 14:28:23 -0700 Subject: [PATCH] feat(server): remove the high-level Server API (#2932) This removes `hyper::Server`, and it's related parts: - `hyper::server::Builder` - `hyper::server::accept` - `hyper::service::make_service_fn` New utilities for managing servers will exist in `hyper-util`. --- src/common/drain.rs | 217 ------------- src/common/exec.rs | 36 --- src/common/mod.rs | 2 - src/error.rs | 26 -- src/lib.rs | 2 - src/server/accept.rs | 71 ----- src/server/conn.rs | 7 - src/server/mod.rs | 37 +-- src/server/server.rs | 622 -------------------------------------- src/server/server_stub.rs | 16 - src/server/shutdown.rs | 128 -------- src/service/make.rs | 115 +------ src/service/mod.rs | 16 +- 13 files changed, 7 insertions(+), 1288 deletions(-) delete mode 100644 src/common/drain.rs delete mode 100644 src/server/accept.rs delete mode 100644 src/server/server.rs delete mode 100644 src/server/server_stub.rs delete mode 100644 src/server/shutdown.rs diff --git a/src/common/drain.rs b/src/common/drain.rs deleted file mode 100644 index 174da876df..0000000000 --- a/src/common/drain.rs +++ /dev/null @@ -1,217 +0,0 @@ -use std::mem; - -use pin_project_lite::pin_project; -use tokio::sync::watch; - -use super::{task, Future, Pin, Poll}; - -pub(crate) fn channel() -> (Signal, Watch) { - let (tx, rx) = watch::channel(()); - (Signal { tx }, Watch { rx }) -} - -pub(crate) struct Signal { - tx: watch::Sender<()>, -} - -pub(crate) struct Draining(Pin + Send + Sync>>); - -#[derive(Clone)] -pub(crate) struct Watch { - rx: watch::Receiver<()>, -} - -pin_project! { - #[allow(missing_debug_implementations)] - pub struct Watching { - #[pin] - future: F, - state: State, - watch: Pin + Send + Sync>>, - _rx: watch::Receiver<()>, - } -} - -enum State { - Watch(F), - Draining, -} - -impl Signal { - pub(crate) fn drain(self) -> Draining { - let _ = self.tx.send(()); - Draining(Box::pin(async move { self.tx.closed().await })) - } -} - -impl Future for Draining { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - Pin::new(&mut self.as_mut().0).poll(cx) - } -} - -impl Watch { - pub(crate) fn watch(self, future: F, on_drain: FN) -> Watching - where - F: Future, - FN: FnOnce(Pin<&mut F>), - { - let Self { mut rx } = self; - let _rx = rx.clone(); - Watching { - future, - state: State::Watch(on_drain), - watch: Box::pin(async move { - let _ = rx.changed().await; - }), - // Keep the receiver alive until the future completes, so that - // dropping it can signal that draining has completed. - _rx, - } - } -} - -impl Future for Watching -where - F: Future, - FN: FnOnce(Pin<&mut F>), -{ - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - loop { - match mem::replace(me.state, State::Draining) { - State::Watch(on_drain) => { - match Pin::new(&mut me.watch).poll(cx) { - Poll::Ready(()) => { - // Drain has been triggered! - on_drain(me.future.as_mut()); - } - Poll::Pending => { - *me.state = State::Watch(on_drain); - return me.future.poll(cx); - } - } - } - State::Draining => return me.future.poll(cx), - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - struct TestMe { - draining: bool, - finished: bool, - poll_cnt: usize, - } - - impl Future for TestMe { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll { - self.poll_cnt += 1; - if self.finished { - Poll::Ready(()) - } else { - Poll::Pending - } - } - } - - #[test] - fn watch() { - let mut mock = tokio_test::task::spawn(()); - mock.enter(|cx, _| { - let (tx, rx) = channel(); - let fut = TestMe { - draining: false, - finished: false, - poll_cnt: 0, - }; - - let mut watch = rx.watch(fut, |mut fut| { - fut.draining = true; - }); - - assert_eq!(watch.future.poll_cnt, 0); - - // First poll should poll the inner future - assert!(Pin::new(&mut watch).poll(cx).is_pending()); - assert_eq!(watch.future.poll_cnt, 1); - - // Second poll should poll the inner future again - assert!(Pin::new(&mut watch).poll(cx).is_pending()); - assert_eq!(watch.future.poll_cnt, 2); - - let mut draining = tx.drain(); - // Drain signaled, but needs another poll to be noticed. - assert!(!watch.future.draining); - assert_eq!(watch.future.poll_cnt, 2); - - // Now, poll after drain has been signaled. - assert!(Pin::new(&mut watch).poll(cx).is_pending()); - assert_eq!(watch.future.poll_cnt, 3); - assert!(watch.future.draining); - - // Draining is not ready until watcher completes - assert!(Pin::new(&mut draining).poll(cx).is_pending()); - - // Finishing up the watch future - watch.future.finished = true; - assert!(Pin::new(&mut watch).poll(cx).is_ready()); - assert_eq!(watch.future.poll_cnt, 4); - drop(watch); - - assert!(Pin::new(&mut draining).poll(cx).is_ready()); - }) - } - - #[test] - fn watch_clones() { - let mut mock = tokio_test::task::spawn(()); - mock.enter(|cx, _| { - let (tx, rx) = channel(); - - let fut1 = TestMe { - draining: false, - finished: false, - poll_cnt: 0, - }; - let fut2 = TestMe { - draining: false, - finished: false, - poll_cnt: 0, - }; - - let watch1 = rx.clone().watch(fut1, |mut fut| { - fut.draining = true; - }); - let watch2 = rx.watch(fut2, |mut fut| { - fut.draining = true; - }); - - let mut draining = tx.drain(); - - // Still 2 outstanding watchers - assert!(Pin::new(&mut draining).poll(cx).is_pending()); - - // drop 1 for whatever reason - drop(watch1); - - // Still not ready, 1 other watcher still pending - assert!(Pin::new(&mut draining).poll(cx).is_pending()); - - drop(watch2); - - // Now all watchers are gone, draining is complete - assert!(Pin::new(&mut draining).poll(cx).is_ready()); - }); - } -} diff --git a/src/common/exec.rs b/src/common/exec.rs index 29f374de21..1cc7036001 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -3,28 +3,17 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::body::Body; #[cfg(feature = "server")] use crate::body::HttpBody; #[cfg(all(feature = "http2", feature = "server"))] use crate::proto::h2::server::H2Stream; use crate::rt::Executor; -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::server::server::{new_svc::NewSvcTask, Watcher}; -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::service::HttpService; #[cfg(feature = "server")] pub trait ConnStreamExec: Clone { fn execute_h2stream(&mut self, fut: H2Stream); } -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -pub trait NewSvcExec, E, W: Watcher>: Clone { - fn execute_new_svc(&mut self, fut: NewSvcTask); -} - pub(crate) type BoxSendFuture = Pin + Send>>; // Either the user provides an executor for background tasks, or we use @@ -78,18 +67,6 @@ where } } -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -impl NewSvcExec for Exec -where - NewSvcTask: Future + Send + 'static, - S: HttpService, - W: Watcher, -{ - fn execute_new_svc(&mut self, fut: NewSvcTask) { - self.execute(fut) - } -} - // ==== impl Executor ===== #[cfg(feature = "server")] @@ -104,19 +81,6 @@ where } } -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -impl NewSvcExec for E -where - E: Executor> + Clone, - NewSvcTask: Future, - S: HttpService, - W: Watcher, -{ - fn execute_new_svc(&mut self, fut: NewSvcTask) { - self.execute(fut) - } -} - // If http2 is not enable, we just have a stub here, so that the trait bounds // that *would* have been needed are still checked. Why? // diff --git a/src/common/mod.rs b/src/common/mod.rs index f455aac093..68a81538f6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -10,8 +10,6 @@ macro_rules! ready { pub(crate) mod buf; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] pub(crate) mod date; -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -pub(crate) mod drain; #[cfg(any(feature = "http1", feature = "http2", feature = "server"))] pub(crate) mod exec; pub(crate) mod io; diff --git a/src/error.rs b/src/error.rs index 6594b3e037..2bf134de3a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -40,10 +40,6 @@ pub(super) enum Kind { /// Error creating a TcpListener. #[cfg(all(feature = "tcp", feature = "server"))] Listen, - /// Error accepting on an Incoming stream. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - Accept, /// User took too long to send headers #[cfg(all(feature = "http1", feature = "server", feature = "runtime"))] HeaderTimeout, @@ -96,10 +92,6 @@ pub(super) enum User { Body, /// The user aborted writing of the outgoing body. BodyWriteAborted, - /// Error calling user's MakeService. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - MakeService, /// Error from future of user's Service. #[cfg(any(feature = "http1", feature = "http2"))] Service, @@ -278,12 +270,6 @@ impl Error { Error::new(Kind::Listen).with(cause) } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - pub(super) fn new_accept>(cause: E) -> Error { - Error::new(Kind::Accept).with(cause) - } - #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "client")] pub(super) fn new_connect>(cause: E) -> Error { @@ -356,12 +342,6 @@ impl Error { Error::new_user(User::ManualUpgrade) } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - pub(super) fn new_user_make_service>(cause: E) -> Error { - Error::new_user(User::MakeService).with(cause) - } - #[cfg(any(feature = "http1", feature = "http2"))] pub(super) fn new_user_service>(cause: E) -> Error { Error::new_user(User::Service).with(cause) @@ -435,9 +415,6 @@ impl Error { Kind::Canceled => "operation was canceled", #[cfg(all(feature = "server", feature = "tcp"))] Kind::Listen => "error creating server listener", - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - Kind::Accept => "error accepting connection", #[cfg(all(feature = "http1", feature = "server", feature = "runtime"))] Kind::HeaderTimeout => "read header from client timeout", #[cfg(any(feature = "http1", feature = "http2"))] @@ -455,9 +432,6 @@ impl Error { Kind::User(User::Body) => "error from user's HttpBody stream", Kind::User(User::BodyWriteAborted) => "user body write aborted", #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "server")] - Kind::User(User::MakeService) => "error from user's MakeService", - #[cfg(any(feature = "http1", feature = "http2"))] Kind::User(User::Service) => "error from user's Service", #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] diff --git a/src/lib.rs b/src/lib.rs index 7a99ce9636..04926928d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,6 +102,4 @@ cfg_feature! { #![feature = "server"] pub mod server; - #[doc(no_inline)] - pub use crate::server::Server; } diff --git a/src/server/accept.rs b/src/server/accept.rs deleted file mode 100644 index d38dcb986f..0000000000 --- a/src/server/accept.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! The `Accept` trait and supporting types. -//! -//! This module contains: -//! -//! - The [`Accept`](Accept) trait used to asynchronously accept incoming -//! connections. -//! - Utilities like `poll_fn` to ease creating a custom `Accept`. - -use crate::common::{ - task::{self, Poll}, - Pin, -}; - -/// Asynchronously accept incoming connections. -pub trait Accept { - /// The connection type that can be accepted. - type Conn; - /// The error type that can occur when accepting a connection. - type Error; - - /// Poll to accept the next connection. - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>; -} - -/// Create an `Accept` with a polling function. -/// -/// # Example -/// -/// ``` -/// use std::task::Poll; -/// use hyper::server::{accept, Server}; -/// -/// # let mock_conn = (); -/// // If we created some mocked connection... -/// let mut conn = Some(mock_conn); -/// -/// // And accept just the mocked conn once... -/// let once = accept::poll_fn(move |cx| { -/// Poll::Ready(conn.take().map(Ok::<_, ()>)) -/// }); -/// -/// let builder = Server::builder(once); -/// ``` -pub fn poll_fn(func: F) -> impl Accept -where - F: FnMut(&mut task::Context<'_>) -> Poll>>, -{ - struct PollFn(F); - - // The closure `F` is never pinned - impl Unpin for PollFn {} - - impl Accept for PollFn - where - F: FnMut(&mut task::Context<'_>) -> Poll>>, - { - type Conn = IO; - type Error = E; - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - (self.get_mut().0)(cx) - } - } - - PollFn(func) -} diff --git a/src/server/conn.rs b/src/server/conn.rs index 0649f22de9..9b6140b141 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -5,9 +5,6 @@ //! are not handled at this level. This module provides the building blocks to //! customize those things externally. //! -//! If you don't have need to manage connections yourself, consider using the -//! higher-level [Server](super) API. -//! //! ## Example //! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream //! ```no_run @@ -69,7 +66,6 @@ cfg_feature! { use tokio::io::{AsyncRead, AsyncWrite}; use tracing::trace; - pub use super::server::Connecting; use crate::body::{Body, HttpBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(not(all(feature = "http1", feature = "http2")))] @@ -84,9 +80,6 @@ cfg_feature! { /// A lower-level configuration of the HTTP protocol. /// /// This structure is used to configure options for an HTTP server connection. -/// -/// If you don't have need to manage connections yourself, consider using the -/// higher-level [Server](super) API. #[derive(Clone, Debug)] #[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] diff --git a/src/server/mod.rs b/src/server/mod.rs index ad9b8013f9..46d6bf51a7 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,37 +1,10 @@ //! HTTP Server //! -//! A `Server` is created to listen on a port, parse HTTP requests, and hand -//! them off to a `Service`. +//! A "server" is usually created by listening on a port for new connections, +//! parse HTTP requests, and hand them off to a `Service`. //! -//! There are two levels of APIs provide for constructing HTTP servers: -//! -//! - The higher-level [`Server`](Server) type. -//! - The lower-level [`conn`](conn) module. -//! -//! # Server -//! -//! The [`Server`](Server) is main way to start listening for HTTP requests. -//! It wraps a listener with a [`MakeService`](crate::service), and then should -//! be executed to start serving requests. -//! -//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default. -pub mod accept; +//! How exactly you choose to listen for connections is not something hyper +//! concerns itself with. After you have a connection, you can handle HTTP over +//! it with the types in the [`conn`](conn) module. pub mod conn; -pub use self::server::Server; - -cfg_feature! { - #![any(feature = "http1", feature = "http2")] - - pub(crate) mod server; - pub use self::server::Builder; - - mod shutdown; -} - -cfg_feature! { - #![not(any(feature = "http1", feature = "http2"))] - - mod server_stub; - use server_stub as server; -} diff --git a/src/server/server.rs b/src/server/server.rs deleted file mode 100644 index 61ef4ab467..0000000000 --- a/src/server/server.rs +++ /dev/null @@ -1,622 +0,0 @@ -use std::error::Error as StdError; -use std::fmt; -#[cfg(feature = "http1")] -use std::time::Duration; - -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::trace; - -use super::accept::Accept; -use crate::body::{Body, HttpBody}; -use crate::common::exec::Exec; -use crate::common::exec::{ConnStreamExec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; -// Renamed `Http` as `Http_` for now so that people upgrading don't see an -// error that `hyper::server::Http` is private... -use super::conn::{Connection, Http as Http_, UpgradeableConnection}; -use super::shutdown::{Graceful, GracefulWatcher}; -use crate::service::{HttpService, MakeServiceRef}; - -use self::new_svc::NewSvcTask; - -pin_project! { - /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. - /// - /// `Server` is a `Future` mapping a bound listener with a set of service - /// handlers. It is built using the [`Builder`](Builder), and the future - /// completes when the server has been shutdown. It should be run by an - /// `Executor`. - pub struct Server { - #[pin] - incoming: I, - make_service: S, - protocol: Http_, - } -} - -/// A builder for a [`Server`](Server). -#[derive(Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -pub struct Builder { - incoming: I, - protocol: Http_, -} - -// ===== impl Server ===== - -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -impl Server { - /// Starts a [`Builder`](Builder) with the provided incoming stream. - pub fn builder(incoming: I) -> Builder { - Builder { - incoming, - protocol: Http_::new(), - } - } -} - -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -impl Server -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec<>::Future, B>, -{ - /// Prepares a server to handle graceful shutdown when the provided future - /// completes. - pub fn with_graceful_shutdown(self, signal: F) -> Graceful - where - F: Future, - E: NewSvcExec, - { - Graceful::new(self, signal) - } - - fn poll_next_( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - let me = self.project(); - match ready!(me.make_service.poll_ready_ref(cx)) { - Ok(()) => (), - Err(e) => { - trace!("make_service closed"); - return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); - } - } - - if let Some(item) = ready!(me.incoming.poll_accept(cx)) { - let io = item.map_err(crate::Error::new_accept)?; - let new_fut = me.make_service.make_service_ref(&io); - Poll::Ready(Some(Ok(Connecting { - future: new_fut, - io: Some(io), - protocol: me.protocol.clone(), - }))) - } else { - Poll::Ready(None) - } - } - - pub(super) fn poll_watch( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - watcher: &W, - ) -> Poll> - where - E: NewSvcExec, - W: Watcher, - { - loop { - if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) { - let fut = NewSvcTask::new(connecting, watcher.clone()); - self.as_mut().project().protocol.exec.execute_new_svc(fut); - } else { - return Poll::Ready(Ok(())); - } - } - } -} - -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -impl Future for Server -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, -{ - type Output = crate::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.poll_watch(cx, &NoopWatcher) - } -} - -impl fmt::Debug for Server { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut st = f.debug_struct("Server"); - st.field("listener", &self.incoming); - st.finish() - } -} - -// ===== impl Builder ===== - -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -impl Builder { - /// Start a new builder, wrapping an incoming stream and low-level options. - pub fn new(incoming: I, protocol: Http_) -> Self { - Builder { incoming, protocol } - } - - /// Sets whether to use keep-alive for HTTP/1 connections. - /// - /// Default is `true`. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_keepalive(mut self, val: bool) -> Self { - self.protocol.http1_keep_alive(val); - self - } - - /// Set whether HTTP/1 connections should support half-closures. - /// - /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `true` will - /// prevent closing the connection immediately if `read` - /// detects an EOF in the middle of a request. - /// - /// Default is `false`. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_half_close(mut self, val: bool) -> Self { - self.protocol.http1_half_close(val); - self - } - - /// Set the maximum buffer size. - /// - /// Default is ~ 400kb. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_max_buf_size(mut self, val: usize) -> Self { - self.protocol.max_buf_size(val); - self - } - - // Sets whether to bunch up HTTP/1 writes until the read buffer is empty. - // - // This isn't really desirable in most cases, only really being useful in - // silly pipeline benchmarks. - #[doc(hidden)] - #[cfg(feature = "http1")] - pub fn http1_pipeline_flush(mut self, val: bool) -> Self { - self.protocol.pipeline_flush(val); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - #[cfg(feature = "http1")] - pub fn http1_writev(mut self, enabled: bool) -> Self { - self.protocol.http1_writev(enabled); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_title_case_headers(mut self, val: bool) -> Self { - self.protocol.http1_title_case_headers(val); - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Request`. It will also look for and use - /// such an extension in any provided `Response`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_preserve_header_case(mut self, val: bool) -> Self { - self.protocol.http1_preserve_header_case(val); - self - } - - /// Set a timeout for reading client request headers. If a client does not - /// transmit the entire header within this time, the connection is closed. - /// - /// Default is None. - #[cfg(all(feature = "http1", feature = "runtime"))] - #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))] - pub fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self { - self.protocol.http1_header_read_timeout(read_timeout); - self - } - - /// Sets whether HTTP/1 is required. - /// - /// Default is `false`. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_only(mut self, val: bool) -> Self { - self.protocol.http1_only(val); - self - } - - /// Sets whether HTTP/2 is required. - /// - /// Default is `false`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_only(mut self, val: bool) -> Self { - self.protocol.http2_only(val); - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(mut self, sz: impl Into>) -> Self { - self.protocol.http2_initial_stream_window_size(sz.into()); - self - } - - /// Sets the max connection-level flow control for HTTP2 - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size(mut self, sz: impl Into>) -> Self { - self.protocol - .http2_initial_connection_window_size(sz.into()); - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(mut self, enabled: bool) -> Self { - self.protocol.http2_adaptive_window(enabled); - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(mut self, sz: impl Into>) -> Self { - self.protocol.http2_max_frame_size(sz); - self - } - - /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 - /// connections. - /// - /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_streams(mut self, max: impl Into>) -> Self { - self.protocol.http2_max_concurrent_streams(max.into()); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(all(feature = "runtime", feature = "http2"))] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval(mut self, interval: impl Into>) -> Self { - self.protocol.http2_keep_alive_interval(interval); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(all(feature = "runtime", feature = "http2"))] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self { - self.protocol.http2_keep_alive_timeout(timeout); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently ~400KB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(mut self, max: usize) -> Self { - self.protocol.http2_max_send_buf_size(max); - self - } - - /// Enables the [extended CONNECT protocol]. - /// - /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - #[cfg(feature = "http2")] - pub fn http2_enable_connect_protocol(mut self) -> Self { - self.protocol.http2_enable_connect_protocol(); - self - } - - /// Sets the `Executor` to deal with connection tasks. - /// - /// Default is `tokio::spawn`. - pub fn executor(self, executor: E2) -> Builder { - Builder { - incoming: self.incoming, - protocol: self.protocol.with_executor(executor), - } - } - - /// Consume this `Builder`, creating a [`Server`](Server). - pub fn serve(self, make_service: S) -> Server - where - I: Accept, - I::Error: Into>, - I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - E: NewSvcExec, - E: ConnStreamExec<>::Future, B>, - { - Server { - incoming: self.incoming, - make_service, - protocol: self.protocol.clone(), - } - } -} - -// Used by `Server` to optionally watch a `Connection` future. -// -// The regular `hyper::Server` just uses a `NoopWatcher`, which does -// not need to watch anything, and so returns the `Connection` untouched. -// -// The `Server::with_graceful_shutdown` needs to keep track of all active -// connections, and signal that they start to shutdown when prompted, so -// it has a `GracefulWatcher` implementation to do that. -pub trait Watcher, E>: Clone { - type Future: Future>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future; -} - -#[allow(missing_debug_implementations)] -#[derive(Copy, Clone)] -pub struct NoopWatcher; - -impl Watcher for NoopWatcher -where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, -{ - type Future = UpgradeableConnection; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - conn - } -} - -// used by exec.rs -pub(crate) mod new_svc { - use std::error::Error as StdError; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::debug; - - use super::{Connecting, Watcher}; - use crate::body::{Body, HttpBody}; - use crate::common::exec::ConnStreamExec; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::service::HttpService; - use pin_project_lite::pin_project; - - // This is a `Future` spawned to an `Executor` inside - // the `Server`. By being a nameable type, we can be generic over the - // user's `Service::Future`, and thus an `Executor` can execute it. - // - // Doing this allows for the server to conditionally require `Send` futures, - // depending on the `Executor` configured. - // - // Users cannot import this type, nor the associated `NewSvcExec`. Instead, - // a blanket implementation for `Executor` is sufficient. - - pin_project! { - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, - } - } - - pin_project! { - #[project = StateProj] - pub(super) enum State, E, W: Watcher> { - Connecting { - #[pin] - connecting: Connecting, - watcher: W, - }, - Connected { - #[pin] - future: W::Future, - }, - } - } - - impl, E, W: Watcher> NewSvcTask { - pub(super) fn new(connecting: Connecting, watcher: W) -> Self { - NewSvcTask { - state: State::Connecting { - connecting, - watcher, - }, - } - } - } - - impl Future for NewSvcTask - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - N: Future>, - NE: Into>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, - W: Watcher, - { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // If it weren't for needing to name this type so the `Send` bounds - // could be projected to the `Serve` executor, this could just be - // an `async fn`, and much safer. Woe is me. - - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Connecting { - connecting, - watcher, - } => { - let res = ready!(connecting.poll(cx)); - let conn = match res { - Ok(conn) => conn, - Err(err) => { - let err = crate::Error::new_user_make_service(err); - debug!("connecting error: {}", err); - return Poll::Ready(()); - } - }; - let future = watcher.watch(conn.with_upgrades()); - State::Connected { future } - } - StateProj::Connected { future } => { - return future.poll(cx).map(|res| { - if let Err(err) = res { - debug!("connection error: {}", err); - } - }); - } - } - }; - - me.state.set(next); - } - } - } -} - -pin_project! { - /// A future building a new `Service` to a `Connection`. - /// - /// Wraps the future returned from `MakeService` into one that returns - /// a `Connection`. - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] - pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http_, - } -} - -impl Future for Connecting -where - I: AsyncRead + AsyncWrite + Unpin, - F: Future>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = Result, FE>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - let service = ready!(me.future.poll(cx))?; - let io = Option::take(&mut me.io).expect("polled after complete"); - Poll::Ready(Ok(me.protocol.serve_connection(io, service))) - } -} diff --git a/src/server/server_stub.rs b/src/server/server_stub.rs deleted file mode 100644 index 87b1f5131f..0000000000 --- a/src/server/server_stub.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::fmt; - -use crate::common::exec::Exec; - -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful. -pub struct Server { - _marker: std::marker::PhantomData<(I, S, E)>, -} - -impl fmt::Debug for Server { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Server").finish() - } -} diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs deleted file mode 100644 index 96937d0827..0000000000 --- a/src/server/shutdown.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::error::Error as StdError; - -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::debug; - -use super::accept::Accept; -use super::conn::UpgradeableConnection; -use super::server::{Server, Watcher}; -use crate::body::{Body, HttpBody}; -use crate::common::drain::{self, Draining, Signal, Watch, Watching}; -use crate::common::exec::{ConnStreamExec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; -use crate::service::{HttpService, MakeServiceRef}; - -pin_project! { - #[allow(missing_debug_implementations)] - pub struct Graceful { - #[pin] - state: State, - } -} - -pin_project! { - #[project = StateProj] - pub(super) enum State { - Running { - drain: Option<(Signal, Watch)>, - #[pin] - server: Server, - #[pin] - signal: F, - }, - Draining { draining: Draining }, - } -} - -impl Graceful { - pub(super) fn new(server: Server, signal: F) -> Self { - let drain = Some(drain::channel()); - Graceful { - state: State::Running { - drain, - server, - signal, - }, - } - } -} - -impl Future for Graceful -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - F: Future, - E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, -{ - type Output = crate::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Running { - drain, - server, - signal, - } => match signal.poll(cx) { - Poll::Ready(()) => { - debug!("signal received, starting graceful shutdown"); - let sig = drain.take().expect("drain channel").0; - State::Draining { - draining: sig.drain(), - } - } - Poll::Pending => { - let watch = drain.as_ref().expect("drain channel").1.clone(); - return server.poll_watch(cx, &GracefulWatcher(watch)); - } - }, - StateProj::Draining { ref mut draining } => { - return Pin::new(draining).poll(cx).map(Ok); - } - } - }; - me.state.set(next); - } - } -} - -#[allow(missing_debug_implementations)] -#[derive(Clone)] -pub struct GracefulWatcher(Watch); - -impl Watcher for GracefulWatcher -where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, -{ - type Future = - Watching, fn(Pin<&mut UpgradeableConnection>)>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - self.0.clone().watch(conn, on_drain) - } -} - -fn on_drain(conn: Pin<&mut UpgradeableConnection>) -where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - S::ResBody: HttpBody + 'static, - ::Error: Into>, - E: ConnStreamExec, -{ - conn.graceful_shutdown() -} diff --git a/src/service/make.rs b/src/service/make.rs index 1d4347a71e..2b0f8ef19f 100644 --- a/src/service/make.rs +++ b/src/service/make.rs @@ -1,10 +1,6 @@ -use std::error::Error as StdError; -use std::fmt; - use tokio::io::{AsyncRead, AsyncWrite}; -use super::{HttpService, Service}; -use crate::body::HttpBody; +use super::Service; use crate::common::{task, Future, Poll}; // The same "trait alias" as tower::MakeConnection, but inlined to reduce @@ -38,115 +34,6 @@ where } } -// Just a sort-of "trait alias" of `MakeService`, not to be implemented -// by anyone, only used as bounds. -pub trait MakeServiceRef: self::sealed::Sealed<(Target, ReqBody)> { - type ResBody: HttpBody; - type Error: Into>; - type Service: HttpService; - type MakeError: Into>; - type Future: Future>; - - // Acting like a #[non_exhaustive] for associated types of this trait. - // - // Basically, no one outside of hyper should be able to set this type - // or declare bounds on it, so it should prevent people from creating - // trait objects or otherwise writing code that requires using *all* - // of the associated types. - // - // Why? So we can add new associated types to this alias in the future, - // if necessary. - type __DontNameMe: self::sealed::CantImpl; - - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll>; - - fn make_service_ref(&mut self, target: &Target) -> Self::Future; -} - -impl MakeServiceRef for T -where - T: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>, - E: Into>, - ME: Into>, - S: HttpService, - F: Future>, - IB: HttpBody, - OB: HttpBody, -{ - type Error = E; - type Service = S; - type ResBody = OB; - type MakeError = ME; - type Future = F; - - type __DontNameMe = self::sealed::CantName; - - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.poll_ready(cx) - } - - fn make_service_ref(&mut self, target: &Target) -> Self::Future { - self.call(target) - } -} - -impl self::sealed::Sealed<(Target, B1)> for T -where - T: for<'a> Service<&'a Target, Response = S>, - S: HttpService, - B1: HttpBody, - B2: HttpBody, -{ -} - -/// Create a `MakeService` from a function. -pub fn make_service_fn(f: F) -> MakeServiceFn -where - F: FnMut(&Target) -> Ret, - Ret: Future, -{ - MakeServiceFn { f } -} - -/// `MakeService` returned from [`make_service_fn`] -#[derive(Clone, Copy)] -pub struct MakeServiceFn { - f: F, -} - -impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn -where - F: FnMut(&Target) -> Ret, - Ret: Future>, - MkErr: Into>, -{ - type Error = MkErr; - type Response = Svc; - type Future = Ret; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, target: &'t Target) -> Self::Future { - (self.f)(target) - } -} - -impl fmt::Debug for MakeServiceFn { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MakeServiceFn").finish() - } -} - mod sealed { pub trait Sealed {} - - #[allow(unreachable_pub)] // This is intentional. - pub trait CantImpl {} - - #[allow(missing_debug_implementations)] - pub enum CantName {} - - impl CantImpl for CantName {} } diff --git a/src/service/mod.rs b/src/service/mod.rs index 22f850ca47..baa093cd25 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -10,8 +10,6 @@ //! //! - `HttpService`: This is blanketly implemented for all types that //! implement `Service, Response = http::Response>`. -//! - `MakeService`: When a `Service` returns a new `Service` as its "response", -//! we consider it a `MakeService`. Again, blanketly implemented in those cases. //! - `MakeConnection`: A `Service` that returns a "connection", a type that //! implements `AsyncRead` and `AsyncWrite`. //! @@ -24,16 +22,6 @@ //! The helper [`service_fn`](service_fn) should be sufficient for most cases, but //! if you need to implement `Service` for a type manually, you can follow the example //! in `service_struct_impl.rs`. -//! -//! # MakeService -//! -//! Since a `Service` is bound to a single connection, a [`Server`](crate::Server) -//! needs a way to make them as it accepts connections. This is what a -//! `MakeService` does. -//! -//! Resources that need to be shared by all `Service`s can be put into a -//! `MakeService`, and then passed to individual `Service`s when `call` -//! is called. pub use tower_service::Service; @@ -43,13 +31,11 @@ mod make; mod oneshot; mod util; +#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] pub(super) use self::http::HttpService; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))] pub(super) use self::make::MakeConnection; -#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] -pub(super) use self::make::MakeServiceRef; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))] pub(super) use self::oneshot::{oneshot, Oneshot}; -pub use self::make::make_service_fn; pub use self::util::service_fn;