Skip to content

Commit

Permalink
refactor: gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
Arqu committed May 31, 2022
1 parent 686eb5b commit 6d8ea20
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 134 deletions.
299 changes: 171 additions & 128 deletions iroh-gateway/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_recursion::async_recursion;
use axum::{
body::{self, Body, BoxBody, HttpBody},
body::{self, Body, HttpBody},
error_handling::HandleErrorLayer,
extract::{Extension, Path, Query},
http::{header::*, StatusCode},
Expand All @@ -10,26 +10,26 @@ use axum::{
};
use bytes::Bytes;
use handlebars::Handlebars;
use iroh_resolver::resolver::UnixfsType;
use iroh_resolver::resolver::{CidOrDomain, UnixfsType};
use iroh_rpc_client::Client as RpcClient;
use serde::{Deserialize, Serialize};
use serde_json::{
json,
value::{Map, Value as Json},
};
use serde_qs;
use urlencoding::encode;
use std::{
collections::HashMap,
error::Error,
fmt::Write,
sync::Arc,
time::{self, Duration},
fmt::Write,
};
use url::Url;
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing::{info, info_span};
use url::Url;
use urlencoding::encode;

use crate::{
client::{Client, Request},
Expand All @@ -38,7 +38,8 @@ use crate::{
error::GatewayError,
headers::*,
metrics::{get_current_trace_id, Metrics},
response::{get_response_format, GatewayResponse, ResponseFormat}, templates,
response::{get_response_format, GatewayResponse, ResponseFormat},
templates,
};

#[derive(Debug)]
Expand Down Expand Up @@ -85,8 +86,8 @@ impl Core {
pub async fn new(config: Config, metrics: Metrics) -> anyhow::Result<Self> {
let rpc_client = RpcClient::new(&config.rpc.client_config).await?;
let mut templates = HashMap::new();
templates.insert("dir_list".to_string(), templates::dir_list.to_string());
templates.insert("not_found".to_string(), templates::not_found.to_string());
templates.insert("dir_list".to_string(), templates::DIR_LIST.to_string());
templates.insert("not_found".to_string(), templates::NOT_FOUND.to_string());

Ok(Self {
state: Arc::new(State {
Expand Down Expand Up @@ -163,55 +164,10 @@ async fn get_handler(

let uri_param = query_params.uri.clone().unwrap_or_default();
if !uri_param.is_empty() {
let u = Url::parse(&uri_param);
if u.is_err() {
return Err(error(
StatusCode::BAD_REQUEST,
"invalid uri parameter",
&state,
));
}
let u = u.unwrap();
let uri_scheme = u.scheme().to_string();
if uri_scheme != "ipfs" && uri_scheme != "ipns" {
return Err(error(
StatusCode::BAD_REQUEST,
"invalid uri scheme, must be ipfs or ipns",
&state,
));
}
let mut uri_path = u.path().to_string();
let uri_query = u.query();
if uri_query.is_some() {
let encoded_query = encode(uri_query.unwrap());
write!(
uri_path,
"?{}",
encoded_query
).map_err(|e| error(StatusCode::BAD_REQUEST, &e.to_string(), &state))?;
}
let uri_host = u.host().unwrap().to_string();
let redirect_uri = format!("{}://{}{}", uri_scheme, uri_host, uri_path);
return Ok(GatewayResponse::redirect_permanently(&redirect_uri));
}

if request_headers.contains_key(&HEADER_SERVICE_WORKER) {
let sw = request_headers.get(&HEADER_SERVICE_WORKER).unwrap();
if sw.to_str().unwrap() == "script" && cpath.is_empty() {
return Err(error(
StatusCode::BAD_REQUEST,
"Service Worker not supported",
&state,
));
}
}
if request_headers.contains_key(&HEADER_X_IPFS_GATEWAY_PREFIX) {
return Err(error(
StatusCode::BAD_REQUEST,
"Unsupported HTTP header",
&state,
));
return protocol_handler_redirect(uri_param, &state);
}
service_worker_check(&request_headers, cpath.to_string(), &state)?;
unsuported_header_check(&request_headers, &state)?;

let full_content_path = format!("/{}/{}{}", scheme, cid, cpath);
let resolved_path: iroh_resolver::resolver::Path = full_content_path
Expand All @@ -233,17 +189,8 @@ async fn get_handler(

let mut headers = HeaderMap::new();

if request_headers.contains_key("If-None-Match") {
// todo(arqu): handle dir etags
let cid_etag = get_etag(resolved_cid, Some(format.clone()));
let inm = request_headers
.get("If-None-Match")
.unwrap()
.to_str()
.unwrap();
if etag_matches(inm, &cid_etag) {
return response(StatusCode::NOT_MODIFIED, BoxBody::default(), headers);
}
if let Some(resp) = etag_check(&request_headers, resolved_cid, &format, &state) {
return Ok(resp);
}

// init headers
Expand Down Expand Up @@ -272,6 +219,93 @@ async fn get_handler(
}
}

#[tracing::instrument()]
fn protocol_handler_redirect(
uri_param: String,
state: &State,
) -> Result<GatewayResponse, GatewayError> {
let u = Url::parse(&uri_param);
if u.is_err() {
return Err(error(
StatusCode::BAD_REQUEST,
"invalid uri parameter",
state,
));
}
let u = u.unwrap();
let uri_scheme = u.scheme().to_string();
if uri_scheme != "ipfs" && uri_scheme != "ipns" {
return Err(error(
StatusCode::BAD_REQUEST,
"invalid uri scheme, must be ipfs or ipns",
state,
));
}
let mut uri_path = u.path().to_string();
let uri_query = u.query();
if uri_query.is_some() {
let encoded_query = encode(uri_query.unwrap());
write!(uri_path, "?{}", encoded_query)
.map_err(|e| error(StatusCode::BAD_REQUEST, &e.to_string(), state))?;
}
let uri_host = u.host().unwrap().to_string();
let redirect_uri = format!("{}://{}{}", uri_scheme, uri_host, uri_path);
Ok(GatewayResponse::redirect_permanently(&redirect_uri))
}

#[tracing::instrument()]
fn service_worker_check(
request_headers: &HeaderMap,
cpath: String,
state: &State,
) -> Result<(), GatewayError> {
if request_headers.contains_key(&HEADER_SERVICE_WORKER) {
let sw = request_headers.get(&HEADER_SERVICE_WORKER).unwrap();
if sw.to_str().unwrap() == "script" && cpath.is_empty() {
return Err(error(
StatusCode::BAD_REQUEST,
"Service Worker not supported",
state,
));
}
}
Ok(())
}

#[tracing::instrument()]
fn unsuported_header_check(request_headers: &HeaderMap, state: &State) -> Result<(), GatewayError> {
if request_headers.contains_key(&HEADER_X_IPFS_GATEWAY_PREFIX) {
return Err(error(
StatusCode::BAD_REQUEST,
"Unsupported HTTP header",
state,
));
}
Ok(())
}

#[tracing::instrument()]
fn etag_check(
request_headers: &HeaderMap,
resolved_cid: &CidOrDomain,
format: &ResponseFormat,
state: &State,
) -> Option<GatewayResponse> {
if request_headers.contains_key("If-None-Match") {
// todo(arqu): handle dir etags
let cid_etag = get_etag(resolved_cid, Some(format.clone()));
let inm = request_headers
.get("If-None-Match")
.unwrap()
.to_str()
.unwrap();
if etag_matches(inm, &cid_etag) {
return Some(GatewayResponse::not_modified());
}
}
None
}

#[tracing::instrument()]
async fn serve_raw(
req: &Request,
Expand All @@ -289,8 +323,7 @@ async fn serve_raw(
Arc::clone(&state),
)
.await
.unwrap();
// .map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e))?;
.map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e, &state))?;

set_content_disposition_headers(
&mut headers,
Expand Down Expand Up @@ -359,8 +392,8 @@ async fn serve_fs(
add_ipfs_roots_headers(&mut headers, metadata.clone());
match metadata.unixfs_type {
Some(UnixfsType::Dir) => {
let dir_list = match body.data().await {
Some(data) => match data {
if let Some(dir_list_data) = body.data().await {
let dir_list = match dir_list_data {
Ok(b) => b,
Err(_) => {
return Err(error(
Expand All @@ -369,64 +402,15 @@ async fn serve_fs(
&state,
));
}
},
None => {
return Err(error(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to read dir listing",
&state,
));
}
};
let dir_list = String::from_utf8(dir_list.to_vec()).unwrap();
let dir_list_lines = dir_list.lines();
let force_dir = req.query_params.force_dir.unwrap_or(false);
if !force_dir {
for line in dir_list_lines.clone() {
if line == "index.html" {
if !req.content_path.ends_with('/') {
let redirect_path = format!(
"{}/{}",
req.content_path,
req.query_params.to_query_string()
);
return Ok(GatewayResponse::redirect(&redirect_path));
}
let mut new_req = req.clone();
new_req
.resolved_path
.extend_tail(vec!["index.html".to_string()]);
new_req.content_path = format!("{}/index.html", req.content_path);
return serve_fs(&new_req, state, headers, start_time).await;
}
}
}

headers.insert(CONTENT_TYPE, HeaderValue::from_str("text/html").unwrap());
// todo(arqu): set etag
// set_etag_headers(&mut headers, metadata.dir_hash.clone());

let mut template_data: Map<String, Json> = Map::new();
let mut root_path = req.content_path.clone();
if !root_path.ends_with('/') {
root_path.push('/');
};
return serve_fs_dir(&dir_list, req, state, headers, start_time).await;
} else {
return Err(error(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to read dir listing",
&state,
));
}
let links = dir_list_lines
.map(|line| {
let mut link = Map::new();
link.insert("name".to_string(), Json::String(get_filename(line)));
link.insert(
"path".to_string(),
Json::String(format!("{}{}", root_path, line)),
);
link
})
.collect::<Vec<Map<String, Json>>>();
template_data.insert("links".to_string(), json!(links));
let reg = Handlebars::new();
let dir_template = state.handlebars.get("dir_list").unwrap();
let res = reg.render_template(dir_template, &template_data).unwrap();
return response(StatusCode::OK, Body::from(res), headers);
}
Some(_) => {
// todo(arqu): error on no size
Expand Down Expand Up @@ -459,6 +443,65 @@ async fn serve_fs(
response(StatusCode::OK, body, headers)
}

#[tracing::instrument()]
async fn serve_fs_dir(
dir_list: &Bytes,
req: &Request,
state: Arc<State>,
mut headers: HeaderMap,
start_time: std::time::Instant,
) -> Result<GatewayResponse, GatewayError> {
let dir_list = String::from_utf8(dir_list.to_vec()).unwrap();
let dir_list_lines = dir_list.lines();
let force_dir = req.query_params.force_dir.unwrap_or(false);
if !force_dir {
for line in dir_list_lines.clone() {
if line == "index.html" {
if !req.content_path.ends_with('/') {
let redirect_path = format!(
"{}/{}",
req.content_path,
req.query_params.to_query_string()
);
return Ok(GatewayResponse::redirect(&redirect_path));
}
let mut new_req = req.clone();
new_req
.resolved_path
.extend_tail(vec!["index.html".to_string()]);
new_req.content_path = format!("{}/index.html", req.content_path);
return serve_fs(&new_req, state, headers, start_time).await;
}
}
}

headers.insert(CONTENT_TYPE, HeaderValue::from_str("text/html").unwrap());
// todo(arqu): set etag
// set_etag_headers(&mut headers, metadata.dir_hash.clone());

let mut template_data: Map<String, Json> = Map::new();
let mut root_path = req.content_path.clone();
if !root_path.ends_with('/') {
root_path.push('/');
}
let links = dir_list_lines
.map(|line| {
let mut link = Map::new();
link.insert("name".to_string(), Json::String(get_filename(line)));
link.insert(
"path".to_string(),
Json::String(format!("{}{}", root_path, line)),
);
link
})
.collect::<Vec<Map<String, Json>>>();
template_data.insert("links".to_string(), json!(links));
let reg = Handlebars::new();
let dir_template = state.handlebars.get("dir_list").unwrap();
let res = reg.render_template(dir_template, &template_data).unwrap();
response(StatusCode::OK, Body::from(res), headers)
}

#[tracing::instrument(skip(body))]
fn response<B>(
status_code: StatusCode,
Expand Down
2 changes: 1 addition & 1 deletion iroh-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ mod error;
mod headers;
pub mod metrics;
mod response;
mod templates;
mod templates;
Loading

0 comments on commit 6d8ea20

Please sign in to comment.