diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 82b6ea12686..5e970b837fd 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -44,13 +44,13 @@ pub mod provider; #[cfg(feature = "ipc")] pub mod provider { - #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues + #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues include!(concat!(env!("OUT_DIR"), "/provider.rs")); } #[cfg(feature = "ipc")] pub mod remote { - pub use provider::LightProviderClient; + pub use provider::LightProviderClient; } mod types; diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index 64ddd19a355..33009d7f607 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -20,7 +20,7 @@ use network::{NetworkContext, PeerId, NodeId}; use super::{Announcement, LightProtocol, ReqId}; use super::error::Error; -use request::Requests; +use request::NetworkRequests as Requests; /// An I/O context which allows sending and receiving packets as well as /// disconnecting peers. This is used as a generalization of the portions diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 1919724514d..798f68fe525 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -33,7 +33,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use provider::Provider; -use request::{Request, Requests, Response}; +use request::{Request, NetworkRequests as Requests, Response}; use self::request_credits::{Credits, FlowParams}; use self::context::{Ctx, TickCtx}; @@ -108,9 +108,14 @@ mod timeout { } /// A request id. +#[cfg(not(test))] #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct ReqId(usize); +#[cfg(test)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] +pub struct ReqId(pub usize); + impl fmt::Display for ReqId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Request #{}", self.0) diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs index 094fa189420..c5608050f54 100644 --- a/ethcore/light/src/net/request_set.rs +++ b/ethcore/light/src/net/request_set.rs @@ -25,7 +25,7 @@ use std::collections::{BTreeMap, HashMap}; use std::iter::FromIterator; use request::Request; -use request::Requests; +use request::NetworkRequests as Requests; use net::{timeout, ReqId}; use util::U256; diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 6dc5fbe7ec5..94788a72707 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -39,14 +39,14 @@ use std::sync::Arc; // helper for encoding a single request into a packet. // panics on bad backreference. -fn encode_single(request: Request) -> Requests { +fn encode_single(request: Request) -> NetworkRequests { let mut builder = RequestBuilder::default(); builder.push(request).unwrap(); builder.build() } // helper for making a packet out of `Requests`. -fn make_packet(req_id: usize, requests: &Requests) -> Vec { +fn make_packet(req_id: usize, requests: &NetworkRequests) -> Vec { let mut stream = RlpStream::new_list(2); stream.append(&req_id).append_list(&requests.requests()); stream.out() diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c756844c9a1..e61c126d6f5 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -18,20 +18,17 @@ //! The request service is implemented using Futures. Higher level request handlers //! will take the raw data received here and extract meaningful results from it. -// TODO [ToDr] Suppressing deprecation warnings. Rob will fix the API anyway. -#![allow(deprecated)] - use std::collections::HashMap; +use std::marker::PhantomData; use std::sync::Arc; use ethcore::basic_account::BasicAccount; use ethcore::encoded; use ethcore::receipt::Receipt; -use ethcore::state::ProvedExecution; use ethcore::executed::{Executed, ExecutionError}; -use futures::{Async, Poll, Future}; -use futures::sync::oneshot::{self, Sender, Receiver}; +use futures::{future, Async, Poll, Future, BoxFuture}; +use futures::sync::oneshot::{self, Sender, Receiver, Canceled}; use network::PeerId; use rlp::RlpStream; use util::{Bytes, RwLock, Mutex, U256, H256}; @@ -39,10 +36,19 @@ use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP}; use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use cache::Cache; -use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse}; +use request::{self as basic_request, Request as NetworkRequest}; +use self::request::CheckedRequest; + +pub use self::request::{Request, Response}; + +#[cfg(test)] +mod tests; pub mod request; +/// The result of execution +pub type ExecutionResult = Result; + // relevant peer info. struct Peer { status: Status, @@ -50,146 +56,154 @@ struct Peer { } impl Peer { - // Whether a given peer can handle a specific request. - fn can_handle(&self, pending: &Pending) -> bool { - match *pending { - Pending::HeaderProof(ref req, _) => - self.capabilities.serve_headers && self.status.head_num > req.num(), - Pending::HeaderByHash(_, _) => self.capabilities.serve_headers, - Pending::Block(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()), - Pending::BlockReceipts(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()), - Pending::Account(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), - Pending::Code(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1), - Pending::TxProof(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), - } - } -} + // whether this peer can fulfill the + fn can_fulfill(&self, c: &Capabilities) -> bool { + let caps = &self.capabilities; -// Which portions of a CHT proof should be sent. -enum ChtProofSender { - Both(Sender<(H256, U256)>), - Hash(Sender), - ChainScore(Sender), + caps.serve_headers == c.serve_headers && + caps.serve_chain_since >= c.serve_chain_since && + caps.serve_state_since >= c.serve_chain_since + } } // Attempted request info and sender to put received value. -enum Pending { - HeaderProof(request::HeaderProof, ChtProofSender), - HeaderByHash(request::HeaderByHash, Sender), - Block(request::Body, Sender), - BlockReceipts(request::BlockReceipts, Sender>), - Account(request::Account, Sender), - Code(request::Code, Sender), - TxProof(request::TransactionProof, Sender>), +struct Pending { + requests: basic_request::Requests, + net_requests: basic_request::Requests, + required_capabilities: Capabilities, + responses: Vec, + sender: oneshot::Sender>, } -impl Pending { - // Create a network request. - fn make_request(&self) -> NetworkRequest { - match *self { - Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest { - start: basic_request::HashOrNumber::Hash(req.0).into(), - skip: 0, - max: 1, - reverse: false, - }), - Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest { - num: req.num().into(), - }), - Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest { - hash: req.hash.into(), - }), - Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest { - hash: req.0.hash().into(), - }), - Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest { - block_hash: req.header.hash().into(), - address_hash: ::util::Hashable::sha3(&req.address).into(), - }), - Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest { - block_hash: req.block_id.0.into(), - code_hash: req.code_hash.into(), - }), - Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest { - block_hash: req.header.hash().into(), - from: req.tx.sender(), - gas: req.tx.gas, - gas_price: req.tx.gas_price, - action: req.tx.action.clone(), - value: req.tx.value, - data: req.tx.data.clone(), - }), +// helper to guess capabilities required for a given batch of network requests. +fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { + let mut caps = Capabilities { + serve_headers: false, + serve_chain_since: None, + serve_state_since: None, + tx_relay: false, + }; + + let update_since = |current: &mut Option, new| + *current = match *current { + Some(x) => Some(::std::cmp::min(x, new)), + None => Some(new), + }; + + for request in requests { + match *request { + // TODO: might be worth returning a required block number for this also. + CheckedRequest::HeaderProof(_, _) => + caps.serve_headers = true, + CheckedRequest::HeaderByHash(_, _) => + caps.serve_headers = true, + CheckedRequest::Body(ref req, _) => + update_since(&mut caps.serve_chain_since, req.header.number()), + CheckedRequest::Receipts(ref req, _) => + update_since(&mut caps.serve_chain_since, req.0.number()), + CheckedRequest::Account(ref req, _) => + update_since(&mut caps.serve_state_since, req.header.number()), + CheckedRequest::Code(ref req, _) => + update_since(&mut caps.serve_state_since, req.block_id.1), + CheckedRequest::Execution(ref req, _) => + update_since(&mut caps.serve_state_since, req.header.number()), } } + + caps +} + +/// A future extracting the concrete output type of the generic adapter +/// from a vector of responses. +pub struct OnResponses { + receiver: Receiver>, + _marker: PhantomData, +} + +impl Future for OnResponses { + type Item = T::Out; + type Error = Canceled; + + fn poll(&mut self) -> Poll { + self.receiver.poll().map(|async| async.map(T::extract_from)) + } } /// On demand request service. See module docs for more details. /// Accumulates info about all peers' capabilities and dispatches /// requests to them accordingly. +// lock in declaration order. pub struct OnDemand { + pending: RwLock>, peers: RwLock>, - pending_requests: RwLock>, + in_transit: RwLock>, cache: Arc>, - orphaned_requests: RwLock>, - start_nonce: U256, + no_immediate_dispatch: bool, } -const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed"; - impl OnDemand { /// Create a new `OnDemand` service with the given cache. - pub fn new(cache: Arc>, account_start_nonce: U256) -> Self { + pub fn new(cache: Arc>) -> Self { OnDemand { + pending: RwLock::new(Vec::new()), peers: RwLock::new(HashMap::new()), - pending_requests: RwLock::new(HashMap::new()), + in_transit: RwLock::new(HashMap::new()), cache: cache, - orphaned_requests: RwLock::new(Vec::new()), - start_nonce: account_start_nonce, + no_immediate_dispatch: true, } } + // make a test version: this doesn't dispatch pending requests + // until you trigger it manually. + #[cfg(test)] + fn new_test(cache: Arc>) -> Self { + let mut me = OnDemand::new(cache); + me.no_immediate_dispatch = true; + + me + } + /// Request a header's hash by block number and CHT root hash. /// Returns the hash. - pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { - let (sender, receiver) = oneshot::channel(); + pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture { let cached = { let mut cache = self.cache.lock(); cache.block_hash(&req.num()) }; match cached { - Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), + Some(hash) => future::ok(hash).boxed(), + None => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .map(|(h, _)| h) + .boxed() + }, } - receiver } /// Request a canonical block's chain score. /// Returns the chain score. - pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { - let (sender, receiver) = oneshot::channel(); + pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture { let cached = { let mut cache = self.cache.lock(); cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) }; match cached { - Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), + Some(score) => future::ok(score).boxed(), + None => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .map(|(_, s)| s) + .boxed() + }, } - - receiver } /// Request a canonical block's hash and chain score by number. /// Returns the hash and chain score. - pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> { - let (sender, receiver) = oneshot::channel(); + pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<(H256, U256), Canceled> { let cached = { let mut cache = self.cache.lock(); let hash = cache.block_hash(&req.num()); @@ -200,31 +214,33 @@ impl OnDemand { }; match cached { - (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE), - _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), + (Some(hash), Some(score)) => future::ok((hash, score)).boxed(), + _ => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() + }, } - - receiver } /// Request a header by hash. This is less accurate than by-number because we don't know /// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { - let (sender, receiver) = oneshot::channel(); + pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> BoxFuture { match { self.cache.lock().block_header(&req.0) } { - Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), + Some(hdr) => future::ok(hdr).boxed(), + None => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() + }, } - receiver } /// Request a block, given its header. Block bodies are requestable by hash only, /// and the header is required anyway to verify and complete the block body /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver { - let (sender, receiver) = oneshot::channel(); - + pub fn block(&self, ctx: &BasicContext, req: request::Body) -> BoxFuture { // fast path for empty body. if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { let mut stream = RlpStream::new_list(3); @@ -232,7 +248,7 @@ impl OnDemand { stream.begin_list(0); stream.begin_list(0); - sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); + future::ok(encoded::Block::new(stream.out())).boxed() } else { match { self.cache.lock().block_body(&req.hash) } { Some(body) => { @@ -242,98 +258,124 @@ impl OnDemand { stream.append_raw(&body.at(0).as_raw(), 1); stream.append_raw(&body.at(1).as_raw(), 1); - sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); + future::ok(encoded::Block::new(stream.out())).boxed() + } + None => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() } - None => self.dispatch(ctx, Pending::Block(req, sender)), } } - receiver } /// Request the receipts for a block. The header serves two purposes: /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver> { - let (sender, receiver) = oneshot::channel(); - + pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> BoxFuture, Canceled> { // fast path for empty receipts. if req.0.receipts_root() == SHA3_NULL_RLP { - sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE); - } else { - match { self.cache.lock().block_receipts(&req.0.hash()) } { - Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), - } + return future::ok(Vec::new()).boxed() } - receiver + match { self.cache.lock().block_receipts(&req.0.hash()) } { + Some(receipts) => future::ok(receipts).boxed(), + None => { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() + }, + } } /// Request an account by address and block header -- which gives a hash to query and a state root /// to verify against. - pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver { - let (sender, receiver) = oneshot::channel(); - self.dispatch(ctx, Pending::Account(req, sender)); - receiver + /// `None` here means that no account by the queried key exists in the queried state. + pub fn account(&self, ctx: &BasicContext, req: request::Account) -> BoxFuture, Canceled> { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() } /// Request code by address, known code hash, and block header. - pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver { - let (sender, receiver) = oneshot::channel(); - + pub fn code(&self, ctx: &BasicContext, req: request::Code) -> BoxFuture { // fast path for no code. if req.code_hash == SHA3_EMPTY { - sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) + future::ok(Vec::new()).boxed() } else { - self.dispatch(ctx, Pending::Code(req, sender)); + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() } - - receiver } /// Request proof-of-execution for a transaction. - pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver> { - let (sender, receiver) = oneshot::channel(); + pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> BoxFuture { + self.request(ctx, req) + .expect("request given fully fleshed out; qed") + .boxed() + } - self.dispatch(ctx, Pending::TxProof(req, sender)); + /// Submit a vector of requests to be processed together. + /// + /// Fails if back-references are not coherent. + /// The returned vector of responses will correspond to the requests exactly. + pub fn request_raw(&self, ctx: &BasicContext, requests: Vec) + -> Result>, basic_request::NoSuchOutput> + { + let (sender, receiver) = oneshot::channel(); - receiver - } + if requests.is_empty() { + assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed"); + return Ok(receiver); + } - // dispatch the request, with a "suitability" function to filter acceptable peers. - fn dispatch(&self, ctx: &BasicContext, pending: Pending) { let mut builder = basic_request::RequestBuilder::default(); - builder.push(pending.make_request()) - .expect("make_request always returns fully complete request; qed"); - - let complete = builder.build(); - - let kind = complete.requests()[0].kind(); - for (id, peer) in self.peers.read().iter() { - if !peer.can_handle(&pending) { continue } - match ctx.request_from(*id, complete.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "{}: Assigned {:?} to peer {}", - req_id, kind, id); - - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(net::Error::NoCredits) => {} - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } + + let responses = Vec::with_capacity(requests.len()); + for request in requests { + builder.push(CheckedRequest::from(request))?; } - self.orphaned_requests.write().push(pending); + let requests = builder.build(); + let net_requests = requests.clone().map_requests(|req| req.into_net_request()); + let capabilities = guess_capabilities(requests.requests()); + + self.pending.write().push(Pending { + requests: requests, + net_requests: net_requests, + required_capabilities: capabilities, + responses: responses, + sender: sender, + }); + + self.attempt_dispatch(ctx); + + Ok(receiver) } + /// Submit a strongly-typed batch of requests. + /// + /// Fails if back-reference are not coherent. + pub fn request(&self, ctx: &BasicContext, requests: T) -> Result, basic_request::NoSuchOutput> + where T: request::RequestAdapter + { + self.request_raw(ctx, requests.make_requests()).map(|recv| OnResponses { + receiver: recv, + _marker: PhantomData, + }) + } - // dispatch orphaned requests, and discard those for which the corresponding + // maybe dispatch pending requests. + // sometimes + fn attempt_dispatch(&self, ctx: &BasicContext) { + if !self.no_immediate_dispatch { + self.dispatch_pending(ctx) + } + } + + // dispatch pending requests, and discard those for which the corresponding // receiver has been dropped. - fn dispatch_orphaned(&self, ctx: &BasicContext) { + fn dispatch_pending(&self, ctx: &BasicContext) { // wrapper future for calling `poll_cancel` on our `Senders` to preserve // the invariant that it's always within a task. struct CheckHangup<'a, T: 'a>(&'a mut Sender); @@ -356,35 +398,44 @@ impl OnDemand { CheckHangup(send).wait().expect("CheckHangup always returns ok; qed") } - if self.orphaned_requests.read().is_empty() { return } - - let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); - - trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len()); - for mut orphaned in to_dispatch { - let hung_up = match orphaned { - Pending::HeaderProof(_, ref mut sender) => match *sender { - ChtProofSender::Both(ref mut s) => check_hangup(s), - ChtProofSender::Hash(ref mut s) => check_hangup(s), - ChtProofSender::ChainScore(ref mut s) => check_hangup(s), - }, - Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender), - Pending::Block(_, ref mut sender) => check_hangup(sender), - Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender), - Pending::Account(_, ref mut sender) => check_hangup(sender), - Pending::Code(_, ref mut sender) => check_hangup(sender), - Pending::TxProof(_, ref mut sender) => check_hangup(sender), - }; - - if !hung_up { self.dispatch(ctx, orphaned) } - } + if self.pending.read().is_empty() { return } + let mut pending = self.pending.write(); + + // iterate over all pending requests, and check them for hang-up. + // then, try and find a peer who can serve it. + let peers = self.peers.read(); + *pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter() + .filter_map(|mut pending| match check_hangup(&mut pending.sender) { + false => Some(pending), + true => None, + }) + .filter_map(|pending| { + for (peer_id, peer) in peers.iter() { // .shuffle? + // TODO: see which requests can be answered by the cache? + + if !peer.can_fulfill(&pending.required_capabilities) { + continue + } + + match ctx.request_from(*peer_id, pending.net_requests.clone()) { + Ok(req_id) => { + self.in_transit.write().insert(req_id, pending); + return None + } + Err(net::Error::NoCredits) => {} + Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e), + } + } + Some(pending) + }) + .collect(); // `pending` now contains all requests we couldn't dispatch. } } impl Handler for OnDemand { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); - self.dispatch_orphaned(ctx.as_basic()); + self.attempt_dispatch(ctx.as_basic()); } fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { @@ -392,16 +443,16 @@ impl Handler for OnDemand { let ctx = ctx.as_basic(); { - let mut orphaned = self.orphaned_requests.write(); + let mut pending = self.pending.write(); for unfulfilled in unfulfilled { - if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + if let Some(unfulfilled) = self.in_transit.write().remove(unfulfilled) { trace!(target: "on_demand", "Attempting to reassign dropped request"); - orphaned.push(pending); + pending.push(unfulfilled); } } } - self.dispatch_orphaned(ctx); + self.attempt_dispatch(ctx); } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { @@ -413,183 +464,70 @@ impl Handler for OnDemand { } } - self.dispatch_orphaned(ctx.as_basic()); + self.attempt_dispatch(ctx.as_basic()); } fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { + use request::IncompleteRequest; + + let mut pending = match self.in_transit.write().remove(&req_id) { Some(req) => req, None => return, }; - let response = match responses.get(0) { - Some(response) => response, - None => { - trace!(target: "on_demand", "Ignoring empty response for request {}", req_id); - self.dispatch(ctx.as_basic(), req); - return; - } - }; - - trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind()); - - // handle the response appropriately for the request. - // all branches which do not return early lead to disabling of the peer - // due to misbehavior. - match req { - Pending::HeaderProof(req, sender) => { - if let NetworkResponse::HeaderProof(ref response) = *response { - match req.check_response(&response.proof) { - Ok((hash, score)) => { - let mut cache = self.cache.lock(); - cache.insert_block_hash(req.num(), hash); - cache.insert_chain_score(hash, score); - - match sender { - ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); } - ChtProofSender::Hash(sender) => { let _ = sender.send(hash); } - ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); } - } - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), - } - } - } - Pending::HeaderByHash(req, sender) => { - if let NetworkResponse::Headers(ref response) = *response { - if let Some(header) = response.headers.get(0) { - match req.check_response(header) { - Ok(header) => { - self.cache.lock().insert_block_header(req.0, header.clone()); - let _ = sender.send(header); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), - } - } - } - } - Pending::Block(req, sender) => { - if let NetworkResponse::Body(ref response) = *response { - match req.check_response(&response.body) { - Ok(block) => { - self.cache.lock().insert_block_body(req.hash, response.body.clone()); - let _ = sender.send(block); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e), - } - } - } - Pending::BlockReceipts(req, sender) => { - if let NetworkResponse::Receipts(ref response) = *response { - match req.check_response(&response.receipts) { - Ok(receipts) => { - let hash = req.0.hash(); - self.cache.lock().insert_block_receipts(hash, receipts.clone()); - let _ = sender.send(receipts); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e), - } + // for each incoming response + // 1. ensure verification data filled. (still TODO since on_demand doesn't use back-references yet) + // 2. pending.requests.supply_response + // 3. if extracted on-demand response, keep it for later. + for response in responses { + match pending.requests.supply_response(&*self.cache, response) { + Ok(response) => { + pending.responses.push(response) } - } - Pending::Account(req, sender) => { - if let NetworkResponse::Account(ref response) = *response { - match req.check_response(&response.proof) { - Ok(account) => { - let account = account.unwrap_or_else(|| { - BasicAccount { - balance: 0.into(), - nonce: self.start_nonce, - code_hash: SHA3_EMPTY, - storage_root: SHA3_NULL_RLP - } - }); - - // TODO: validate against request outputs. - // needs engine + env info as part of request. - let _ = sender.send(account); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e), - } - } - } - Pending::Code(req, sender) => { - if let NetworkResponse::Code(ref response) = *response { - match req.check_response(response.code.as_slice()) { - Ok(()) => { - let _ = sender.send(response.code.clone()); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e), - } - } - } - Pending::TxProof(req, sender) => { - if let NetworkResponse::Execution(ref response) = *response { - match req.check_response(&response.items) { - ProvedExecution::Complete(executed) => { - let _ = sender.send(Ok(executed)); - return - } - ProvedExecution::Failed(err) => { - let _ = sender.send(Err(err)); - return - } - ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"), - } + Err(e) => { + let peer = ctx.peer(); + debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); + ctx.disable_peer(peer); + + break; } } } - ctx.disable_peer(peer); - } + pending.requests.fill_unanswered(); + if pending.requests.is_complete() { + let _ = pending.sender.send(pending.responses); - fn tick(&self, ctx: &BasicContext) { - self.dispatch_orphaned(ctx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::sync::Arc; - - use cache::Cache; - use net::{Announcement, BasicContext, ReqId, Error as LesError}; - use request::Requests; - - use network::{PeerId, NodeId}; - use time::Duration; - use util::{H256, Mutex}; + return; + } - struct FakeContext; - impl BasicContext for FakeContext { - fn persistent_peer_id(&self, _: PeerId) -> Option { None } - fn request_from(&self, _: PeerId, _: Requests) -> Result { - unimplemented!() + // update network requests (unless we're done, in which case fulfill the future.) + let mut builder = basic_request::RequestBuilder::default(); + let num_answered = pending.requests.num_answered(); + let mut mapping = move |idx| idx - num_answered; + + for request in pending.requests.requests().iter().skip(num_answered) { + let mut net_req = request.clone().into_net_request(); + + // all back-references with request index less than `num_answered` have + // been filled by now. all remaining requests point to nothing earlier + // than the next unanswered request. + net_req.adjust_refs(&mut mapping); + builder.push(net_req) + .expect("all back-references to answered requests have been filled; qed"); } - fn make_announcement(&self, _: Announcement) { } - fn disconnect_peer(&self, _: PeerId) { } - fn disable_peer(&self, _: PeerId) { } - } - #[test] - fn detects_hangup() { - let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6)))); - let on_demand = OnDemand::new(cache, 0.into()); - let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); + // update pending fields and re-queue. + let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]); + pending.net_requests = builder.build(); + pending.required_capabilities = capabilities; - assert!(on_demand.orphaned_requests.read().len() == 1); - drop(result); + self.pending.write().push(pending); + self.attempt_dispatch(ctx.as_basic()); + } - on_demand.dispatch_orphaned(&FakeContext); - assert!(on_demand.orphaned_requests.read().is_empty()); + fn tick(&self, ctx: &BasicContext) { + self.attempt_dispatch(ctx) } } diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index 8a37ddf7b41..c9a5c4d9b26 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -26,17 +26,374 @@ use ethcore::receipt::Receipt; use ethcore::state::{self, ProvedExecution}; use ethcore::transaction::SignedTransaction; +use request::{self as net_request, IncompleteRequest, Output, OutputKind}; + use rlp::{RlpStream, UntrustedRlp}; -use util::{Address, Bytes, DBValue, HashDB, H256, U256}; +use util::{Address, Bytes, DBValue, HashDB, Mutex, H256, U256}; use util::memorydb::MemoryDB; use util::sha3::Hashable; use util::trie::{Trie, TrieDB, TrieError}; +const SUPPLIED_MATCHES: &'static str = "supplied responses always match produced requests; enforced by `check_response`; qed"; + +/// Core unit of the API: submit batches of these to be answered with `Response`s. +#[derive(Clone)] +pub enum Request { + /// A request for a header proof. + HeaderProof(HeaderProof), + /// A request for a header by hash. + HeaderByHash(HeaderByHash), + /// A request for block receipts. + Receipts(BlockReceipts), + /// A request for a block body. + Body(Body), + /// A request for an account. + Account(Account), + /// A request for a contract's code. + Code(Code), + /// A request for proof of execution. + Execution(TransactionProof), +} + +/// A request argument. +pub trait RequestArg { + /// the response type. + type Out; + + /// Create the request type. + /// `extract` must not fail when presented with the corresponding + /// `Response`. + fn make(self) -> Request; + + /// May not panic if the response corresponds with the request + /// from `make`. + /// Is free to panic otherwise. + fn extract(r: Response) -> Self::Out; +} + +/// An adapter can be thought of as a grouping of request argument types. +/// This is implemented for various tuples and convenient types. +pub trait RequestAdapter { + /// The output type. + type Out; + + /// Infallibly produce requests. When `extract_from` is presented + /// with the corresponding response vector, it may not fail. + fn make_requests(self) -> Vec; + + /// Extract the output type from the given responses. + /// If they are the corresponding responses to the requests + /// made by `make_requests`, do not panic. + fn extract_from(Vec) -> Self::Out; +} + +// helper to implement `RequestArg` and `From` for a single request kind. +macro_rules! impl_single { + ($variant: ident, $me: ty, $out: ty) => { + impl RequestArg for $me { + type Out = $out; + + fn make(self) -> Request { + Request::$variant(self) + } + + fn extract(r: Response) -> $out { + match r { + Response::$variant(x) => x, + _ => panic!(SUPPLIED_MATCHES), + } + } + } + + impl From<$me> for Request { + fn from(me: $me) -> Request { + Request::$variant(me) + } + } + } +} + +// implement traits for each kind of request. +impl_single!(HeaderProof, HeaderProof, (H256, U256)); +impl_single!(HeaderByHash, HeaderByHash, encoded::Header); +impl_single!(Receipts, BlockReceipts, Vec); +impl_single!(Body, Body, encoded::Block); +impl_single!(Account, Account, Option); +impl_single!(Code, Code, Bytes); +impl_single!(Execution, TransactionProof, super::ExecutionResult); + +macro_rules! impl_args { + () => { + impl RequestAdapter for T { + type Out = T::Out; + + fn make_requests(self) -> Vec { + vec![self.make()] + } + + fn extract_from(mut responses: Vec) -> Self::Out { + T::extract(responses.pop().expect(SUPPLIED_MATCHES)) + } + } + }; + ($first: ident, $($next: ident,)*) => { + impl< + $first: RequestArg, + $($next: RequestArg,)* + > + RequestAdapter for ($first, $($next,)*) { + type Out = ($first::Out, $($next::Out,)*); + + fn make_requests(self) -> Vec { + let ($first, $($next,)*) = self; + + vec![ + $first.make(), + $($next.make(),)* + ] + } + + fn extract_from(responses: Vec) -> Self::Out { + let mut iter = responses.into_iter(); + ( + $first::extract(iter.next().expect(SUPPLIED_MATCHES)), + $($next::extract(iter.next().expect(SUPPLIED_MATCHES)),)* + ) + } + } + impl_args!($($next,)*); + } +} + +mod impls { + #![allow(non_snake_case)] + + use super::{RequestAdapter, RequestArg, Request, Response, SUPPLIED_MATCHES}; + + impl_args!(A, B, C, D, E, F, G, H, I, J, K, L,); +} + +/// Requests coupled with their required data for verification. +/// This is used internally but not part of the public API. +#[derive(Clone)] +#[allow(missing_docs)] +pub enum CheckedRequest { + HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest), + HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest), + Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest), + Body(Body, net_request::IncompleteBodyRequest), + Account(Account, net_request::IncompleteAccountRequest), + Code(Code, net_request::IncompleteCodeRequest), + Execution(TransactionProof, net_request::IncompleteExecutionRequest), +} + +impl From for CheckedRequest { + fn from(req: Request) -> Self { + match req { + Request::HeaderByHash(req) => { + let net_req = net_request::IncompleteHeadersRequest { + start: net_request::HashOrNumber::Hash(req.0).into(), + skip: 0, + max: 1, + reverse: false, + }; + CheckedRequest::HeaderByHash(req, net_req) + } + Request::HeaderProof(req) => { + let net_req = net_request::IncompleteHeaderProofRequest { + num: req.num().into(), + }; + CheckedRequest::HeaderProof(req, net_req) + } + Request::Body(req) => { + let net_req = net_request::IncompleteBodyRequest { + hash: req.hash.into(), + }; + CheckedRequest::Body(req, net_req) + } + Request::Receipts(req) => { + let net_req = net_request::IncompleteReceiptsRequest { + hash: req.0.hash().into(), + }; + CheckedRequest::Receipts(req, net_req) + } + Request::Account(req) => { + let net_req = net_request::IncompleteAccountRequest { + block_hash: req.header.hash().into(), + address_hash: ::util::Hashable::sha3(&req.address).into(), + }; + CheckedRequest::Account(req, net_req) + } + Request::Code(req) => { + let net_req = net_request::IncompleteCodeRequest { + block_hash: req.block_id.0.into(), + code_hash: req.code_hash.into(), + }; + CheckedRequest::Code(req, net_req) + } + Request::Execution(req) => { + let net_req = net_request::IncompleteExecutionRequest { + block_hash: req.header.hash().into(), + from: req.tx.sender(), + gas: req.tx.gas, + gas_price: req.tx.gas_price, + action: req.tx.action.clone(), + value: req.tx.value, + data: req.tx.data.clone(), + }; + CheckedRequest::Execution(req, net_req) + } + } + } +} + +impl CheckedRequest { + /// Convert this into a network request. + pub fn into_net_request(self) -> net_request::Request { + use ::request::Request as NetRequest; + + match self { + CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req), + CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req), + CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req), + CheckedRequest::Body(_, req) => NetRequest::Body(req), + CheckedRequest::Account(_, req) => NetRequest::Account(req), + CheckedRequest::Code(_, req) => NetRequest::Code(req), + CheckedRequest::Execution(_, req) => NetRequest::Execution(req), + } + } +} + +macro_rules! match_me { + ($me: expr, ($check: pat, $req: pat) => $e: expr) => { + match $me { + CheckedRequest::HeaderProof($check, $req) => $e, + CheckedRequest::HeaderByHash($check, $req) => $e, + CheckedRequest::Receipts($check, $req) => $e, + CheckedRequest::Body($check, $req) => $e, + CheckedRequest::Account($check, $req) => $e, + CheckedRequest::Code($check, $req) => $e, + CheckedRequest::Execution($check, $req) => $e, + } + } +} + +impl IncompleteRequest for CheckedRequest { + type Complete = net_request::CompleteRequest; + type Response = net_request::Response; + + /// Check prior outputs against the needed inputs. + /// + /// This is called to ensure consistency of this request with + /// others in the same packet. + fn check_outputs(&self, f: F) -> Result<(), net_request::NoSuchOutput> + where F: FnMut(usize, usize, OutputKind) -> Result<(), net_request::NoSuchOutput> + { + match_me!(*self, (_, ref req) => req.check_outputs(f)) + } + + /// Note that this request will produce the following outputs. + fn note_outputs(&self, f: F) where F: FnMut(usize, OutputKind) { + match_me!(*self, (_, ref req) => req.note_outputs(f)) + } + + /// Fill fields of the request. + /// + /// This function is provided an "output oracle" which allows fetching of + /// prior request outputs. + /// Only outputs previously checked with `check_outputs` may be available. + fn fill(&mut self, f: F) where F: Fn(usize, usize) -> Result { + match_me!(*self, (_, ref mut req) => req.fill(f)) + } + + /// Will succeed if all fields have been filled, will fail otherwise. + fn complete(self) -> Result { + use ::request::CompleteRequest; + + match self { + CheckedRequest::HeaderProof(_, req) => req.complete().map(CompleteRequest::HeaderProof), + CheckedRequest::HeaderByHash(_, req) => req.complete().map(CompleteRequest::Headers), + CheckedRequest::Receipts(_, req) => req.complete().map(CompleteRequest::Receipts), + CheckedRequest::Body(_, req) => req.complete().map(CompleteRequest::Body), + CheckedRequest::Account(_, req) => req.complete().map(CompleteRequest::Account), + CheckedRequest::Code(_, req) => req.complete().map(CompleteRequest::Code), + CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution), + } + } + + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + match_me!(*self, (_, ref mut req) => req.adjust_refs(mapping)) + } +} + +impl net_request::CheckedRequest for CheckedRequest { + type Extract = Response; + type Error = Error; + type Environment = Mutex<::cache::Cache>; + + /// Check whether the response matches (beyond the type). + fn check_response(&self, cache: &Mutex<::cache::Cache>, response: &Self::Response) -> Result { + use ::request::Response as NetResponse; + + // helper for expecting a specific response for a given request. + macro_rules! expect { + ($res: pat => $e: expr) => { + match *response { + $res => $e, + _ => Err(Error::WrongKind), + } + } + } + + // check response against contained prover. + match *self { + CheckedRequest::HeaderProof(ref prover, _) => expect!(NetResponse::HeaderProof(ref res) => + prover.check_response(cache, &res.proof).map(Response::HeaderProof)), + CheckedRequest::HeaderByHash(ref prover, _) => expect!(NetResponse::Headers(ref res) => + prover.check_response(cache, &res.headers).map(Response::HeaderByHash)), + CheckedRequest::Receipts(ref prover, _) => expect!(NetResponse::Receipts(ref res) => + prover.check_response(cache, &res.receipts).map(Response::Receipts)), + CheckedRequest::Body(ref prover, _) => expect!(NetResponse::Body(ref res) => + prover.check_response(cache, &res.body).map(Response::Body)), + CheckedRequest::Account(ref prover, _) => expect!(NetResponse::Account(ref res) => + prover.check_response(cache, &res.proof).map(Response::Account)), + CheckedRequest::Code(ref prover, _) => expect!(NetResponse::Code(ref res) => + prover.check_response(cache, &res.code).map(Response::Code)), + CheckedRequest::Execution(ref prover, _) => expect!(NetResponse::Execution(ref res) => + prover.check_response(cache, &res.items).map(Response::Execution)), + } + } +} + +/// Responses to on-demand requests. +/// All of these are checked. +pub enum Response { + /// Response to a header proof request. + /// Returns the hash and chain score. + HeaderProof((H256, U256)), + /// Response to a header-by-hash request. + HeaderByHash(encoded::Header), + /// Response to a receipts request. + Receipts(Vec), + /// Response to a block body request. + Body(encoded::Block), + /// Response to an Account request. + // TODO: `unwrap_or(engine_defaults)` + Account(Option), + /// Response to a request for code. + Code(Vec), + /// Response to a request for proved execution. + Execution(super::ExecutionResult), +} + /// Errors in verification. #[derive(Debug, PartialEq)] pub enum Error { /// RLP decoder error. Decoder(::rlp::DecoderError), + /// Empty response. + Empty, /// Trie lookup error (result of bad proof) Trie(TrieError), /// Bad inclusion proof @@ -47,6 +404,8 @@ pub enum Error { WrongHash(H256, H256), /// Wrong trie root. WrongTrieRoot(H256, H256), + /// Wrong response kind. + WrongKind, } impl From<::rlp::DecoderError> for Error { @@ -93,9 +452,15 @@ impl HeaderProof { pub fn cht_root(&self) -> H256 { self.cht_root } /// Check a response with a CHT proof, get a hash and total difficulty back. - pub fn check_response(&self, proof: &[Bytes]) -> Result<(H256, U256), Error> { + pub fn check_response(&self, cache: &Mutex<::cache::Cache>, proof: &[Bytes]) -> Result<(H256, U256), Error> { match ::cht::check_proof(proof, self.num, self.cht_root) { - Some((expected_hash, td)) => Ok((expected_hash, td)), + Some((expected_hash, td)) => { + let mut cache = cache.lock(); + cache.insert_block_hash(self.num, expected_hash); + cache.insert_chain_score(expected_hash, td); + + Ok((expected_hash, td)) + } None => Err(Error::BadProof), } } @@ -107,10 +472,14 @@ pub struct HeaderByHash(pub H256); impl HeaderByHash { /// Check a response for the header. - pub fn check_response(&self, header: &encoded::Header) -> Result { + pub fn check_response(&self, cache: &Mutex<::cache::Cache>, headers: &[encoded::Header]) -> Result { + let header = headers.get(0).ok_or(Error::Empty)?; let hash = header.sha3(); match hash == self.0 { - true => Ok(header.clone()), + true => { + cache.lock().insert_block_header(hash, header.clone()); + Ok(header.clone()) + } false => Err(Error::WrongHash(self.0, hash)), } } @@ -136,7 +505,7 @@ impl Body { } /// Check a response for this block body. - pub fn check_response(&self, body: &encoded::Body) -> Result { + pub fn check_response(&self, cache: &Mutex<::cache::Cache>, body: &encoded::Body) -> Result { // check the integrity of the the body against the header let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec())); if tx_root != self.header.transactions_root() { @@ -154,6 +523,8 @@ impl Body { stream.append_raw(body.rlp().at(0).as_raw(), 1); stream.append_raw(body.rlp().at(1).as_raw(), 1); + cache.lock().insert_block_body(self.hash, body.clone()); + Ok(encoded::Block::new(stream.out())) } } @@ -164,12 +535,15 @@ pub struct BlockReceipts(pub encoded::Header); impl BlockReceipts { /// Check a response with receipts against the stored header. - pub fn check_response(&self, receipts: &[Receipt]) -> Result, Error> { + pub fn check_response(&self, cache: &Mutex<::cache::Cache>, receipts: &[Receipt]) -> Result, Error> { let receipts_root = self.0.receipts_root(); let found_root = ::util::triehash::ordered_trie_root(receipts.iter().map(|r| ::rlp::encode(r).to_vec())); match receipts_root == found_root { - true => Ok(receipts.to_vec()), + true => { + cache.lock().insert_block_receipts(receipts_root, receipts.to_vec()); + Ok(receipts.to_vec()) + } false => Err(Error::WrongTrieRoot(receipts_root, found_root)), } } @@ -186,7 +560,7 @@ pub struct Account { impl Account { /// Check a response with an account against the stored header. - pub fn check_response(&self, proof: &[Bytes]) -> Result, Error> { + pub fn check_response(&self, _: &Mutex<::cache::Cache>, proof: &[Bytes]) -> Result, Error> { let state_root = self.header.state_root(); let mut db = MemoryDB::new(); @@ -208,6 +582,7 @@ impl Account { } /// Request for account code. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Code { /// Block hash, number pair. pub block_id: (H256, u64), @@ -217,10 +592,10 @@ pub struct Code { impl Code { /// Check a response with code against the code hash. - pub fn check_response(&self, code: &[u8]) -> Result<(), Error> { + pub fn check_response(&self, _: &Mutex<::cache::Cache>, code: &[u8]) -> Result, Error> { let found_hash = code.sha3(); if found_hash == self.code_hash { - Ok(()) + Ok(code.to_vec()) } else { Err(Error::WrongHash(self.code_hash, found_hash)) } @@ -228,6 +603,7 @@ impl Code { } /// Request for transaction execution, along with the parts necessary to verify the proof. +#[derive(Clone)] pub struct TransactionProof { /// The transaction to request proof of. pub tx: SignedTransaction, @@ -241,25 +617,32 @@ pub struct TransactionProof { impl TransactionProof { /// Check the proof, returning the proved execution or indicate that the proof was bad. - pub fn check_response(&self, state_items: &[DBValue]) -> ProvedExecution { + pub fn check_response(&self, _: &Mutex<::cache::Cache>, state_items: &[DBValue]) -> Result { let root = self.header.state_root(); let mut env_info = self.env_info.clone(); env_info.gas_limit = self.tx.gas.clone(); - state::check_proof( + + let proved_execution = state::check_proof( state_items, root, &self.tx, &*self.engine, - &env_info, - ) + &self.env_info, + ); + + match proved_execution { + ProvedExecution::BadProof => Err(Error::BadProof), + ProvedExecution::Failed(e) => Ok(Err(e)), + ProvedExecution::Complete(e) => Ok(Ok(e)), + } } } #[cfg(test)] mod tests { use super::*; - use util::{MemoryDB, Address, H256}; + use util::{MemoryDB, Address, Mutex, H256}; use util::trie::{Trie, TrieMut, SecTrieDB, SecTrieDBMut}; use util::trie::recorder::Recorder; @@ -268,6 +651,10 @@ mod tests { use ethcore::encoded; use ethcore::receipt::Receipt; + fn make_cache() -> ::cache::Cache { + ::cache::Cache::new(Default::default(), ::time::Duration::seconds(1)) + } + #[test] fn no_invalid_header_by_number() { assert!(HeaderProof::new(0, Default::default()).is_none()) @@ -297,7 +684,8 @@ mod tests { let proof = cht.prove(10_000, 0).unwrap().unwrap(); let req = HeaderProof::new(10_000, cht.root()).unwrap(); - assert!(req.check_response(&proof[..]).is_ok()); + let cache = Mutex::new(make_cache()); + assert!(req.check_response(&cache, &proof[..]).is_ok()); } #[test] @@ -308,7 +696,8 @@ mod tests { let hash = header.hash(); let raw_header = encoded::Header::new(::rlp::encode(&header).to_vec()); - assert!(HeaderByHash(hash).check_response(&raw_header).is_ok()) + let cache = Mutex::new(make_cache()); + assert!(HeaderByHash(hash).check_response(&cache, &[raw_header]).is_ok()) } #[test] @@ -324,8 +713,9 @@ mod tests { hash: header.hash(), }; + let cache = Mutex::new(make_cache()); let response = encoded::Body::new(body_stream.drain().to_vec()); - assert!(req.check_response(&response).is_ok()) + assert!(req.check_response(&cache, &response).is_ok()) } #[test] @@ -346,7 +736,8 @@ mod tests { let req = BlockReceipts(encoded::Header::new(::rlp::encode(&header).to_vec())); - assert!(req.check_response(&receipts).is_ok()) + let cache = Mutex::new(make_cache()); + assert!(req.check_response(&cache, &receipts).is_ok()) } #[test] @@ -395,7 +786,8 @@ mod tests { address: addr, }; - assert!(req.check_response(&proof[..]).is_ok()); + let cache = Mutex::new(make_cache()); + assert!(req.check_response(&cache, &proof[..]).is_ok()); } #[test] @@ -406,7 +798,8 @@ mod tests { code_hash: ::util::Hashable::sha3(&code), }; - assert!(req.check_response(&code).is_ok()); - assert!(req.check_response(&[]).is_err()); + let cache = Mutex::new(make_cache()); + assert!(req.check_response(&cache, &code).is_ok()); + assert!(req.check_response(&cache, &[]).is_err()); } } diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs new file mode 100644 index 00000000000..d5789c5e184 --- /dev/null +++ b/ethcore/light/src/on_demand/tests.rs @@ -0,0 +1,397 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Tests for the on-demand service. + +use cache::Cache; +use ethcore::encoded; +use ethcore::header::{Header, Seal}; +use futures::Future; +use network::{PeerId, NodeId}; +use net::*; +use util::{H256, Mutex}; +use time::Duration; +use ::request::{self as basic_request, Response}; + +use std::sync::Arc; + +use super::{request, OnDemand, Peer}; + +// useful contexts to give the service. +enum Context { + NoOp, + WithPeer(PeerId), + RequestFrom(PeerId, ReqId), + Punish(PeerId), +} + +impl EventContext for Context { + fn peer(&self) -> PeerId { + match *self { + Context::WithPeer(id) + | Context::RequestFrom(id, _) + | Context::Punish(id) => id, + _ => panic!("didn't expect to have peer queried."), + } + } + + fn as_basic(&self) -> &BasicContext { self } +} + +impl BasicContext for Context { + /// Returns the relevant's peer persistent Id (aka NodeId). + fn persistent_peer_id(&self, _: PeerId) -> Option { + panic!("didn't expect to provide persistent ID") + } + + fn request_from(&self, peer_id: PeerId, _: ::request::NetworkRequests) -> Result { + match *self { + Context::RequestFrom(id, req_id) => if peer_id == id { Ok(req_id) } else { Err(Error::NoCredits) }, + _ => panic!("didn't expect to have requests dispatched."), + } + } + + fn make_announcement(&self, _: Announcement) { + panic!("didn't expect to make announcement") + } + + fn disconnect_peer(&self, id: PeerId) { + self.disable_peer(id) + } + + fn disable_peer(&self, peer_id: PeerId) { + match *self { + Context::Punish(id) if id == peer_id => {}, + _ => panic!("Unexpectedly punished peer."), + } + } +} + +// test harness. +struct Harness { + service: OnDemand, +} + +impl Harness { + fn create() -> Self { + let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::minutes(1)))); + Harness { + service: OnDemand::new_test(cache), + } + } + + fn inject_peer(&self, id: PeerId, peer: Peer) { + self.service.peers.write().insert(id, peer); + } +} + +fn dummy_status() -> Status { + Status { + protocol_version: 1, + network_id: 999, + head_td: 1.into(), + head_hash: H256::default(), + head_num: 1359, + genesis_hash: H256::default(), + last_head: None, + } +} + +fn dummy_capabilities() -> Capabilities { + Capabilities { + serve_headers: true, + serve_chain_since: Some(1), + serve_state_since: Some(1), + tx_relay: true, + } +} + +#[test] +fn detects_hangup() { + let on_demand = Harness::create().service; + let result = on_demand.header_by_hash(&Context::NoOp, request::HeaderByHash(H256::default())); + + assert_eq!(on_demand.pending.read().len(), 1); + drop(result); + + on_demand.dispatch_pending(&Context::NoOp); + assert!(on_demand.pending.read().is_empty()); +} + +#[test] +fn single_request() { + let harness = Harness::create(); + + let peer_id = 10101; + let req_id = ReqId(14426); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let header = Header::default(); + let encoded = encoded::Header::new(header.rlp(Seal::With)); + + let recv = harness.service.request_raw( + &Context::NoOp, + vec![request::HeaderByHash(header.hash()).into()] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id)); + + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_id, + &[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] })] + ); + + assert!(recv.wait().is_ok()); +} + +#[test] +fn no_capabilities() { + let harness = Harness::create(); + + let peer_id = 10101; + + let mut capabilities = dummy_capabilities(); + capabilities.serve_headers = false; + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: capabilities, + }); + + let _recv = harness.service.request_raw( + &Context::NoOp, + vec![request::HeaderByHash(Default::default()).into()] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::NoOp); + + assert_eq!(harness.service.pending.read().len(), 1); +} + +#[test] +fn reassign() { + let harness = Harness::create(); + + let peer_ids = (10101, 12345); + let req_ids = (ReqId(14426), ReqId(555)); + + harness.inject_peer(peer_ids.0, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let header = Header::default(); + let encoded = encoded::Header::new(header.rlp(Seal::With)); + + let recv = harness.service.request_raw( + &Context::NoOp, + vec![request::HeaderByHash(header.hash()).into()] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_ids.0, req_ids.0)); + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_disconnect(&Context::WithPeer(peer_ids.0), &[req_ids.0]); + assert_eq!(harness.service.pending.read().len(), 1); + + harness.inject_peer(peer_ids.1, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_ids.1, req_ids.1)); + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_responses( + &Context::WithPeer(peer_ids.1), + req_ids.1, + &[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] })] + ); + + assert!(recv.wait().is_ok()); +} + +#[test] +fn partial_response() { + let harness = Harness::create(); + + let peer_id = 111; + let req_ids = (ReqId(14426), ReqId(555)); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let make = |num| { + let mut hdr = Header::default(); + hdr.set_number(num); + + let encoded = encoded::Header::new(hdr.rlp(Seal::With)); + (hdr, encoded) + }; + + let (header1, encoded1) = make(5); + let (header2, encoded2) = make(23452); + + // request two headers. + let recv = harness.service.request_raw( + &Context::NoOp, + vec![ + request::HeaderByHash(header1.hash()).into(), + request::HeaderByHash(header2.hash()).into(), + ], + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.0)); + assert_eq!(harness.service.pending.read().len(), 0); + + // supply only the first one. + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_ids.0, + &[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded1] })] + ); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.1)); + assert_eq!(harness.service.pending.read().len(), 0); + + // supply the next one. + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_ids.1, + &[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded2] })] + ); + + assert!(recv.wait().is_ok()); +} + +#[test] +fn part_bad_part_good() { + let harness = Harness::create(); + + let peer_id = 111; + let req_ids = (ReqId(14426), ReqId(555)); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let make = |num| { + let mut hdr = Header::default(); + hdr.set_number(num); + + let encoded = encoded::Header::new(hdr.rlp(Seal::With)); + (hdr, encoded) + }; + + let (header1, encoded1) = make(5); + let (header2, encoded2) = make(23452); + + // request two headers. + let recv = harness.service.request_raw( + &Context::NoOp, + vec![ + request::HeaderByHash(header1.hash()).into(), + request::HeaderByHash(header2.hash()).into(), + ], + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.0)); + assert_eq!(harness.service.pending.read().len(), 0); + + // supply only the first one, but followed by the wrong kind of response. + // the first header should be processed. + harness.service.on_responses( + &Context::Punish(peer_id), + req_ids.0, + &[ + Response::Headers(basic_request::HeadersResponse { headers: vec![encoded1] }), + Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] } ), + ] + ); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.1)); + assert_eq!(harness.service.pending.read().len(), 0); + + // supply the next one. + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_ids.1, + &[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded2] })] + ); + + assert!(recv.wait().is_ok()); +} + +#[test] +fn wrong_kind() { + let harness = Harness::create(); + + let peer_id = 10101; + let req_id = ReqId(14426); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let _recv = harness.service.request_raw( + &Context::NoOp, + vec![request::HeaderByHash(Default::default()).into()] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id)); + + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_responses( + &Context::Punish(peer_id), + req_id, + &[Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] })] + ); + + assert_eq!(harness.service.pending.read().len(), 1); +} diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index d17a863f511..7ce2bd53dd8 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -131,7 +131,7 @@ impl TransactionQueue { if self.by_hash.contains_key(&hash) { return Err(TransactionError::AlreadyImported) } - let res = match self.by_account.entry(sender) { + let res = match self.by_account.entry(sender) { Entry::Vacant(entry) => { entry.insert(AccountTransactions { cur_nonce: CurrentNonce::Assumed(nonce), diff --git a/ethcore/light/src/types/request/builder.rs b/ethcore/light/src/types/request/builder.rs index 77f1389c2bb..dff33513aa1 100644 --- a/ethcore/light/src/types/request/builder.rs +++ b/ethcore/light/src/types/request/builder.rs @@ -20,22 +20,30 @@ use std::collections::HashMap; use request::{ - IncompleteRequest, CompleteRequest, Request, - OutputKind, Output, NoSuchOutput, Response, ResponseError, + IncompleteRequest, OutputKind, Output, NoSuchOutput, ResponseError, ResponseLike, }; /// Build chained requests. Push them onto the series with `push`, /// and produce a `Requests` object with `build`. Outputs are checked for consistency. -#[derive(Debug, Default, Clone, PartialEq, Eq)] -pub struct RequestBuilder { +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RequestBuilder { output_kinds: HashMap<(usize, usize), OutputKind>, - requests: Vec, + requests: Vec, +} + +impl Default for RequestBuilder { + fn default() -> Self { + RequestBuilder { + output_kinds: HashMap::new(), + requests: Vec::new(), + } + } } -impl RequestBuilder { +impl RequestBuilder { /// Attempt to push a request onto the request chain. Fails if the request /// references a non-existent output of a prior request. - pub fn push(&mut self, request: Request) -> Result<(), NoSuchOutput> { + pub fn push(&mut self, request: T) -> Result<(), NoSuchOutput> { request.check_outputs(|req, idx, kind| { match self.output_kinds.get(&(req, idx)) { Some(k) if k == &kind => Ok(()), @@ -54,7 +62,7 @@ impl RequestBuilder { } /// Convert this into a "requests" object. - pub fn build(self) -> Requests { + pub fn build(self) -> Requests { Requests { outputs: HashMap::new(), requests: self.requests, @@ -65,44 +73,41 @@ impl RequestBuilder { /// Requests pending responses. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Requests { +pub struct Requests { outputs: HashMap<(usize, usize), Output>, - requests: Vec, + requests: Vec, answered: usize, } -impl Requests { - /// For each request, produce responses for each. - /// The responses vector produced goes up to the point where the responder - /// first returns `None`, an invalid response, or until all requests have been responded to. - pub fn respond_to_all(mut self, responder: F) -> Vec - where F: Fn(CompleteRequest) -> Option - { - let mut responses = Vec::new(); - - while let Some(response) = self.next_complete().and_then(&responder) { - match self.supply_response(&response) { - Ok(()) => responses.push(response), - Err(e) => { - debug!(target: "pip", "produced bad response to request: {:?}", e); - return responses; - } - } - } - - responses - } - +impl Requests { /// Get access to the underlying slice of requests. // TODO: unimplemented -> Vec, // do we _have to_ allocate? - pub fn requests(&self) -> &[Request] { &self.requests } + pub fn requests(&self) -> &[T] { &self.requests } /// Get the number of answered requests. pub fn num_answered(&self) -> usize { self.answered } + /// Whether the batch is complete. + pub fn is_complete(&self) -> bool { + self.answered == self.requests.len() + } + + /// Map requests from one type into another. + pub fn map_requests(self, f: F) -> Requests + where F: FnMut(T) -> U, U: IncompleteRequest + { + Requests { + outputs: self.outputs, + requests: self.requests.into_iter().map(f).collect(), + answered: self.answered, + } + } +} + +impl Requests { /// Get the next request as a filled request. Returns `None` when all requests answered. - pub fn next_complete(&self) -> Option { - if self.answered == self.requests.len() { + pub fn next_complete(&self) -> Option { + if self.is_complete() { None } else { Some(self.requests[self.answered].clone() @@ -111,14 +116,29 @@ impl Requests { } } + /// Sweep through all unanswered requests, filling them as necessary. + pub fn fill_unanswered(&mut self) { + let outputs = &mut self.outputs; + + for req in self.requests.iter_mut().skip(self.answered) { + req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) + } + } +} + +impl Requests { /// Supply a response for the next request. /// Fails on: wrong request kind, all requests answered already. - pub fn supply_response(&mut self, response: &Response) -> Result<(), ResponseError> { + pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response) + -> Result> + { let idx = self.answered; // check validity. - if idx == self.requests.len() { return Err(ResponseError::Unexpected) } - if self.requests[idx].kind() != response.kind() { return Err(ResponseError::WrongKind) } + if self.is_complete() { return Err(ResponseError::Unexpected) } + + let extracted = self.requests[idx] + .check_response(env, response).map_err(ResponseError::Validity)?; let outputs = &mut self.outputs; response.fill_outputs(|out_idx, output| { @@ -135,7 +155,30 @@ impl Requests { req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) } - Ok(()) + Ok(extracted) + } +} + +impl Requests { + /// For each request, produce a response. + /// The responses vector produced goes up to the point where the responder + /// first returns `None`, an invalid response, or until all requests have been responded to. + pub fn respond_to_all(mut self, responder: F) -> Vec + where F: Fn(super::CompleteRequest) -> Option + { + let mut responses = Vec::new(); + + while let Some(response) = self.next_complete().and_then(&responder) { + match self.supply_response(&(), &response) { + Ok(()) => responses.push(response), + Err(e) => { + debug!(target: "pip", "produced bad response to request: {:?}", e); + return responses; + } + } + } + + responses } } diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index 3099f8fedcc..3dd2db6290d 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -69,11 +69,15 @@ pub use self::builder::{RequestBuilder, Requests}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct NoSuchOutput; +/// Wrong kind of response corresponding to request. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct WrongKind; + /// Error on processing a response. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum ResponseError { - /// Wrong kind of response. - WrongKind, +pub enum ResponseError { + /// Error in validity. + Validity(T), /// No responses expected. Unexpected, } @@ -96,6 +100,12 @@ impl Field { _ => Err(NoSuchOutput), } } + + fn adjust_req(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + if let Field::BackReference(ref mut req_idx, _) = *self { + *req_idx = mapping(*req_idx) + } + } } impl From for Field { @@ -197,6 +207,9 @@ impl Encodable for HashOrNumber { } } +/// Type alias for "network requests". +pub type NetworkRequests = Requests; + /// All request types, as they're sent over the network. /// They may be incomplete, with back-references to outputs /// of prior requests. @@ -296,6 +309,7 @@ impl Encodable for Request { impl IncompleteRequest for Request { type Complete = CompleteRequest; + type Response = Response; fn check_outputs(&self, f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -350,6 +364,33 @@ impl IncompleteRequest for Request { Request::Execution(req) => req.complete().map(CompleteRequest::Execution), } } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + match *self { + Request::Headers(ref mut req) => req.adjust_refs(mapping), + Request::HeaderProof(ref mut req) => req.adjust_refs(mapping), + Request::Receipts(ref mut req) => req.adjust_refs(mapping), + Request::Body(ref mut req) => req.adjust_refs(mapping), + Request::Account(ref mut req) => req.adjust_refs(mapping), + Request::Storage(ref mut req) => req.adjust_refs(mapping), + Request::Code(ref mut req) => req.adjust_refs(mapping), + Request::Execution(ref mut req) => req.adjust_refs(mapping), + } + } +} + +impl CheckedRequest for Request { + type Extract = (); + type Error = WrongKind; + type Environment = (); + + fn check_response(&self, _: &(), response: &Response) -> Result<(), WrongKind> { + if self.kind() == response.kind() { + Ok(()) + } else { + Err(WrongKind) + } + } } /// Kinds of requests. @@ -421,9 +462,9 @@ pub enum Response { Execution(ExecutionResponse), } -impl Response { +impl ResponseLike for Response { /// Fill reusable outputs by writing them into the function. - pub fn fill_outputs(&self, f: F) where F: FnMut(usize, Output) { + fn fill_outputs(&self, f: F) where F: FnMut(usize, Output) { match *self { Response::Headers(ref res) => res.fill_outputs(f), Response::HeaderProof(ref res) => res.fill_outputs(f), @@ -435,7 +476,9 @@ impl Response { Response::Execution(ref res) => res.fill_outputs(f), } } +} +impl Response { /// Inspect the kind of this response. pub fn kind(&self) -> Kind { match *self { @@ -490,6 +533,8 @@ impl Encodable for Response { pub trait IncompleteRequest: Sized { /// The complete variant of this request. type Complete; + /// The response to this request. + type Response: ResponseLike; /// Check prior outputs against the needed inputs. /// @@ -511,6 +556,30 @@ pub trait IncompleteRequest: Sized { /// Attempt to convert this request into its complete variant. /// Will succeed if all fields have been filled, will fail otherwise. fn complete(self) -> Result; + + /// Adjust back-reference request indices. + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize; +} + +/// A request which can be checked against its response for more validity. +pub trait CheckedRequest: IncompleteRequest { + /// Data extracted during the check. + type Extract; + /// Error encountered during the check. + type Error; + /// Environment passed to response check. + type Environment; + + /// Check whether the response matches (beyond the type). + fn check_response(&self, &Self::Environment, &Self::Response) -> Result; +} + +/// A response-like object. +/// +/// These contain re-usable outputs. +pub trait ResponseLike { + /// Write all re-usable outputs into the provided function. + fn fill_outputs(&self, output_store: F) where F: FnMut(usize, Output); } /// Header request. @@ -555,6 +624,7 @@ pub mod header { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -586,6 +656,10 @@ pub mod header { reverse: self.reverse, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.start.adjust_req(mapping) + } } /// A complete header request. @@ -608,9 +682,9 @@ pub mod header { pub headers: Vec, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by writing them into the function. - pub fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) { } + fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) { } } impl Decodable for Response { @@ -671,6 +745,7 @@ pub mod header_proof { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -699,6 +774,10 @@ pub mod header_proof { num: self.num.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.num.adjust_req(mapping) + } } /// A complete header proof request. @@ -719,9 +798,9 @@ pub mod header_proof { pub td: U256, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { + fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { f(0, Output::Hash(self.hash)); } } @@ -776,6 +855,7 @@ pub mod block_receipts { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -802,6 +882,10 @@ pub mod block_receipts { hash: self.hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.hash.adjust_req(mapping) + } } /// A complete block receipts request. @@ -818,9 +902,9 @@ pub mod block_receipts { pub receipts: Vec } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} + fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} } impl Decodable for Response { @@ -868,6 +952,7 @@ pub mod block_body { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -894,6 +979,10 @@ pub mod block_body { hash: self.hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.hash.adjust_req(mapping) + } } /// A complete block body request. @@ -910,9 +999,9 @@ pub mod block_body { pub body: encoded::Body, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} + fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} } impl Decodable for Response { @@ -971,6 +1060,7 @@ pub mod account { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -1013,6 +1103,11 @@ pub mod account { address_hash: self.address_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.address_hash.adjust_req(&mut mapping); + } } /// A complete request for an account. @@ -1039,9 +1134,9 @@ pub mod account { pub storage_root: H256, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { + fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { f(0, Output::Hash(self.code_hash)); f(1, Output::Hash(self.storage_root)); } @@ -1109,6 +1204,7 @@ pub mod storage { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -1162,6 +1258,12 @@ pub mod storage { key_hash: self.key_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.address_hash.adjust_req(&mut mapping); + self.key_hash.adjust_req(&mut mapping); + } } /// A complete request for a storage proof. @@ -1184,9 +1286,9 @@ pub mod storage { pub value: H256, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { + fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { f(0, Output::Hash(self.value)); } } @@ -1243,6 +1345,7 @@ pub mod contract_code { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -1281,6 +1384,11 @@ pub mod contract_code { code_hash: self.code_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.code_hash.adjust_req(&mut mapping); + } } /// A complete request. @@ -1299,9 +1407,9 @@ pub mod contract_code { pub code: Bytes, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} + fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} } impl Decodable for Response { @@ -1380,6 +1488,7 @@ pub mod execution { impl super::IncompleteRequest for Incomplete { type Complete = Complete; + type Response = Response; fn check_outputs(&self, mut f: F) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> @@ -1412,6 +1521,10 @@ pub mod execution { data: self.data, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(mapping); + } } /// A complete request. @@ -1440,9 +1553,9 @@ pub mod execution { pub items: Vec, } - impl Response { + impl super::ResponseLike for Response { /// Fill reusable outputs by providing them to the function. - pub fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} + fn fill_outputs(&self, _: F) where F: FnMut(usize, Output) {} } impl Decodable for Response { diff --git a/ethcore/src/header.rs b/ethcore/src/header.rs index 4517c576437..e8b1fc43090 100644 --- a/ethcore/src/header.rs +++ b/ethcore/src/header.rs @@ -17,12 +17,14 @@ //! Block header. use util::*; -use basic_types::{LogBloom, Seal, ZERO_LOGBLOOM}; +use basic_types::{LogBloom, ZERO_LOGBLOOM}; use time::get_time; use rlp::*; use std::cell::RefCell; +pub use basic_types::Seal; + /// Type for Block number pub type BlockNumber = u64; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index 548ee33cdf0..c643daa2aa2 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -67,6 +67,7 @@ impl IoHandler for QueueCull { let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(); info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); self.remote.spawn_with_timeout(move || { @@ -74,7 +75,10 @@ impl IoHandler for QueueCull { // fetch the nonce of each sender in the queue. let nonce_futures = senders.iter() .map(|&address| request::Account { header: best_header.clone(), address: address }) - .map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce)) + .map(move |request| { + on_demand.account(ctx, request) + .map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) + }) .zip(senders.iter()) .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); diff --git a/parity/run.rs b/parity/run.rs index 018360f1b92..6f0987a7e14 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -228,8 +228,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> } // start on_demand service. - let account_start_nonce = service.client().engine().account_start_nonce(); - let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce)); + let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone())); // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 877e4c24b11..68fff78b731 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -281,6 +281,7 @@ impl LightDispatcher { } let best_header = self.client.best_block_header(); + let account_start_nonce = self.client.engine().account_start_nonce(); let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { header: best_header, address: addr, @@ -288,7 +289,7 @@ impl LightDispatcher { match nonce_future { Some(x) => - x.map(|acc| acc.nonce) + x.map(move |acc| acc.map_or(account_start_nonce, |acc| acc.nonce)) .map_err(|_| errors::no_light_peers()) .boxed(), None => future::err(errors::network_disabled()).boxed() diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 7cf5c3dc959..7132106cbcc 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -57,16 +57,16 @@ pub type ExecutionResult = Result; impl LightFetch { /// Get a block header from the on demand service or client, or error. - pub fn header(&self, id: BlockId) -> BoxFuture, Error> { + pub fn header(&self, id: BlockId) -> BoxFuture { if let Some(h) = self.client.block_header(id) { - return future::ok(Some(h)).boxed() + return future::ok(h).boxed() } let maybe_future = match id { BlockId::Number(n) => { let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize)); match cht_root { - None => return future::ok(None).boxed(), + None => return future::err(errors::unknown_block()).boxed(), Some(root) => { let req = request::HeaderProof::new(n, root) .expect("only fails for 0; client always stores genesis; client already queried; qed"); @@ -82,7 +82,7 @@ impl LightFetch { Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), None => future::err(errors::network_disabled()).boxed(), } - }).map(Some).boxed() + }).boxed() }) } } @@ -91,7 +91,7 @@ impl LightFetch { self.sync.with_context(|ctx| self.on_demand.header_by_hash(ctx, request::HeaderByHash(h)) .then(|res| future::done(match res { - Ok(h) => Ok(Some(h)), + Ok(h) => Ok(h), Err(e) => Err(errors::on_demand_cancel(e)), })) .boxed() @@ -106,22 +106,21 @@ impl LightFetch { } } - // Get account info at a given block. `None` signifies no such account existing. + /// helper for getting account info at a given block. + /// `None` indicates the account doesn't exist at the given block. pub fn account(&self, address: Address, id: BlockId) -> BoxFuture, Error> { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(id).and_then(move |header| { - let header = match header { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - - sync.with_context(|ctx| on_demand.account(ctx, request::Account { + let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account { header: header, address: address, - }).map(Some)) - .map(|x| x.map_err(errors::on_demand_cancel).boxed()) - .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) + })); + + match maybe_fut { + Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), + None => future::err(errors::network_disabled()).boxed(), + } }).boxed() } @@ -176,10 +175,11 @@ impl LightFetch { }).join(header_fut).and_then(move |(tx, hdr)| { // then request proved execution. // TODO: get last-hashes from network. - let (env_info, hdr) = match (client.env_info(id), hdr) { - (Some(env_info), Some(hdr)) => (env_info, hdr), + let env_info = match client.env_info(id) { + Some(env_info) => env_info, _ => return future::err(errors::unknown_block()).boxed(), }; + let request = request::TransactionProof { tx: tx, header: hdr, @@ -198,18 +198,13 @@ impl LightFetch { }).boxed() } - /// Get a block. - pub fn block(&self, id: BlockId) -> BoxFuture, Error> { + /// get a block itself. fails on unknown block ID. + pub fn block(&self, id: BlockId) -> BoxFuture { let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); - self.header(id).and_then(move |hdr| { - let req = match hdr { - Some(hdr) => request::Body::new(hdr), - None => return future::ok(None).boxed(), - }; - + self.header(id).map(request::Body::new).and_then(move |req| { match sync.with_context(move |ctx| on_demand.block(ctx, req)) { - Some(fut) => fut.map_err(errors::on_demand_cancel).map(Some).boxed(), + Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), None => future::err(errors::network_disabled()).boxed(), } }).boxed() diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index e45397f7db0..e02ccc987e9 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -115,12 +115,11 @@ impl EthClient { on_demand: self.on_demand.clone(), sync: self.sync.clone(), cache: self.cache.clone(), - } } - // get a "rich" block structure - fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture, Error> { + // get a "rich" block structure. Fails on unknown block. + fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture { let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); let (client, engine) = (self.client.clone(), self.client.engine().clone()); let eip86_transition = self.client.eip86_transition(); @@ -160,49 +159,45 @@ impl EthClient { }; // get the block itself. - self.fetcher().block(id).and_then(move |block| match block { - None => return future::ok(None).boxed(), - Some(block) => { - // then fetch the total difficulty (this is much easier after getting the block). - match client.score(id) { - Some(score) => future::ok(fill_rich(block, Some(score))).map(Some).boxed(), - None => { - // make a CHT request to fetch the chain score. - let req = cht::block_to_cht_number(block.number()) - .and_then(|num| client.cht_root(num as usize)) - .and_then(|root| request::HeaderProof::new(block.number(), root)); - - - let req = match req { - Some(req) => req, - None => { - // somehow the genesis block slipped past other checks. - // return it now. - let score = client.block_header(BlockId::Number(0)) - .expect("genesis always stored; qed") - .difficulty(); - - return future::ok(fill_rich(block, Some(score))).map(Some).boxed() - } - }; - - // three possible outcomes: - // - network is down. - // - we get a score, but our hash is non-canonical. - // - we get ascore, and our hash is canonical. - let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); - match maybe_fut { - Some(fut) => fut.map(move |(hash, score)| { - let score = if hash == block.hash() { - Some(score) - } else { - None - }; - - Some(fill_rich(block, score)) - }).map_err(errors::on_demand_cancel).boxed(), - None => return future::err(errors::network_disabled()).boxed(), + self.fetcher().block(id).and_then(move |block| { + // then fetch the total difficulty (this is much easier after getting the block). + match client.score(id) { + Some(score) => future::ok(fill_rich(block, Some(score))).boxed(), + None => { + // make a CHT request to fetch the chain score. + let req = cht::block_to_cht_number(block.number()) + .and_then(|num| client.cht_root(num as usize)) + .and_then(|root| request::HeaderProof::new(block.number(), root)); + + let req = match req { + Some(req) => req, + None => { + // somehow the genesis block slipped past other checks. + // return it now. + let score = client.block_header(BlockId::Number(0)) + .expect("genesis always stored; qed") + .difficulty(); + + return future::ok(fill_rich(block, Some(score))).boxed() } + }; + + // three possible outcomes: + // - network is down. + // - we get a score, but our hash is non-canonical. + // - we get a score, and our hash is canonical. + let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); + match maybe_fut { + Some(fut) => fut.map(move |(hash, score)| { + let score = if hash == block.hash() { + Some(score) + } else { + None + }; + + fill_rich(block, score) + }).map_err(errors::on_demand_cancel).boxed(), + None => return future::err(errors::network_disabled()).boxed(), } } } @@ -281,11 +276,11 @@ impl Eth for EthClient { } fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture, Error> { - self.rich_block(BlockId::Hash(hash.into()), include_txs) + self.rich_block(BlockId::Hash(hash.into()), include_txs).map(Some).boxed() } fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture, Error> { - self.rich_block(num.into(), include_txs) + self.rich_block(num.into(), include_txs).map(Some).boxed() } fn transaction_count(&self, address: RpcH160, num: Trailing) -> BoxFuture { @@ -297,11 +292,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -317,11 +307,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.fetcher().header(num.into()).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -337,11 +322,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -357,11 +337,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.fetcher().header(num.into()).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index 63e1c64b6bd..015d4fef75f 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -360,7 +360,7 @@ impl Parity for ParityClient { }) } - fn block_header(&self, number: Trailing) -> BoxFuture, Error> { + fn block_header(&self, number: Trailing) -> BoxFuture { use ethcore::encoded; let engine = self.light_dispatch.client.engine().clone(); @@ -391,7 +391,7 @@ impl Parity for ParityClient { } }; - self.fetcher().header(number.0.into()).map(move |encoded| encoded.map(from_encoded)).boxed() + self.fetcher().header(number.0.into()).map(from_encoded).boxed() } fn ipfs_cid(&self, content: Bytes) -> Result { diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 689ba3e708a..9c3673ee462 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -400,17 +400,17 @@ impl Parity for ParityClient where }) } - fn block_header(&self, number: Trailing) -> BoxFuture, Error> { + fn block_header(&self, number: Trailing) -> BoxFuture { const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; let client = take_weakf!(self.client); let id: BlockId = number.0.into(); let encoded = match client.block_header(id.clone()) { Some(encoded) => encoded, - None => return future::ok(None).boxed(), + None => return future::err(errors::unknown_block()).boxed(), }; - future::ok(Some(RichHeader { + future::ok(RichHeader { inner: Header { hash: Some(encoded.hash().into()), size: Some(encoded.rlp().as_raw().len().into()), @@ -431,7 +431,7 @@ impl Parity for ParityClient where extra_data: Bytes::new(encoded.extra_data()), }, extra_info: client.block_extra_info(id).expect(EXTRA_INFO_PROOF), - })).boxed() + }).boxed() } fn ipfs_cid(&self, content: Bytes) -> Result { diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index 272d874955e..498d1b82dea 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -202,7 +202,7 @@ build_rpc_trait! { /// Get block header. /// Same as `eth_getBlockByNumber` but without uncles and transactions. #[rpc(async, name = "parity_getBlockHeaderByNumber")] - fn block_header(&self, Trailing) -> BoxFuture, Error>; + fn block_header(&self, Trailing) -> BoxFuture; /// Get IPFS CIDv0 given protobuf encoded bytes. #[rpc(name = "parity_cidV0")]