Skip to content

Commit

Permalink
Http (#233)
Browse files Browse the repository at this point in the history
* fix: multiplex panic (#222)

* feat: log more information on error

* support shutdown hooks (#228)

* support shutdown hooks

* fix: cargo fmt

* init

* handler, extractor (#221)

* handler, extractor

* layer (#224)

* downgrade hyper version

* add graceful shutdown

* rebase main

* fix graceful shutdown

---------

Co-authored-by: Pure White <[email protected]>
Co-authored-by: Yanhao <[email protected]>
Co-authored-by: Gwo Tzu-Hsing <[email protected]>
Co-authored-by: Zheng Li <[email protected]>
  • Loading branch information
5 people authored Oct 18, 2023
1 parent 1154315 commit 095389d
Show file tree
Hide file tree
Showing 18 changed files with 530 additions and 473 deletions.
656 changes: 301 additions & 355 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ volo-http = { path = "../volo-http" }
volo-gen = { path = "./volo-gen" }
bytes.workspace = true
http.workspace = true
hyper = { version = "1.0.0-rc.4", features = ["server", "http1", "http2"] }
hyper = { version = "1.0.0-rc.3", features = ["server", "http1", "http2"] }
motore.workspace = true
serde.workspace = true
15 changes: 9 additions & 6 deletions examples/src/http/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize};
use volo_http::{
handler::HandlerService,
request::Json,
route::{Route, Router, Server, ServiceLayerExt},
route::{Route, Router, ServiceLayerExt},
server::Server,
HttpContext,
};

Expand Down Expand Up @@ -69,7 +70,7 @@ async fn test(

#[tokio::main(flavor = "multi_thread")]
async fn main() {
Router::new()
let app = Router::new()
.route(
"/",
Route::builder()
Expand All @@ -86,8 +87,10 @@ async fn main() {
.post(HandlerService::new(test))
.build(),
)
.layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1))))
.serve(SocketAddr::from(([127, 0, 0, 1], 3000)))
.await
.unwrap();
.layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1))));

let addr: SocketAddr = "[::]:9091".parse().unwrap();
let addr = volo::net::Address::from(addr);

Server::new(app).run(addr).await.unwrap();
}
7 changes: 4 additions & 3 deletions volo-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ categories = ["asynchronous", "network-programming", "web-programming"]
keywords = ["async", "http"]

[dependencies]
hyper = { version = "1.0.0-rc.4", features = ["server", "http1", "http2"] }
volo = { version = "0.5", path = "../volo" }
hyper = { version = "=1.0.0-rc.3", features = ["server", "http1", "http2"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1.0-rc.3"
http-body-util = "=0.1.0-rc.2"
http = { version = "0.2" }
hyper-util = { git = "https://github.com/hyperium/hyper-util.git" }
matchit = { version = "0.7" }
motore = { version = "0.3" }
tracing.workspace = true
Expand All @@ -29,6 +29,7 @@ thiserror.workspace = true
mime = "0.3"
serde = "1"
async-trait.workspace = true
parking_lot.workspace = true

[dev-dependencies]
serde = { version = "1", features = ["derive"] }
33 changes: 14 additions & 19 deletions volo-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,22 @@ pub mod param;
pub mod request;
pub mod response;
pub mod route;
pub mod server;

use std::{future::Future, net::SocketAddr};
use std::future::Future;

use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version};
use hyper::{
body::{Body, Incoming},
Request, Response,
};
use param::Params;
use volo::net::Address;

pub type DynError = Box<dyn std::error::Error + Send + Sync>;

pub struct HttpContextInner {
pub(crate) peer: SocketAddr,

pub(crate) method: Method,
pub(crate) uri: Uri,
pub(crate) version: Version,
pub(crate) headers: HeaderMap<HeaderValue>,
pub(crate) extensions: Extensions,
}

pub struct HttpContext {
pub peer: SocketAddr,
pub peer: Address,
pub method: Method,
pub uri: Uri,
pub version: Version,
Expand All @@ -43,14 +35,14 @@ pub struct HttpContext {

#[derive(Clone)]
pub struct MotoreService<S> {
peer: SocketAddr,
inner: S,
pub peer: Address,
pub inner: S,
}

impl<OB, S> hyper::service::Service<Request<Incoming>> for MotoreService<S>
where
OB: Body<Error = DynError>,
S: motore::Service<(), (HttpContextInner, Incoming), Response = Response<OB>> + Clone,
S: motore::Service<HttpContext, Incoming, Response = Response<OB>> + Clone,
S::Error: Into<DynError>,
{
type Response = S::Response;
Expand All @@ -59,20 +51,23 @@ where

type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let s = self.inner.clone();
let peer = self.peer;
let peer = self.peer.clone();
async move {
let (parts, req) = req.into_parts();
let cx = HttpContextInner {
let mut cx = HttpContext {
peer,
method: parts.method,
uri: parts.uri,
version: parts.version,
headers: parts.headers,
extensions: parts.extensions,
params: Params {
inner: Vec::with_capacity(0),
},
};
s.call(&mut (), (cx, req)).await
s.call(&mut cx, req).await
}
}
}
2 changes: 1 addition & 1 deletion volo-http/src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::slice::Iter;
use bytes::{BufMut, Bytes, BytesMut};

pub struct Params {
inner: Vec<(Bytes, Bytes)>,
pub(crate) inner: Vec<(Bytes, Bytes)>,
}

impl From<matchit::Params<'_, '_>> for Params {
Expand Down
71 changes: 7 additions & 64 deletions volo-http/src/route.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
use std::{future::Future, net::SocketAddr};
use std::future::Future;

use http::{Method, Response, StatusCode};
use http_body_util::Full;
use hyper::{
body::{Body, Bytes, Incoming},
server::conn::http1,
};
use hyper_util::rt::TokioIo;
use hyper::body::{Bytes, Incoming};
use motore::layer::Layer;
use tokio::net::TcpListener;

use crate::{
dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext,
HttpContextInner, MotoreService,
};

pub type DynService = motore::BoxCloneService<HttpContext, Incoming, Response<RespBody>, DynError>;
Expand All @@ -22,37 +16,24 @@ pub struct Router {
inner: matchit::Router<DynService>,
}

impl motore::Service<(), (HttpContextInner, Incoming)> for Router {
impl motore::Service<HttpContext, Incoming> for Router {
type Response = Response<RespBody>;

type Error = DynError;

type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + Send + 'cx
where
HttpContextInner: 'cx,
HttpContext: 'cx,
Self: 'cx;

fn call<'cx, 's>(
&'s self,
_cx: &'cx mut (),
cxreq: (HttpContextInner, Incoming),
) -> Self::Future<'cx>
fn call<'cx, 's>(&'s self, cx: &'cx mut HttpContext, req: Incoming) -> Self::Future<'cx>
where
's: 'cx,
{
async move {
let (cx, req) = cxreq;
if let Ok(matched) = self.inner.at(cx.uri.path()) {
let mut context = HttpContext {
peer: cx.peer,
method: cx.method,
uri: cx.uri.clone(),
version: cx.version,
headers: cx.headers,
extensions: cx.extensions,
params: matched.params.into(),
};
matched.value.call(&mut context, req).await
cx.params = matched.params.into();
matched.value.call(cx, req).await
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -99,44 +80,6 @@ impl<S> ServiceLayerExt for S {
}
}

#[async_trait::async_trait]
pub trait Server {
async fn serve(self, addr: SocketAddr) -> Result<(), DynError>;
}
#[async_trait::async_trait]
impl<S, OB> Server for S
where
S: motore::Service<(), (HttpContextInner, Incoming), Response = Response<OB>>
+ Clone
+ Send
+ Sync
+ 'static,
OB: Body<Error = DynError> + Send + 'static,
<OB as Body>::Data: Send,
<S as motore::Service<(), (HttpContextInner, Incoming)>>::Error: Into<DynError>,
{
async fn serve(self, addr: SocketAddr) -> Result<(), DynError> {
let listener = TcpListener::bind(addr).await?;

let service = self;
loop {
let s = service.clone();
let (stream, peer) = listener.accept().await?;

let io = TokioIo::new(stream);

tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, MotoreService { peer, inner: s })
.await
{
tracing::warn!("error serving connection: {:?}", err);
}
});
}
}
}

#[derive(Default, Clone)]
pub struct Route {
options: Option<DynService>,
Expand Down
Loading

0 comments on commit 095389d

Please sign in to comment.