diff --git a/dapps/src/apps/cache.rs b/dapps/src/apps/cache.rs index b5acbcb15ae..be9521cf92a 100644 --- a/dapps/src/apps/cache.rs +++ b/dapps/src/apps/cache.rs @@ -18,13 +18,13 @@ use std::fs; use std::sync::{Arc}; -use std::sync::atomic::{AtomicBool, Ordering}; use linked_hash_map::LinkedHashMap; use page::LocalPageEndpoint; +use handlers::FetchControl; pub enum ContentStatus { - Fetching(Arc), + Fetching(Arc), Ready(LocalPageEndpoint), } @@ -57,10 +57,10 @@ impl ContentCache { while len > expected_size { let entry = self.cache.pop_front().unwrap(); match entry.1 { - ContentStatus::Fetching(ref abort) => { + ContentStatus::Fetching(ref fetch) => { trace!(target: "dapps", "Aborting {} because of limit.", entry.0); // Mark as aborted - abort.store(true, Ordering::SeqCst); + fetch.abort() }, ContentStatus::Ready(ref endpoint) => { trace!(target: "dapps", "Removing {} because of limit.", entry.0); diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index 502fbe4aad9..cd7a1643b14 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -23,7 +23,6 @@ use std::{fs, env, fmt}; use std::io::{self, Read, Write}; use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::{AtomicBool}; use rustc_serialize::hex::FromHex; use hyper; @@ -76,10 +75,12 @@ impl AppFetcher { } pub fn contains(&self, app_id: &str) -> bool { - let mut dapps = self.dapps.lock(); - // Check if we already have the app - if dapps.get(app_id).is_some() { - return true; + { + let mut dapps = self.dapps.lock(); + // Check if we already have the app + if dapps.get(app_id).is_some() { + return true; + } } // fallback to resolver if let Ok(app_id) = app_id.from_hex() { @@ -115,25 +116,19 @@ impl AppFetcher { (None, endpoint.to_async_handler(path, control)) }, // App is already being fetched - Some(&mut ContentStatus::Fetching(_)) => { - (None, Box::new(ContentHandler::error_with_refresh( - StatusCode::ServiceUnavailable, - "Download In Progress", - "This dapp is already being downloaded. Please wait...", - None, - )) as Box) + Some(&mut ContentStatus::Fetching(ref fetch_control)) => { + trace!(target: "dapps", "Content fetching in progress. Waiting..."); + (None, fetch_control.to_handler(control)) }, // We need to start fetching app None => { + trace!(target: "dapps", "Content fetching unavailable. Fetching..."); let app_hex = app_id.from_hex().expect("to_handler is called only when `contains` returns true."); let app = self.resolver.resolve(app_hex); if let Some(app) = app { - let abort = Arc::new(AtomicBool::new(false)); - - (Some(ContentStatus::Fetching(abort.clone())), Box::new(ContentFetcherHandler::new( + let (handler, fetch_control) = ContentFetcherHandler::new( app, - abort, control, path.using_dapps_domains, DappInstaller { @@ -141,7 +136,9 @@ impl AppFetcher { dapps_path: self.dapps_path.clone(), dapps: self.dapps.clone(), } - )) as Box) + ); + + (Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box) } else { // This may happen when sync status changes in between // `contains` and `to_handler` diff --git a/dapps/src/error_tpl.html b/dapps/src/error_tpl.html index 6551431a65c..c6b4db0e7f5 100644 --- a/dapps/src/error_tpl.html +++ b/dapps/src/error_tpl.html @@ -3,7 +3,6 @@ - {meta} {title} diff --git a/dapps/src/handlers/content.rs b/dapps/src/handlers/content.rs index f283fbb6ade..4dc011475a1 100644 --- a/dapps/src/handlers/content.rs +++ b/dapps/src/handlers/content.rs @@ -23,6 +23,7 @@ use hyper::status::StatusCode; use util::version; +#[derive(Clone)] pub struct ContentHandler { code: StatusCode, content: String, @@ -57,18 +58,6 @@ impl ContentHandler { Self::html(code, format!( include_str!("../error_tpl.html"), title=title, - meta="", - message=message, - details=details.unwrap_or_else(|| ""), - version=version(), - )) - } - - pub fn error_with_refresh(code: StatusCode, title: &str, message: &str, details: Option<&str>) -> Self { - Self::html(code, format!( - include_str!("../error_tpl.html"), - title=title, - meta="", message=message, details=details.unwrap_or_else(|| ""), version=version(), diff --git a/dapps/src/handlers/fetch.rs b/dapps/src/handlers/fetch.rs index 98242f2b3af..33a826ee724 100644 --- a/dapps/src/handlers/fetch.rs +++ b/dapps/src/handlers/fetch.rs @@ -16,17 +16,18 @@ //! Hyper Server Handler that fetches a file during a request (proxy). -use std::fmt; +use std::{fs, fmt}; use std::path::PathBuf; use std::sync::{mpsc, Arc}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Instant, Duration}; +use util::Mutex; -use hyper::{header, server, Decoder, Encoder, Next, Method, Control}; +use hyper::{server, Decoder, Encoder, Next, Method, Control}; use hyper::net::HttpStream; use hyper::status::StatusCode; -use handlers::ContentHandler; +use handlers::{ContentHandler, Redirection}; use handlers::client::{Client, FetchResult}; use apps::redirection_address; use apps::urlhint::GithubApp; @@ -36,12 +37,9 @@ const FETCH_TIMEOUT: u64 = 30; enum FetchState { NotStarted(GithubApp), + InProgress(mpsc::Receiver), Error(ContentHandler), - InProgress { - deadline: Instant, - receiver: mpsc::Receiver, - }, - Done(Manifest), + Done(Manifest, Redirection), } pub trait ContentValidator { @@ -51,10 +49,93 @@ pub trait ContentValidator { fn done(&self, Option<&Manifest>); } -pub struct ContentFetcherHandler { +pub struct FetchControl { abort: Arc, - control: Option, + listeners: Mutex)>>, + deadline: Instant, +} + +impl Default for FetchControl { + fn default() -> Self { + FetchControl { + abort: Arc::new(AtomicBool::new(false)), + listeners: Mutex::new(Vec::new()), + deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), + } + } +} + +impl FetchControl { + fn notify FetchState>(&self, status: F) { + let mut listeners = self.listeners.lock(); + for (control, sender) in listeners.drain(..) { + if let Err(e) = sender.send(status()) { + trace!(target: "dapps", "Waiting listener notification failed: {:?}", e); + } else { + let _ = control.ready(Next::read()); + } + } + } + + fn set_status(&self, status: &FetchState) { + match *status { + FetchState::Error(ref handler) => self.notify(|| FetchState::Error(handler.clone())), + FetchState::Done(ref manifest, ref handler) => self.notify(|| FetchState::Done(manifest.clone(), handler.clone())), + FetchState::NotStarted(_) | FetchState::InProgress(_) => {}, + } + } + + pub fn abort(&self) { + self.abort.store(true, Ordering::SeqCst); + } + + pub fn to_handler(&self, control: Control) -> Box + Send> { + let (tx, rx) = mpsc::channel(); + self.listeners.lock().push((control, tx)); + + Box::new(WaitingHandler { + receiver: rx, + state: None, + }) + } +} + +pub struct WaitingHandler { + receiver: mpsc::Receiver, + state: Option, +} + +impl server::Handler for WaitingHandler { + fn on_request(&mut self, _request: server::Request) -> Next { + Next::wait() + } + + fn on_request_readable(&mut self, _decoder: &mut Decoder) -> Next { + self.state = self.receiver.try_recv().ok(); + Next::write() + } + + fn on_response(&mut self, res: &mut server::Response) -> Next { + match self.state { + Some(FetchState::Done(_, ref mut handler)) => handler.on_response(res), + Some(FetchState::Error(ref mut handler)) => handler.on_response(res), + _ => Next::end(), + } + } + + fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { + match self.state { + Some(FetchState::Done(_, ref mut handler)) => handler.on_response_writable(encoder), + Some(FetchState::Error(ref mut handler)) => handler.on_response_writable(encoder), + _ => Next::end(), + } + } +} + +pub struct ContentFetcherHandler { + fetch_control: Arc, status: FetchState, + control: Option, client: Option, using_dapps_domains: bool, dapp: H, @@ -63,7 +144,7 @@ pub struct ContentFetcherHandler { impl Drop for ContentFetcherHandler { fn drop(&mut self) { let manifest = match self.status { - FetchState::Done(ref manifest) => Some(manifest), + FetchState::Done(ref manifest, _) => Some(manifest), _ => None, }; self.dapp.done(manifest); @@ -74,20 +155,22 @@ impl ContentFetcherHandler { pub fn new( app: GithubApp, - abort: Arc, control: Control, using_dapps_domains: bool, - handler: H) -> Self { + handler: H) -> (Self, Arc) { + let fetch_control = Arc::new(FetchControl::default()); let client = Client::new(); - ContentFetcherHandler { - abort: abort, + let handler = ContentFetcherHandler { + fetch_control: fetch_control.clone(), control: Some(control), client: Some(client), status: FetchState::NotStarted(app), using_dapps_domains: using_dapps_domains, dapp: handler, - } + }; + + (handler, fetch_control) } fn close_client(client: &mut Option) { @@ -96,13 +179,13 @@ impl ContentFetcherHandler { .close(); } - fn fetch_app(client: &mut Client, app: &GithubApp, abort: Arc, control: Control) -> Result, String> { - client.request(app.url(), abort, Box::new(move || { + let res = client.request(app.url(), abort, Box::new(move || { trace!(target: "dapps", "Fetching finished."); // Ignoring control errors let _ = control.ready(Next::read()); - })).map_err(|e| format!("{:?}", e)) + })).map_err(|e| format!("{:?}", e)); + res } } @@ -115,12 +198,9 @@ impl server::Handler for ContentFetcherHandler< trace!(target: "dapps", "Fetching dapp: {:?}", app); let control = self.control.take().expect("on_request is called only once, thus control is always Some"); let client = self.client.as_mut().expect("on_request is called before client is closed."); - let fetch = Self::fetch_app(client, app, self.abort.clone(), control); + let fetch = Self::fetch_app(client, app, self.fetch_control.abort.clone(), control); match fetch { - Ok(receiver) => FetchState::InProgress { - deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), - receiver: receiver, - }, + Ok(receiver) => FetchState::InProgress(receiver), Err(e) => FetchState::Error(ContentHandler::error( StatusCode::BadGateway, "Unable To Start Dapp Download", @@ -140,6 +220,7 @@ impl server::Handler for ContentFetcherHandler< } else { None }; if let Some(status) = status { + self.fetch_control.set_status(&status); self.status = status; } @@ -149,7 +230,7 @@ impl server::Handler for ContentFetcherHandler< fn on_request_readable(&mut self, decoder: &mut Decoder) -> Next { let (status, next) = match self.status { // Request may time out - FetchState::InProgress { ref deadline, .. } if *deadline < Instant::now() => { + FetchState::InProgress(_) if self.fetch_control.deadline < Instant::now() => { trace!(target: "dapps", "Fetching dapp failed because of timeout."); let timeout = ContentHandler::error( StatusCode::GatewayTimeout, @@ -160,7 +241,7 @@ impl server::Handler for ContentFetcherHandler< Self::close_client(&mut self.client); (Some(FetchState::Error(timeout)), Next::write()) }, - FetchState::InProgress { ref receiver, .. } => { + FetchState::InProgress(ref receiver) => { // Check if there is an answer let rec = receiver.try_recv(); match rec { @@ -179,11 +260,13 @@ impl server::Handler for ContentFetcherHandler< Some(&format!("{:?}", e)) )) }, - Ok(manifest) => FetchState::Done(manifest) + Ok(manifest) => { + let address = redirection_address(self.using_dapps_domains, &manifest.id); + FetchState::Done(manifest, Redirection::new(&address)) + }, }; // Remove temporary zip file - // TODO [todr] Uncomment me - // let _ = fs::remove_file(path); + let _ = fs::remove_file(path); (Some(state), Next::write()) }, Ok(Err(e)) => { @@ -205,6 +288,7 @@ impl server::Handler for ContentFetcherHandler< }; if let Some(status) = status { + self.fetch_control.set_status(&status); self.status = status; } @@ -213,12 +297,7 @@ impl server::Handler for ContentFetcherHandler< fn on_response(&mut self, res: &mut server::Response) -> Next { match self.status { - FetchState::Done(ref manifest) => { - trace!(target: "dapps", "Fetching dapp finished. Redirecting to {}", manifest.id); - res.set_status(StatusCode::Found); - res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, &manifest.id))); - Next::write() - }, + FetchState::Done(_, ref mut handler) => handler.on_response(res), FetchState::Error(ref mut handler) => handler.on_response(res), _ => Next::end(), } @@ -226,9 +305,9 @@ impl server::Handler for ContentFetcherHandler< fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { match self.status { + FetchState::Done(_, ref mut handler) => handler.on_response_writable(encoder), FetchState::Error(ref mut handler) => handler.on_response_writable(encoder), _ => Next::end(), } } } - diff --git a/dapps/src/handlers/mod.rs b/dapps/src/handlers/mod.rs index 6f6423b58ab..62b13eaa87b 100644 --- a/dapps/src/handlers/mod.rs +++ b/dapps/src/handlers/mod.rs @@ -27,7 +27,7 @@ pub use self::auth::AuthRequiredHandler; pub use self::echo::EchoHandler; pub use self::content::ContentHandler; pub use self::redirect::Redirection; -pub use self::fetch::{ContentFetcherHandler, ContentValidator}; +pub use self::fetch::{ContentFetcherHandler, ContentValidator, FetchControl}; use url::Url; use hyper::{server, header, net, uri}; diff --git a/dapps/src/handlers/redirect.rs b/dapps/src/handlers/redirect.rs index 8b6158266fd..e43d32e24f0 100644 --- a/dapps/src/handlers/redirect.rs +++ b/dapps/src/handlers/redirect.rs @@ -20,15 +20,20 @@ use hyper::{header, server, Decoder, Encoder, Next}; use hyper::net::HttpStream; use hyper::status::StatusCode; +#[derive(Clone)] pub struct Redirection { to_url: String } impl Redirection { - pub fn new(url: &str) -> Box { - Box::new(Redirection { + pub fn new(url: &str) -> Self { + Redirection { to_url: url.to_owned() - }) + } + } + + pub fn boxed(url: &str) -> Box { + Box::new(Self::new(url)) } } diff --git a/dapps/src/router/mod.rs b/dapps/src/router/mod.rs index c93456d710b..24ceb1f3e2e 100644 --- a/dapps/src/router/mod.rs +++ b/dapps/src/router/mod.rs @@ -104,7 +104,7 @@ impl server::Handler for Router { // Redirect any GET request to home. _ if *req.method() == hyper::method::Method::Get => { let address = apps::redirection_address(false, self.main_page); - Redirection::new(address.as_str()) + Redirection::boxed(address.as_str()) }, // RPC by default _ => {