Skip to content

Commit

Permalink
chore(transport): Use hyper_util::service::TowerToHyperService (#1729)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Jun 15, 2024
1 parent 5310dc4 commit 53cbd0e
Showing 1 changed file with 9 additions and 64 deletions.
73 changes: 9 additions & 64 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use tracing::{debug, trace};
pub use crate::service::{Routes, RoutesBuilder};

pub use conn::{Connected, TcpConnectInfo};
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use hyper_util::{
rt::{TokioExecutor, TokioIo, TokioTimer},
service::TowerToHyperService,
};
#[cfg(feature = "tls")]
pub use tls::ServerTlsConfig;

Expand Down Expand Up @@ -63,7 +66,7 @@ use tower::{
layer::util::{Identity, Stack},
layer::Layer,
limit::concurrency::ConcurrencyLimitLayer,
util::{BoxCloneService, Either, Oneshot},
util::{BoxCloneService, Either},
Service, ServiceBuilder, ServiceExt,
};

Expand Down Expand Up @@ -595,7 +598,9 @@ impl<L> Server<L> {
let req_svc = svc
.call(&io)
.await
.map_err(super::Error::from_source)?;
.map_err(super::Error::from_source)?
.map_request(|req: Request<Incoming>| req.map(boxed));

let hyper_svc = TowerToHyperService::new(req_svc);

serve_connection(io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()));
Expand Down Expand Up @@ -627,7 +632,7 @@ fn serve_connection<IO, S>(
builder: ConnectionBuilder,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
) where
S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Clone + Send + 'static,
S: Service<Request<Incoming>, Response = Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
Expand Down Expand Up @@ -663,66 +668,6 @@ fn serve_connection<IO, S>(

type ConnectionBuilder = hyper_util::server::conn::auto::Builder<TokioExecutor>;

/// An adaptor which converts a [`tower::Service`] to a [`hyper::service::Service`].
///
/// The [`hyper::service::Service`] trait is used by hyper to handle incoming requests,
/// and does not support the `poll_ready` method that is used by tower services.
#[derive(Debug, Copy, Clone)]
pub(crate) struct TowerToHyperService<S> {
service: S,
}

impl<S> TowerToHyperService<S> {
/// Create a new `TowerToHyperService` from a tower service.
pub(crate) fn new(service: S) -> Self {
Self { service }
}
}

impl<S> hyper::service::Service<Request<Incoming>> for TowerToHyperService<S>
where
S: tower_service::Service<Request<BoxBody>> + Clone,
S::Error: Into<BoxError> + 'static,
{
type Response = S::Response;
type Error = super::Error;
type Future = TowerToHyperServiceFuture<S, Request<BoxBody>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
let req = req.map(crate::body::boxed);
TowerToHyperServiceFuture {
future: self.service.clone().oneshot(req),
}
}
}

/// Future returned by [`TowerToHyperService`].
#[derive(Debug)]
#[pin_project]
pub(crate) struct TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
{
#[pin]
future: Oneshot<S, R>,
}

impl<S, R> Future for TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
S::Error: Into<BoxError> + 'static,
{
type Output = Result<S::Response, super::Error>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project()
.future
.poll(cx)
.map_err(super::Error::from_source)
}
}

impl<L> Router<L> {
pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
Self { server, routes }
Expand Down

0 comments on commit 53cbd0e

Please sign in to comment.