Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Back-references for the on-demand service #5573

Merged
merged 9 commits into from
May 23, 2017
371 changes: 152 additions & 219 deletions ethcore/light/src/on_demand/mod.rs

Large diffs are not rendered by default.

391 changes: 299 additions & 92 deletions ethcore/light/src/on_demand/request.rs

Large diffs are not rendered by default.

120 changes: 110 additions & 10 deletions ethcore/light/src/on_demand/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use ::request::{self as basic_request, Response};

use std::sync::Arc;

use super::{request, OnDemand, Peer};
use super::{request, OnDemand, Peer, HeaderRef};

// useful contexts to give the service.
enum Context {
Expand Down Expand Up @@ -122,7 +122,10 @@ fn dummy_capabilities() -> Capabilities {
#[test]
fn detects_hangup() {
let on_demand = Harness::create().service;
let result = on_demand.header_by_hash(&Context::NoOp, request::HeaderByHash(H256::default()));
let result = on_demand.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(H256::default().into()).into()],
);

assert_eq!(on_demand.pending.read().len(), 1);
drop(result);
Expand All @@ -148,7 +151,7 @@ fn single_request() {

let recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(header.hash()).into()]
vec![request::HeaderByHash(header.hash().into()).into()]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);
Expand Down Expand Up @@ -182,7 +185,7 @@ fn no_capabilities() {

let _recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(Default::default()).into()]
vec![request::HeaderByHash(H256::default().into()).into()]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);
Expand All @@ -209,7 +212,7 @@ fn reassign() {

let recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(header.hash()).into()]
vec![request::HeaderByHash(header.hash().into()).into()]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);
Expand Down Expand Up @@ -264,8 +267,8 @@ fn partial_response() {
let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header1.hash()).into(),
request::HeaderByHash(header2.hash()).into(),
request::HeaderByHash(header1.hash().into()).into(),
request::HeaderByHash(header2.hash().into()).into(),
],
).unwrap();

Expand Down Expand Up @@ -323,8 +326,8 @@ fn part_bad_part_good() {
let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header1.hash()).into(),
request::HeaderByHash(header2.hash()).into(),
request::HeaderByHash(header1.hash().into()).into(),
request::HeaderByHash(header2.hash().into()).into(),
],
).unwrap();

Expand Down Expand Up @@ -378,7 +381,7 @@ fn wrong_kind() {

let _recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(Default::default()).into()]
vec![request::HeaderByHash(H256::default().into()).into()]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);
Expand All @@ -395,3 +398,100 @@ fn wrong_kind() {

assert_eq!(harness.service.pending.read().len(), 1);
}

#[test]
fn back_references() {
let harness = Harness::create();

let peer_id = 10101;
let req_id = ReqId(14426);

harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});

let header = Header::default();
let encoded = encoded::Header::new(header.rlp(Seal::With));

let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header.hash().into()).into(),
request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(),
]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);

harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));

assert_eq!(harness.service.pending.read().len(), 0);

harness.service.on_responses(
&Context::WithPeer(peer_id),
req_id,
&[
Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }),
Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] }),
]
);

assert!(recv.wait().is_ok());
}

#[test]
#[should_panic]
fn bad_back_reference() {
let harness = Harness::create();

let header = Header::default();

let _ = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header.hash().into()).into(),
request::BlockReceipts(HeaderRef::Unresolved(1, header.hash().into())).into(),
]
).unwrap();
}

#[test]
fn fill_from_cache() {
let harness = Harness::create();

let peer_id = 10101;
let req_id = ReqId(14426);

harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});

let header = Header::default();
let encoded = encoded::Header::new(header.rlp(Seal::With));

let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header.hash().into()).into(),
request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(),
]
).unwrap();

assert_eq!(harness.service.pending.read().len(), 1);

harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));

assert_eq!(harness.service.pending.read().len(), 0);

harness.service.on_responses(
&Context::WithPeer(peer_id),
req_id,
&[
Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }),
]
);

assert!(recv.wait().is_ok());
}
53 changes: 39 additions & 14 deletions ethcore/light/src/types/request/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! supplied as well.

use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use request::{
IncompleteRequest, OutputKind, Output, NoSuchOutput, ResponseError, ResponseLike,
};
Expand Down Expand Up @@ -124,23 +125,14 @@ impl<T: IncompleteRequest + Clone> Requests<T> {
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
}
}
}

impl<T: super::CheckedRequest> Requests<T> {
/// Supply a response for the next request.
/// Fails on: wrong request kind, all requests answered already.
pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response)
-> Result<T::Extract, ResponseError<T::Error>>
{
let idx = self.answered;

// check validity.
if self.is_complete() { return Err(ResponseError::Unexpected) }

let extracted = self.requests[idx]
.check_response(env, response).map_err(ResponseError::Validity)?;
/// Supply a response, asserting its correctness.
/// Fill outputs based upon it.
pub fn supply_response_unchecked<R: ResponseLike>(&mut self, response: &R) {
if self.is_complete() { return }

let outputs = &mut self.outputs;
let idx = self.answered;
response.fill_outputs(|out_idx, output| {
// we don't need to check output kinds here because all back-references
// are validated in the builder.
Expand All @@ -154,7 +146,26 @@ impl<T: super::CheckedRequest> Requests<T> {
if let Some(ref mut req) = self.requests.get_mut(self.answered) {
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
}
}
}

impl<T: super::CheckedRequest + Clone> Requests<T> {
/// Supply a response for the next request.
/// Fails on: wrong request kind, all requests answered already.
pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response)
-> Result<T::Extract, ResponseError<T::Error>>
{
let idx = self.answered;

// check validity.
if idx == self.requests.len() { return Err(ResponseError::Unexpected) }
let completed = self.next_complete()
.expect("only fails when all requests have been answered; this just checked against; qed");

let extracted = self.requests[idx]
.check_response(&completed, env, response).map_err(ResponseError::Validity)?;

self.supply_response_unchecked(response);
Ok(extracted)
}
}
Expand Down Expand Up @@ -182,6 +193,20 @@ impl Requests<super::Request> {
}
}

impl<T: IncompleteRequest> Deref for Requests<T> {
type Target = [T];

fn deref(&self) -> &[T] {
&self.requests[..]
}
}

impl<T: IncompleteRequest> DerefMut for Requests<T> {
fn deref_mut(&mut self) -> &mut [T] {
&mut self.requests[..]
}
}

#[cfg(test)]
mod tests {
use request::*;
Expand Down
29 changes: 26 additions & 3 deletions ethcore/light/src/types/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub enum ResponseError<T> {
}

/// An input to a request.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Field<T> {
/// A pre-specified input.
Scalar(T),
Expand All @@ -93,6 +93,29 @@ pub enum Field<T> {
}

impl<T> Field<T> {
/// Helper for creating a new back-reference field.
pub fn back_ref(idx: usize, req: usize) -> Self {
Field::BackReference(idx, req)
}

/// map a scalar into some other item.
pub fn map<F, U>(self, f: F) -> Field<U> where F: FnOnce(T) -> U {
match self {
Field::Scalar(x) => Field::Scalar(f(x)),
Field::BackReference(req, idx) => Field::BackReference(req, idx),
}
}

/// Attempt to get a reference to the inner scalar.
pub fn as_ref(&self) -> Option<&T> {
match *self {
Field::Scalar(ref x) => Some(x),
Field::BackReference(_, _) => None,
}
}



// attempt conversion into scalar value.
fn into_scalar(self) -> Result<T, NoSuchOutput> {
match self {
Expand Down Expand Up @@ -384,7 +407,7 @@ impl CheckedRequest for Request {
type Error = WrongKind;
type Environment = ();

fn check_response(&self, _: &(), response: &Response) -> Result<(), WrongKind> {
fn check_response(&self, _: &Self::Complete, _: &(), response: &Response) -> Result<(), WrongKind> {
if self.kind() == response.kind() {
Ok(())
} else {
Expand Down Expand Up @@ -571,7 +594,7 @@ pub trait CheckedRequest: IncompleteRequest {
type Environment;

/// Check whether the response matches (beyond the type).
fn check_response(&self, &Self::Environment, &Self::Response) -> Result<Self::Extract, Self::Error>;
fn check_response(&self, &Self::Complete, &Self::Environment, &Self::Response) -> Result<Self::Extract, Self::Error>;
}

/// A response-like object.
Expand Down
38 changes: 21 additions & 17 deletions parity/light_helpers/queue_cull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use light::client::Client;
use light::on_demand::{request, OnDemand};
use light::TransactionQueue;

use futures::{future, stream, Future, Stream};
use futures::{future, Future};

use parity_reactor::Remote;

Expand Down Expand Up @@ -73,28 +73,32 @@ impl IoHandler<ClientIoMessage> for QueueCull {
self.remote.spawn_with_timeout(move || {
let maybe_fetching = sync.with_context(move |ctx| {
// fetch the nonce of each sender in the queue.
let nonce_futures = senders.iter()
.map(|&address| request::Account { header: best_header.clone(), address: address })
.map(move |request| {
on_demand.account(ctx, request)
.map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
let nonce_reqs = senders.iter()
.map(|&address| request::Account { header: best_header.clone().into(), address: address })
.collect::<Vec<_>>();

// when they come in, update each sender to the new nonce.
on_demand.request(ctx, nonce_reqs)
.expect("No back-references; therefore all back-references are valid; qed")
.map(move |accs| {
let txq = txq.write();
let _ = accs.into_iter()
.map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
.zip(senders)
.fold(txq, |mut txq, (nonce, addr)| {
txq.cull(addr, nonce);
txq
});
})
.zip(senders.iter())
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));

// as they come in, update each sender to the new nonce.
stream::futures_unordered(nonce_futures)
.fold(txq, |txq, (address, nonce)| {
txq.write().cull(address, nonce);
future::ok(txq)
})
.map(|_| ()) // finally, discard the txq handle and log errors.
.map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel."))
});

match maybe_fetching {
Some(fut) => fut.boxed(),
None => future::ok(()).boxed(),
None => {
debug!(target: "cull", "Unable to acquire network context; qed");
future::ok(()).boxed()
}
}
}, Duration::from_millis(PURGE_TIMEOUT_MS), || {})
}
Expand Down
Loading