Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Update jsonrpc dependencies and rewrite dapps to futures. (#6522)
Browse files Browse the repository at this point in the history
* Bump version.

* Fix RPC crate.

* Fix BoxFuture in crates.

* Compiles and passes tests!

* Get rid of .boxed()

* Fixing issues with the UI.

* Remove minihttp. Support threads.

* Reimplement files serving to do it in chunks.

* Increase chunk size.

* Remove some unecessary copying.

* Fix tests.

* Fix stratum warning and ipfs todo.

* Switch to proper branch of jsonrpc.

* Update Cargo.lock.

* Update docs.

* Include dapps-glue in workspace.

* fixed merge artifacts

* Fix test compilation.
  • Loading branch information
tomusdrw authored and arkpar committed Oct 5, 2017
1 parent 492da38 commit e8b418c
Show file tree
Hide file tree
Showing 118 changed files with 2,063 additions and 2,881 deletions.
384 changes: 191 additions & 193 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ futures-cpupool = "0.1"
fdlimit = "0.1"
ws2_32-sys = "0.2"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.8" }
ethsync = { path = "sync" }
ethcore = { path = "ethcore" }
ethcore-util = { path = "util" }
Expand Down Expand Up @@ -117,4 +117,4 @@ lto = false
panic = "abort"

[workspace]
members = ["ethstore/cli", "ethkey/cli", "evmbin", "whisper", "chainspec", "dapps/node-health"]
members = ["ethstore/cli", "ethkey/cli", "evmbin", "whisper", "chainspec", "dapps/js-glue"]
16 changes: 7 additions & 9 deletions dapps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,23 @@ authors = ["Parity Technologies <[email protected]>"]
[dependencies]
base32 = "0.3"
futures = "0.1"
linked-hash-map = "0.3"
futures-cpupool = "0.1"
linked-hash-map = "0.5"
log = "0.3"
parity-dapps-glue = "1.7"
parity-dapps-glue = "1.8"
parking_lot = "0.4"
mime = "0.2"
mime_guess = "1.6.1"
mime_guess = "2.0.0-alpha.2"
rand = "0.3"
rustc-hex = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
time = "0.1.35"
unicase = "1.3"
url = "1.0"
unicase = "1.4"
zip = { version = "0.1", default-features = false }
itertools = "0.5"

jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.8" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.8" }

ethcore-util = { path = "../util" }
ethcore-bigint = { path = "../util/bigint" }
Expand Down
4 changes: 2 additions & 2 deletions dapps/js-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
description = "Base Package for all Parity built-in dapps"
name = "parity-dapps-glue"
version = "1.7.0"
version = "1.8.0"
license = "GPL-3.0"
authors = ["Parity Technologies <[email protected]>"]
build = "build.rs"
Expand All @@ -12,7 +12,7 @@ syntex = { version = "0.58", optional = true }

[dependencies]
glob = { version = "0.2.11" }
mime_guess = { version = "1.6.1" }
mime_guess = { version = "2.0.0-alpha.2" }
aster = { version = "0.41", default-features = false }
quasi = { version = "0.32", default-features = false }
quasi_macros = { version = "0.32", optional = true }
Expand Down
8 changes: 4 additions & 4 deletions dapps/node-health/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;
use std::time;
use futures::{Future, BoxFuture};
use futures::Future;
use futures::sync::oneshot;
use types::{HealthInfo, HealthStatus, Health};
use time::{TimeChecker, MAX_DRIFT};
Expand All @@ -44,7 +44,7 @@ impl NodeHealth {
}

/// Query latest health report.
pub fn health(&self) -> BoxFuture<Health, ()> {
pub fn health(&self) -> Box<Future<Item = Health, Error = ()> + Send> {
trace!(target: "dapps", "Checking node health.");
// Check timediff
let sync_status = self.sync_status.clone();
Expand All @@ -63,7 +63,7 @@ impl NodeHealth {
},
);

rx.map_err(|err| {
Box::new(rx.map_err(|err| {
warn!(target: "dapps", "Health request cancelled: {:?}", err);
}).and_then(move |time| {
// Check peers
Expand Down Expand Up @@ -117,6 +117,6 @@ impl NodeHealth {
};

Ok(Health { peers, sync, time})
}).boxed()
}))
}
}
10 changes: 6 additions & 4 deletions dapps/node-health/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;

use futures::{self, Future, BoxFuture};
use futures::{self, Future};
use futures::future::{self, IntoFuture};
use futures_cpupool::{CpuPool, CpuFuture};
use ntp;
Expand Down Expand Up @@ -195,6 +195,8 @@ const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10;
/// Maximal valid time drift.
pub const MAX_DRIFT: i64 = 500;

type BoxFuture<A, B> = Box<Future<Item = A, Error = B> + Send>;

#[derive(Debug, Clone)]
/// A time checker.
pub struct TimeChecker<N: Ntp = SimpleNtp> {
Expand Down Expand Up @@ -224,7 +226,7 @@ impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'sta
pub fn update(&self) -> BoxFuture<i64, Error> {
trace!(target: "dapps", "Updating time from NTP.");
let last_result = self.last_result.clone();
self.ntp.drift().into_future().then(move |res| {
Box::new(self.ntp.drift().into_future().then(move |res| {
let res = res.map(|d| d.num_milliseconds());

if let Err(Error::NoServersAvailable) = res {
Expand Down Expand Up @@ -255,7 +257,7 @@ impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'sta
let res = select_result(results.iter());
*last_result.write() = (valid_till, results);
res
}).boxed()
}))
}

/// Returns a current time drift or error if last request to NTP server failed.
Expand All @@ -264,7 +266,7 @@ impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'sta
{
let res = self.last_result.read();
if res.0 > time::Instant::now() {
return futures::done(select_result(res.1.iter())).boxed();
return Box::new(futures::done(select_result(res.1.iter())));
}
}
// or update and return result
Expand Down
164 changes: 51 additions & 113 deletions dapps/src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,144 +16,82 @@

use std::sync::Arc;

use hyper::{server, net, Decoder, Encoder, Next, Control};
use hyper::method::Method;
use hyper::status::StatusCode;
use hyper::{Method, StatusCode};

use api::{response, types};
use api::response;
use apps::fetcher::Fetcher;
use handlers::{self, extract_url};
use endpoint::{Endpoint, Handler, EndpointPath};
use node_health::{NodeHealth, HealthStatus, Health};
use parity_reactor::Remote;
use endpoint::{Endpoint, Request, Response, EndpointPath};
use futures::{future, Future};
use node_health::{NodeHealth, HealthStatus};

#[derive(Clone)]
pub struct RestApi {
fetcher: Arc<Fetcher>,
health: NodeHealth,
remote: Remote,
}

impl Endpoint for RestApi {
fn respond(&self, mut path: EndpointPath, req: Request) -> Response {
if let Method::Options = *req.method() {
return Box::new(future::ok(response::empty()));
}

let endpoint = path.app_params.get(0).map(String::to_owned);
let hash = path.app_params.get(1).map(String::to_owned);

// at this point path.app_id contains 'api', adjust it to the hash properly, otherwise
// we will try and retrieve 'api' as the hash when doing the /api/content route
if let Some(ref hash) = hash {
path.app_id = hash.to_owned();
}

trace!(target: "dapps", "Handling /api request: {:?}/{:?}", endpoint, hash);
match endpoint.as_ref().map(String::as_str) {
Some("ping") => Box::new(future::ok(response::ping(req))),
Some("health") => self.health(),
Some("content") => self.resolve_content(hash.as_ref().map(String::as_str), path, req),
_ => Box::new(future::ok(response::not_found())),
}
}
}

impl RestApi {
pub fn new(
fetcher: Arc<Fetcher>,
health: NodeHealth,
remote: Remote,
) -> Box<Endpoint> {
Box::new(RestApi {
fetcher,
health,
remote,
})
}
}

impl Endpoint for RestApi {
fn to_async_handler(&self, path: EndpointPath, control: Control) -> Box<Handler> {
Box::new(RestApiRouter::new((*self).clone(), path, control))
}
}

struct RestApiRouter {
api: RestApi,
path: Option<EndpointPath>,
control: Option<Control>,
handler: Box<Handler>,
}

impl RestApiRouter {
fn new(api: RestApi, path: EndpointPath, control: Control) -> Self {
RestApiRouter {
path: Some(path),
control: Some(control),
api: api,
handler: Box::new(response::as_json_error(StatusCode::NotFound, &types::ApiError {
code: "404".into(),
title: "Not Found".into(),
detail: "Resource you requested has not been found.".into(),
})),
}
}

fn resolve_content(&self, hash: Option<&str>, path: EndpointPath, control: Control) -> Option<Box<Handler>> {
fn resolve_content(&self, hash: Option<&str>, path: EndpointPath, req: Request) -> Response {
trace!(target: "dapps", "Resolving content: {:?} from path: {:?}", hash, path);
match hash {
Some(hash) if self.api.fetcher.contains(hash) => {
Some(self.api.fetcher.to_async_handler(path, control))
Some(hash) if self.fetcher.contains(hash) => {
self.fetcher.respond(path, req)
},
_ => None
_ => Box::new(future::ok(response::not_found())),
}
}

fn health(&self, control: Control) -> Box<Handler> {
let map = move |health: Result<Result<Health, ()>, ()>| {
let status = match health {
Ok(Ok(ref health)) => {
if [&health.peers.status, &health.sync.status].iter().any(|x| *x != &HealthStatus::Ok) {
StatusCode::PreconditionFailed // HTTP 412
} else {
StatusCode::Ok // HTTP 200
}
},
_ => StatusCode::ServiceUnavailable, // HTTP 503
};

response::as_json(status, &health)
};
let health = self.api.health.health();
let remote = self.api.remote.clone();
Box::new(handlers::AsyncHandler::new(health, map, remote, control))
}
}

impl server::Handler<net::HttpStream> for RestApiRouter {
fn on_request(&mut self, request: server::Request<net::HttpStream>) -> Next {
if let Method::Options = *request.method() {
self.handler = response::empty();
return Next::write();
}

// TODO [ToDr] Consider using `path.app_params` instead
let url = extract_url(&request);
if url.is_none() {
// Just return 404 if we can't parse URL
return Next::write();
}

let url = url.expect("Check for None early-exists above; qed");
let mut path = self.path.take().expect("on_request called only once, and path is always defined in new; qed");
let control = self.control.take().expect("on_request called only once, and control is always defined in new; qed");

let endpoint = url.path.get(1).map(|v| v.as_str());
let hash = url.path.get(2).map(|v| v.as_str());
// at this point path.app_id contains 'api', adjust it to the hash properly, otherwise
// we will try and retrieve 'api' as the hash when doing the /api/content route
if let Some(ref hash) = hash { path.app_id = hash.clone().to_owned() }

let handler = endpoint.and_then(|v| match v {
"ping" => Some(response::ping()),
"health" => Some(self.health(control)),
"content" => self.resolve_content(hash, path, control),
_ => None
});

// Overwrite default
if let Some(h) = handler {
self.handler = h;
}

self.handler.on_request(request)
}

fn on_request_readable(&mut self, decoder: &mut Decoder<net::HttpStream>) -> Next {
self.handler.on_request_readable(decoder)
}

fn on_response(&mut self, res: &mut server::Response) -> Next {
self.handler.on_response(res)
}

fn on_response_writable(&mut self, encoder: &mut Encoder<net::HttpStream>) -> Next {
self.handler.on_response_writable(encoder)
fn health(&self) -> Response {
Box::new(self.health.health()
.then(|health| {
let status = match health {
Ok(ref health) => {
if [&health.peers.status, &health.sync.status].iter().any(|x| *x != &HealthStatus::Ok) {
StatusCode::PreconditionFailed // HTTP 412
} else {
StatusCode::Ok // HTTP 200
}
},
_ => StatusCode::ServiceUnavailable, // HTTP 503
};

Ok(response::as_json(status, &health).into())
})
)
}
}
25 changes: 13 additions & 12 deletions dapps/src/api/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,28 @@

use serde::Serialize;
use serde_json;
use hyper::status::StatusCode;
use hyper::{self, mime, StatusCode};

use endpoint::Handler;
use handlers::{ContentHandler, EchoHandler};

pub fn empty() -> Box<Handler> {
Box::new(ContentHandler::ok("".into(), mime!(Text/Plain)))
pub fn empty() -> hyper::Response {
ContentHandler::ok("".into(), mime::TEXT_PLAIN).into()
}

pub fn as_json<T: Serialize>(status: StatusCode, val: &T) -> ContentHandler {
pub fn as_json<T: Serialize>(status: StatusCode, val: &T) -> hyper::Response {
let json = serde_json::to_string(val)
.expect("serialization to string is infallible; qed");
ContentHandler::new(status, json, mime!(Application/Json))
ContentHandler::new(status, json, mime::APPLICATION_JSON).into()
}

pub fn as_json_error<T: Serialize>(status: StatusCode, val: &T) -> ContentHandler {
let json = serde_json::to_string(val)
.expect("serialization to string is infallible; qed");
ContentHandler::new(status, json, mime!(Application/Json))
pub fn ping(req: hyper::Request) -> hyper::Response {
EchoHandler::new(req).into()
}

pub fn ping() -> Box<Handler> {
Box::new(EchoHandler::default())
pub fn not_found() -> hyper::Response {
as_json(StatusCode::NotFound, &::api::types::ApiError {
code: "404".into(),
title: "Not Found".into(),
detail: "Resource you requested has not been found.".into(),
})
}
4 changes: 2 additions & 2 deletions dapps/src/apps/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
use std::fs;

use linked_hash_map::LinkedHashMap;
use page::LocalPageEndpoint;
use page::local;
use handlers::FetchControl;

pub enum ContentStatus {
Fetching(FetchControl),
Ready(LocalPageEndpoint),
Ready(local::Dapp),
}

#[derive(Default)]
Expand Down
Loading

0 comments on commit e8b418c

Please sign in to comment.