Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use wasi-streams in the wasi-http implementation #7056

Merged
merged 62 commits into from
Sep 21, 2023
Merged
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
2a1d46b
Start refactoring wasi-http
elliottt Sep 13, 2023
119c17a
Checkpoint
elliottt Sep 14, 2023
5f13f1c
Initial implementation of response future handling
elliottt Sep 14, 2023
96f9c71
Lazily initialize response headers and body
elliottt Sep 14, 2023
6546205
make wasmtime-wasi-http compile
Sep 14, 2023
21e09b8
wasi-http wit: make a way to reject outgoing-request in outgoing-handler
Sep 14, 2023
48ba40e
wasi: sync wit from wasi-http
Sep 14, 2023
a9a4dad
outgoing handler impl: report errors to userland
Sep 14, 2023
0718d60
test-programs: get wasi-http-components kicking over, delete modules …
Sep 14, 2023
1e2bfc6
Process headers
elliottt Sep 14, 2023
5d412e3
Add HostIncomingBody::new
elliottt Sep 14, 2023
7a79086
Add trailers functions
elliottt Sep 14, 2023
c2e7e74
Add TODO for body task outline
elliottt Sep 15, 2023
b663678
Rework incoming-response-consume to return a future-trailers value as…
elliottt Sep 15, 2023
a2250b3
Fix the wit
elliottt Sep 15, 2023
f2795b0
First cut at the worker loop
elliottt Sep 15, 2023
ac8cf02
wasi-http: change how we represent bodies/trailers, and annotate own/…
Sep 15, 2023
ef3ce0a
Update types_impl.rs for wit changes
elliottt Sep 15, 2023
7f6ad99
Split body management into its own module
elliottt Sep 15, 2023
cb6981b
Checkpoint
elliottt Sep 15, 2023
fc6b6f4
more work on incoming body and future trailers
Sep 15, 2023
41379a2
Fill out some more functions
elliottt Sep 15, 2023
39f93f0
Implement future-trailers-{subscribe,get}
elliottt Sep 18, 2023
ebaa0b6
Implement drop-future-trailers
elliottt Sep 18, 2023
a4f69b4
Rework fields, but make the borrow checker mad
elliottt Sep 18, 2023
b44fd65
Fix borrow error
alexcrichton Sep 18, 2023
15a9ae5
wasi-http-tests: fix build
Sep 18, 2023
3bbcbdf
test-runner: report errors with stdout/stderr properly
Sep 18, 2023
b81e7b7
fix two trivial wasi-http tests
Sep 18, 2023
5bf7f12
Remove unnecessary drops
elliottt Sep 18, 2023
91f9b98
Convert a `bail!` to a `todo!`
elliottt Sep 18, 2023
486e4f8
Remove a TODO that documented the body worker structure
elliottt Sep 18, 2023
a38546e
fill in a bunch more of OutputBody
Sep 19, 2023
b57c0f4
Remove the custom FrameFut future in favor of using http_body_util
elliottt Sep 19, 2023
be93779
Move the outgoing body types to body.rs
elliottt Sep 19, 2023
b8b255b
Rework the handling of outgoing bodies
elliottt Sep 19, 2023
33a1ee2
Fix the `outgoing request get` test
elliottt Sep 19, 2023
bf4b148
Avoid deadlocking the post tests
elliottt Sep 19, 2023
836e9f8
future_incoming_request_get shouldn't delete the resource
elliottt Sep 19, 2023
e3e9940
Fix the invalid_dnsname test
elliottt Sep 19, 2023
001f249
implement drop-future-incoming-response
elliottt Sep 19, 2023
e2cb195
Fix invalid_port and invalid_dnsname tests
elliottt Sep 19, 2023
d6f15c2
Fix the post test
elliottt Sep 19, 2023
83d43ae
Passing a too large string to println! caused the large post test to …
elliottt Sep 19, 2023
f953b06
Format
elliottt Sep 19, 2023
27c5ac5
Plumb through `between_bytes_timeout`
elliottt Sep 19, 2023
43ff3b9
Downgrade hyper
elliottt Sep 20, 2023
a6d57f0
Revert "Downgrade hyper"
elliottt Sep 20, 2023
f1cc65e
Restore old https connection setup
elliottt Sep 20, 2023
e3addd4
Sync the wasi and wasi-http http deps
elliottt Sep 20, 2023
008f85a
Fix tests
elliottt Sep 20, 2023
ccab9e5
Fix wasi-http integration
elliottt Sep 20, 2023
93dc947
sync implementation of wasi-http
elliottt Sep 20, 2023
27ef314
Slightly more robust error checking
elliottt Sep 20, 2023
3c13cf3
Ignore the wasi-http cli test
elliottt Sep 21, 2023
ab991fd
Consistent ignore attributes between sync and async tests
elliottt Sep 21, 2023
fc58a2e
Fix doc errors
elliottt Sep 21, 2023
6810c9f
code motion: introduce intermediate `HostIncomingBodyBuilder` rather …
Sep 21, 2023
5a24679
explain design
Sep 21, 2023
4739421
Turn FieldMap into a type synonym
elliottt Sep 21, 2023
39cbda9
Tidy up some future state (#7073)
alexcrichton Sep 21, 2023
6506d10
body HostInputStream: report runtime errors with StreamRuntimeError
Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Checkpoint
elliottt committed Sep 20, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 119c17ac2338250bd7420ab45210afb0864da936
133 changes: 121 additions & 12 deletions crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::bindings::http::types::{
FutureIncomingResponse, OutgoingRequest, RequestOptions, Scheme,
};
use crate::types::{HostIncomingResponse, TableHttpExt, HostFutureIncomingResponse};
use crate::types::{HostFutureIncomingResponse, HostIncomingResponse, TableHttpExt, IncomingResponseInternal};
use crate::WasiHttpView;
use anyhow::Context;
use bytes::{Bytes, BytesMut};
@@ -23,10 +23,113 @@ impl<T: WasiHttpView> crate::bindings::http::outgoing_handler::Host for T {
request_id: OutgoingRequest,
options: Option<RequestOptions>,
) -> wasmtime::Result<FutureIncomingResponse> {
let connect_timeout = Duration::from_millis(
options
.and_then(|opts| opts.connect_timeout_ms)
.unwrap_or(600 * 1000) as u64,
);

let first_byte_timeout = Duration::from_millis(
options
.and_then(|opts| opts.first_byte_timeout_ms)
.unwrap_or(600 * 1000) as u64,
);

let between_bytes_timeout = Duration::from_millis(
options
.and_then(|opts| opts.between_bytes_timeout_ms)
.unwrap_or(600 * 1000) as u64,
);

let mut req = self.table().delete_outgoing_request(request_id)?;

let method = match req.method {
crate::bindings::http::types::Method::Get => Method::GET,
crate::bindings::http::types::Method::Head => Method::HEAD,
crate::bindings::http::types::Method::Post => Method::POST,
crate::bindings::http::types::Method::Put => Method::PUT,
crate::bindings::http::types::Method::Delete => Method::DELETE,
crate::bindings::http::types::Method::Connect => Method::CONNECT,
crate::bindings::http::types::Method::Options => Method::OPTIONS,
crate::bindings::http::types::Method::Trace => Method::TRACE,
crate::bindings::http::types::Method::Patch => Method::PATCH,
crate::bindings::http::types::Method::Other(method) => {
return Err(crate::bindings::http::types::Error::InvalidUrl(format!(
"unknown method {method}"
))
.into());
}
};

let (use_tls, scheme, port) = match req.scheme.unwrap_or(Scheme::Https) {
Scheme::Http => (false, "http://", 80),
Scheme::Https => (true, "https://", 443),
Scheme::Other(scheme) => {
return Err(crate::bindings::http::types::Error::InvalidUrl(format!(
"unsupported scheme {scheme}"
))
.into())
}
};

let authority = if req.authority.find(':').is_some() {
req.authority.clone()
} else {
format!("{}:{port}", req.authority)
};

let mut builder = hyper::Request::builder()
.method(method)
.uri(format!("{scheme}{authority}{}", req.path_with_query))
.header(hyper::header::HOST, &authority);

for (k, v) in req.headers.0 {
for item in v {
builder = builder.header(&k, item);
}
}

// TODO: we should not use `pipe` here, and should instead construct a type that
// implements HostOutputStream and hyper::Body.
let body = if let Some(body) = req.body {
todo!("finish implementing request body handling")
} else {
Empty::<Bytes>::new().boxed()
};

let request = builder.body(body).map_err(http_protocol_error)?;

let handle = preview2::spawn(async move {
todo!("put the old contents of handle_async in here")
let tcp_stream = TcpStream::connect(authority.clone()).await?;
let (mut sender, conn) = if use_tls {
if cfg!(any(target_arch = "riscv64", target_arch = "s390x")) {
anyhow::bail!(crate::bindings::http::types::Error::UnexpectedError(
"unsupported architecture for SSL".to_string(),
));
}

todo!("tls")
} else {
timeout(
connect_timeout,
// TODO: we should plumb the builder through the http context, and use it here
hyper::client::conn::http1::handshake(tcp_stream),
)
.await
.map_err(|_| timeout_error("connection"))??
};

let worker = preview2::spawn(async move {
conn.await.context("hyper connection failed")?;
Ok::<_, anyhow::Error>(())
});

let resp = timeout(first_byte_timeout, sender.send_request(request))
.await
.map_err(|_| timeout_error("first byte"))?
.map_err(hyper_protocol_error)?;

Ok(IncomingResponseInternal { resp, worker })
});

let fut = self
@@ -37,14 +140,20 @@ impl<T: WasiHttpView> crate::bindings::http::outgoing_handler::Host for T {
}
}

fn port_for_scheme(scheme: &Option<Scheme>) -> &str {
match scheme {
Some(s) => match s {
Scheme::Http => ":80",
Scheme::Https => ":443",
// This should never happen.
_ => panic!("unsupported scheme!"),
},
None => ":443",
}
fn timeout_error(kind: &str) -> anyhow::Error {
anyhow::anyhow!(crate::bindings::http::types::Error::TimeoutError(format!(
"{kind} timed out"
)))
}

fn http_protocol_error(e: http::Error) -> anyhow::Error {
anyhow::anyhow!(crate::bindings::http::types::Error::ProtocolError(
e.to_string()
))
}

fn hyper_protocol_error(e: hyper::Error) -> anyhow::Error {
anyhow::anyhow!(crate::bindings::http::types::Error::ProtocolError(
e.to_string()
))
}
80 changes: 29 additions & 51 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
@@ -34,51 +34,9 @@ pub struct HostOutgoingRequest {

#[derive(Clone, Debug)]
pub struct HostIncomingResponse {
pub active: bool,
pub status: u16,
pub headers: Option<u32>,
pub body: Option<u32>,
pub trailers: Option<u32>,
}

impl HostIncomingResponse {
pub fn new() -> Self {
Self {
active: false,
status: 0,
headers: None,
body: None,
trailers: None,
}
}

pub fn status(&self) -> u16 {
self.status
}

pub fn headers(&self) -> Option<u32> {
self.headers
}

pub fn set_headers(&mut self, headers: u32) {
self.headers = Some(headers);
}

pub fn body(&self) -> Option<u32> {
self.body
}

pub fn set_body(&mut self, body: u32) {
self.body = Some(body);
}

pub fn trailers(&self) -> Option<u32> {
self.trailers
}

pub fn set_trailers(&mut self, trailers: u32) {
self.trailers = Some(trailers);
}
pub headers: u32,
pub body: u32,
}

#[derive(Clone, Debug)]
@@ -90,13 +48,28 @@ impl HostFields {
}
}

impl From<&hyper::HeaderMap> for HostFields {
fn from(headers: &hyper::HeaderMap) -> Self {
todo!()
}
}

pub(crate) struct IncomingResponseInternal {
pub resp: hyper::Response<hyper::body::Incoming>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
}

type FutureIncomingResponseHandle = AbortOnDropJoinHandle<anyhow::Result<IncomingResponseInternal>>;

pub struct HostFutureIncomingResponse {
handle: futures::future::MaybeDone<AbortOnDropJoinHandle<HostIncomingResponse>>,
pub handle: futures::future::MaybeDone<FutureIncomingResponseHandle>,
}

impl HostFutureIncomingResponse {
pub fn new(handle: AbortOnDropJoinHandle<HostIncomingResponse>) -> Self {
Self { handle: futures::future::maybe_done(handle) }
pub fn new(handle: FutureIncomingResponseHandle) -> Self {
Self {
handle: futures::future::maybe_done(handle),
}
}
}

@@ -134,7 +107,10 @@ pub trait TableHttpExt {
&mut self,
id: u32,
) -> Result<&mut HostFutureIncomingResponse, TableError>;
fn delete_future_incoming_response(&mut self, id: u32) -> Result<(), TableError>;
fn delete_future_incoming_response(
&mut self,
id: u32,
) -> Result<HostFutureIncomingResponse, TableError>;
}

#[async_trait::async_trait]
@@ -208,8 +184,10 @@ impl TableHttpExt for Table {
) -> Result<&mut HostFutureIncomingResponse, TableError> {
self.get_mut::<HostFutureIncomingResponse>(id)
}
fn delete_future_incoming_response(&mut self, id: u32) -> Result<(), TableError> {
let _ = self.delete::<HostFutureIncomingResponse>(id)?;
Ok(())
fn delete_future_incoming_response(
&mut self,
id: u32,
) -> Result<HostFutureIncomingResponse, TableError> {
self.delete(id)
}
}
91 changes: 82 additions & 9 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
use std::any::Any;

use crate::bindings::http::types::{
Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse, Method,
OutgoingRequest, OutgoingResponse, ResponseOutparam, Scheme, StatusCode,
};
use crate::types::{HostFields, HostOutgoingRequest, TableHttpExt};
use crate::types::{
HostFields, HostIncomingResponse, HostOutgoingRequest, IncomingResponseInternal, TableHttpExt, HostFutureIncomingResponse,
};
use crate::WasiHttpView;
use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use wasmtime_wasi::preview2::TableStreamExt;
use wasmtime_wasi::preview2::{
bindings::io::streams::{InputStream, OutputStream},
bindings::poll::poll::Pollable,
pipe::pipe, HostPollable, TablePollableExt,
pipe::pipe,
HostInputStream, HostPollable, PollableFuture, StreamState, TablePollableExt,
};
use wasmtime_wasi::preview2::{AbortOnDropJoinHandle, TableStreamExt};

struct HostIncoming {
body: hyper::body::Incoming,
worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
}

#[async_trait::async_trait]
impl HostInputStream for HostIncoming {
fn read(&mut self, size: usize) -> anyhow::Result<(Bytes, StreamState)> {
todo!()
}

async fn ready(&mut self) -> anyhow::Result<()> {
todo!()
}
}

#[async_trait::async_trait]
impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
@@ -222,7 +243,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
.table()
.get_incoming_response(response)
.context("[incoming_response_status] getting response")?;
Ok(r.status())
Ok(r.status)
}
async fn incoming_response_headers(
&mut self,
@@ -232,7 +253,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
.table()
.get_incoming_response(response)
.context("[incoming_response_headers] getting response")?;
Ok(r.headers().unwrap_or(0 as Headers))
Ok(r.headers)
}
async fn incoming_response_consume(
&mut self,
@@ -265,14 +286,66 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
}
async fn future_incoming_response_get(
&mut self,
future: FutureIncomingResponse,
id: FutureIncomingResponse,
) -> wasmtime::Result<Option<Result<IncomingResponse, Error>>> {
todo!()
if let futures::future::MaybeDone::Future(_) =
self.table().get_future_incoming_response(id)?.handle
{
return Ok(None);
}

let mut fut = self.table().delete_future_incoming_response(id)?;
let resp = match std::pin::Pin::new(&mut fut.handle)
.take_output()
.expect("future output only taken once")
{
Ok(resp) => resp,
Err(e) => {
// Trapping if it's not possible to downcast to an wasi-http error
let e = e.downcast::<Error>()?;
return Ok(Some(Err(e)));
}
};

let status = resp.resp.status().as_u16();

let headers = self.table().push_fields(resp.resp.headers().into())?;

let body = self.table().push_input_stream(Box::new(HostIncoming {
body: resp.resp.into_body(),
worker: resp.worker,
}))?;

let resp = self.table().push_incoming_response(HostIncomingResponse {
status,
headers,
body,
})?;

Ok(Some(Ok(resp)))
}
async fn listen_to_future_incoming_response(
&mut self,
future: FutureIncomingResponse,
id: FutureIncomingResponse,
) -> wasmtime::Result<Pollable> {
todo!()
let _ = self.table().get_future_incoming_response(id)?;

fn make_future<'a>(elem: &'a mut dyn Any) -> PollableFuture<'a> {
let resp = elem
.downcast_mut::<HostFutureIncomingResponse>()
.expect("parent resource is HostFutureIncomingResponse");

Box::pin(async move {
let _ = resp.handle.await;
Ok(())
})
}

let pollable = self.table().push_host_pollable(HostPollable::TableEntry {
index: id,
make_future,
})?;

Ok(pollable)
}
}