diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 0267ac35607a9..485c10570bcbb 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -50,15 +50,15 @@ pub mod error; use std::sync::Arc; use demo_primitives::Hash; use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, - SessionConfig, StakingConfig, BuildStorage}; -use demo_runtime::{Block, UncheckedExtrinsic}; + SessionConfig, StakingConfig}; +use demo_runtime::{Block, BlockId, UncheckedExtrinsic, BuildStorage}; use futures::{Future, Sink, Stream}; struct DummyPool; -impl extrinsic_pool::api::ExtrinsicPool for DummyPool { +impl extrinsic_pool::api::ExtrinsicPool for DummyPool { type Error = extrinsic_pool::txpool::Error; - fn submit(&self, _: Vec) + fn submit(&self, _block: BlockId, _: Vec) -> Result, Self::Error> { Err("unimplemented".into()) @@ -155,7 +155,8 @@ pub fn run(args: I) -> error::Result<()> where let _rpc_servers = { let handler = || { let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote()); - rpc::rpc_handler::(client.clone(), chain, Arc::new(DummyPool), DummySystem) + let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool)); + rpc::rpc_handler::(client.clone(), chain, author, DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/demo/runtime/src/lib.rs b/demo/runtime/src/lib.rs index 77bcad3d25eb6..08529a1974721 100644 --- a/demo/runtime/src/lib.rs +++ b/demo/runtime/src/lib.rs @@ -162,6 +162,8 @@ pub type Address = staking::Address; pub type Header = generic::Header>; /// Block type as expected by this runtime. pub type Block = generic::Block; +/// BlockId type as expected by this runtime. +pub type BlockId = generic::BlockId; /// Unchecked extrinsic type as expected by this runtime. pub type UncheckedExtrinsic = generic::UncheckedExtrinsic; /// Extrinsic type as expected by this runtime. This is not the type that is signed. diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 48dd25ebc8b4f..cd0040c43926f 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -27,12 +27,12 @@ parking_lot = "0.4" serde_json = "1.0" serde = "1.0" substrate-client = { path = "../../substrate/client" } -substrate-state-machine = { path = "../../substrate/state-machine" } -substrate-rpc = { path = "../../substrate/rpc" } -substrate-rpc-servers = { path = "../../substrate/rpc-servers" } substrate-network = { path = "../../substrate/network" } substrate-primitives = { path = "../../substrate/primitives" } +substrate-rpc = { path = "../../substrate/rpc" } +substrate-rpc-servers = { path = "../../substrate/rpc-servers" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } +substrate-state-machine = { path = "../../substrate/state-machine" } substrate-telemetry = { path = "../../substrate/telemetry" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index d70b0cff9cd41..36fb5653a9f64 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -34,13 +34,13 @@ extern crate parking_lot; extern crate serde; extern crate serde_json; -extern crate substrate_primitives; -extern crate substrate_state_machine as state_machine; extern crate substrate_client as client; extern crate substrate_network as network; +extern crate substrate_primitives; extern crate substrate_rpc; extern crate substrate_rpc_servers as rpc; extern crate substrate_runtime_primitives as runtime_primitives; +extern crate substrate_state_machine as state_machine; extern crate polkadot_primitives; extern crate polkadot_runtime; extern crate polkadot_service as service; @@ -192,12 +192,12 @@ pub fn run(args: I) -> error::Result<()> where let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec) .map(PresetConfig::deconstruct) - .unwrap_or_else(|f| (Box::new(move || + .unwrap_or_else(|f| (Box::new(move || read_storage_json(&f) .map(|s| { info!("{} storage items read from {}", s.len(), f); s }) .unwrap_or_else(|| panic!("Bad genesis state file: {}", f)) ), vec![])); - + if matches.is_present("build-genesis") { info!("Building genesis"); for (i, (k, v)) in genesis_storage().iter().enumerate() { @@ -285,10 +285,11 @@ fn run_until_exit(mut core: reactor::Core, service: service::Service, matc let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); + let author = rpc::apis::author::Author::new(service.client(), service.transaction_pool()); rpc::rpc_handler::( service.client(), chain, - service.transaction_pool(), + author, sys_conf.clone(), ) }; diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 6146e6d87da90..5dd4c1d484e46 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -74,7 +74,7 @@ use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp}; use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; use polkadot_runtime::BareExtrinsic; use primitives::AuthorityId; -use transaction_pool::{Ready, TransactionPool}; +use transaction_pool::{TransactionPool}; use tokio_core::reactor::{Handle, Timeout, Interval}; use futures::prelude::*; @@ -226,7 +226,7 @@ pub struct ProposerFactory { /// The client instance. pub client: Arc, /// The transaction pool. - pub transaction_pool: Arc, + pub transaction_pool: Arc>, /// The backing network handle. pub network: N, /// Parachain collators. @@ -239,7 +239,8 @@ pub struct ProposerFactory { impl bft::ProposerFactory for ProposerFactory where - C: PolkadotApi, + C: PolkadotApi + Send + Sync, + C::CheckedBlockId: Sync, N: Network, P: Collators, { @@ -319,12 +320,13 @@ pub struct Proposer { random_seed: Hash, router: R, table: Arc, - transaction_pool: Arc, + transaction_pool: Arc>, } impl bft::Proposer for Proposer where - C: PolkadotApi, + C: PolkadotApi + Send + Sync, + C::CheckedBlockId: Sync, R: TableRouter, P: Collators, { @@ -501,8 +503,7 @@ impl bft::Proposer for Proposer let local_id = self.local_key.public().0.into(); let mut next_index = { - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending + let cur_index = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending| pending .filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false)) .last() .map(|tx| Ok(tx.index())) @@ -510,7 +511,11 @@ impl bft::Proposer for Proposer ); match cur_index { - Ok(cur_index) => cur_index + 1, + Ok(Ok(cur_index)) => cur_index + 1, + Ok(Err(e)) => { + warn!(target: "consensus", "Error computing next transaction index: {}", e); + return; + } Err(e) => { warn!(target: "consensus", "Error computing next transaction index: {}", e); return; @@ -549,7 +554,7 @@ impl bft::Proposer for Proposer }; let uxt = UncheckedExtrinsic::new(extrinsic, signature); - self.transaction_pool.import_unchecked_extrinsic(uxt) + self.transaction_pool.import_unchecked_extrinsic(BlockId::hash(self.parent_hash), uxt) .expect("locally signed extrinsic is valid; qed"); } } @@ -618,7 +623,7 @@ pub struct CreateProposal { parent_number: BlockNumber, parent_id: C::CheckedBlockId, client: Arc, - transaction_pool: Arc, + transaction_pool: Arc>, collation: CollationFetch, router: R, table: Arc, @@ -640,9 +645,8 @@ impl CreateProposal let mut block_builder = self.client.build_block(&self.parent_id, timestamp, candidates)?; { - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); let mut unqueue_invalid = Vec::new(); - self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| { + let result = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending_iterator| { let mut pending_size = 0; for pending in pending_iterator { // skip and cull transactions which are too large. @@ -664,6 +668,9 @@ impl CreateProposal } } }); + if let Err(e) = result { + warn!("Unable to get the pending set: {:?}", e); + } self.transaction_pool.remove(&unqueue_invalid, false); } diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index f1ee7f49fe3b2..7e7f5e914665a 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -235,13 +235,14 @@ impl Service { client: Arc, api: Arc, network: Arc>, - transaction_pool: Arc, + transaction_pool: Arc>, parachain_empty_duration: Duration, key: ed25519::Pair, ) -> Service where A: LocalPolkadotApi + Send + Sync + 'static, C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + Send + Sync + 'static, + A::CheckedBlockId: Sync, { let (signal, exit) = ::exit_future::signal(); let thread = thread::spawn(move || { diff --git a/polkadot/service/src/components.rs b/polkadot/service/src/components.rs index e8274f4dfccc9..7211fa660b650 100644 --- a/polkadot/service/src/components.rs +++ b/polkadot/service/src/components.rs @@ -55,11 +55,11 @@ pub trait Components { fn build_api(&self, client: Arc>) -> Arc; /// Create network transaction pool adapter. - fn build_network_tx_pool(&self, client: Arc>, api: Arc, tx_pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, tx_pool: Arc>) -> Arc>; /// Create consensus service. - fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc, keystore: &Keystore) + fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc>, keystore: &Keystore) -> Result, error::Error>; } @@ -83,17 +83,16 @@ impl Components for FullComponents { client } - fn build_network_tx_pool(&self, client: Arc>, api: Arc, pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, pool: Arc>) -> Arc> { Arc::new(TransactionPoolAdapter { imports_external_transactions: true, pool, client, - api, }) } - fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc, keystore: &Keystore) + fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc>, keystore: &Keystore) -> Result, error::Error> { if !self.is_validator { return Ok(None); @@ -134,17 +133,16 @@ impl Components for LightComponents { Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())) } - fn build_network_tx_pool(&self, client: Arc>, api: Arc, pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, pool: Arc>) -> Arc> { Arc::new(TransactionPoolAdapter { imports_external_transactions: false, pool, client, - api, }) } - fn build_consensus(&self, _client: Arc>, _network: Arc>, _tx_pool: Arc, _keystore: &Keystore) + fn build_consensus(&self, _client: Arc>, _network: Arc>, _tx_pool: Arc>, _keystore: &Keystore) -> Result, error::Error> { Ok(None) } @@ -153,9 +151,25 @@ impl Components for LightComponents { /// Transaction pool adapter. pub struct TransactionPoolAdapter where A: Send + Sync, E: Send + Sync { imports_external_transactions: bool, - pool: Arc, + pool: Arc>, client: Arc>, - api: Arc, +} + +impl TransactionPoolAdapter + where + A: Send + Sync, + B: client::backend::Backend + Send + Sync, + E: client::CallExecutor + Send + Sync, + client::error::Error: From<<>::State as state_machine::backend::Backend>::Error>, +{ + fn best_block_id(&self) -> Option { + self.client.info() + .map(|info| BlockId::hash(info.chain.best_hash)) + .map_err(|e| { + debug!("Error getting best block: {:?}", e); + }) + .ok() + } } impl network::TransactionPool for TransactionPoolAdapter @@ -166,28 +180,20 @@ impl network::TransactionPool for TransactionPoolAdapter Vec<(Hash, Vec)> { - let best_block = match self.client.info() { - Ok(info) => info.chain.best_hash, - Err(e) => { - debug!("Error getting best block: {:?}", e); - return Vec::new(); - } - }; - - let id = match self.api.check_id(BlockId::hash(best_block)) { - Ok(id) => id, - Err(_) => return Vec::new(), + let best_block_id = match self.best_block_id() { + Some(id) => id, + None => return vec![], }; - - let ready = transaction_pool::Ready::create(id, &*self.api); - - self.pool.cull_and_get_pending(ready, |pending| pending + self.pool.cull_and_get_pending(best_block_id, |pending| pending .map(|t| { let hash = t.hash().clone(); (hash, t.primitive_extrinsic()) }) .collect() - ) + ).unwrap_or_else(|e| { + warn!("Error retrieving pending set: {}", e); + vec![] + }) } fn import(&self, transaction: &Vec) -> Option { @@ -197,7 +203,8 @@ impl network::TransactionPool for TransactionPoolAdapter Some(*xt.hash()), Err(e) => match *e.kind() { transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 75ddf1ffbee7d..230a9ae587ef4 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -75,7 +75,7 @@ pub struct Service { thread: Option>, client: Arc>, network: Arc>, - transaction_pool: Arc, + transaction_pool: Arc>, signal: Option, _consensus: Option, } @@ -127,8 +127,8 @@ impl Service info!("Best block is #{}", best_header.number); telemetry!("node.start"; "height" => best_header.number, "best" => ?best_header.hash()); - let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool)); - let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), api.clone(), transaction_pool.clone()); + let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool, api.clone())); + let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), transaction_pool.clone()); let network_params = network::Params { config: network::ProtocolConfig { roles: config.roles, @@ -161,7 +161,8 @@ impl Service let events = client.import_notification_stream() .for_each(move |notification| { network1.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*api, &*txpool1, notification.hash); + prune_imported(&*txpool1, notification.hash); + Ok(()) }); core.handle().spawn(events); @@ -210,22 +211,22 @@ impl Service } /// Get shared transaction pool instance. - pub fn transaction_pool(&self) -> Arc { + pub fn transaction_pool(&self) -> Arc> { self.transaction_pool.clone() } } /// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(api: &A, pool: &TransactionPool, hash: Hash) - where - A: PolkadotApi, +pub fn prune_imported(pool: &TransactionPool, hash: Hash) + where A: PolkadotApi, { - match api.check_id(BlockId::hash(hash)) { - Ok(id) => { - let ready = transaction_pool::Ready::create(id, api); - pool.cull(None, ready); - }, - Err(e) => warn!("Failed to check block id: {:?}", e), + let block = BlockId::hash(hash); + if let Err(e) = pool.cull(block) { + warn!("Culling error: {:?}", e); + } + + if let Err(e) = pool.retry_verification(block) { + warn!("Re-verifying error: {:?}", e); } } diff --git a/polkadot/transaction-pool/src/error.rs b/polkadot/transaction-pool/src/error.rs index 78f9d3e771041..ef6cdf6b4166c 100644 --- a/polkadot/transaction-pool/src/error.rs +++ b/polkadot/transaction-pool/src/error.rs @@ -15,12 +15,14 @@ // along with Polkadot. If not, see . use extrinsic_pool::{self, txpool}; +use polkadot_api; use primitives::Hash; use runtime::{Address, UncheckedExtrinsic}; error_chain! { links { Pool(txpool::Error, txpool::ErrorKind); + Api(polkadot_api::Error, polkadot_api::ErrorKind); } errors { /// Unexpected extrinsic format submitted @@ -53,11 +55,6 @@ error_chain! { description("Unrecognised address in extrinsic"), display("Unrecognised address in extrinsic: {}", who), } - /// Extrinsic is not yet checked. - NotReady { - description("Indexed address is unverified"), - display("Indexed address is unverified"), - } } } diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index 843af20fb16f2..1fab5d481ee13 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . extern crate ed25519; +extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate substrate_primitives as substrate_primitives; @@ -37,19 +38,17 @@ mod error; use std::{ cmp::Ordering, - collections::{hash_map::Entry, HashMap}, + collections::HashMap, ops::Deref, sync::Arc, - result }; -use parking_lot::Mutex; use codec::Slicable; -use extrinsic_pool::{Pool, txpool::{self, Readiness, scoring::{Change, Choice}}}; +use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}}; use extrinsic_pool::api::ExtrinsicPool; use polkadot_api::PolkadotApi; -use primitives::{AccountId, AccountIndex, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; -use runtime::{Address, RawAddress, UncheckedExtrinsic}; +use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; +use runtime::{Address, UncheckedExtrinsic}; use substrate_runtime_primitives::traits::{Bounded, Checkable, Hashing, BlakeTwo256}; pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; @@ -59,65 +58,16 @@ pub use error::{Error, ErrorKind, Result}; pub type CheckedExtrinsic = ::Checked; /// A verified transaction which should be includable and non-inherent. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct VerifiedTransaction { original: UncheckedExtrinsic, - // `create()` will leave this as `Some` only if the `Address` is an `AccountId`, otherwise a - // call to `polish` is needed. - inner: Mutex>, + inner: Option, + sender: Option, hash: Hash, encoded_size: usize, } -impl Clone for VerifiedTransaction { - fn clone(&self) -> Self { - VerifiedTransaction { - original: self.original.clone(), - inner: Mutex::new(self.inner.lock().clone()), - hash: self.hash.clone(), - encoded_size: self.encoded_size.clone(), - } - } -} - impl VerifiedTransaction { - /// Attempt to verify a transaction. - fn create(original: UncheckedExtrinsic) -> Result { - if !original.is_signed() { - bail!(ErrorKind::IsInherent(original)) - } - const UNAVAILABLE_MESSAGE: &'static str = "chain state not available"; - let (encoded_size, hash) = original.using_encoded(|e| (e.len(), BlakeTwo256::hash(e))); - let lookup = |a| match a { - RawAddress::Id(i) => Ok(i), - _ => Err(UNAVAILABLE_MESSAGE), - }; - let inner = Mutex::new(match original.clone().check(lookup) { - Ok(xt) => Some(xt), - Err(e) if e == UNAVAILABLE_MESSAGE => None, - Err(e) => bail!(ErrorKind::BadSignature(e)), - }); - Ok(VerifiedTransaction { original, inner, hash, encoded_size }) - } - - /// If this transaction isn't really verified, verify it and morph it into a really verified - /// transaction. - pub fn polish(&self, lookup: F) -> Result<()> where - F: FnOnce(Address) -> result::Result + Send + Sync - { - let inner: result::Result = self.original - .clone() - .check(lookup) - .map_err(|e| ErrorKind::BadSignature(e).into()); - *self.inner.lock() = Some(inner?); - Ok(()) - } - - /// Is this transaction *really* verified? - pub fn is_really_verified(&self) -> bool { - self.inner.lock().is_some() - } - /// Access the underlying transaction. pub fn as_transaction(&self) -> &UncheckedExtrinsic { &self.original @@ -129,9 +79,9 @@ impl VerifiedTransaction { .expect("UncheckedExtrinsic shares repr with Vec; qed") } - /// Consume the verified transaciton, yielding the unchecked counterpart. - pub fn into_inner(self) -> Result { - self.inner.lock().clone().ok_or_else(|| ErrorKind::NotReady.into()) + /// Consume the verified transaction, yielding the checked counterpart. + pub fn into_inner(self) -> Option { + self.inner } /// Get the 256-bit hash of this transaction. @@ -140,8 +90,8 @@ impl VerifiedTransaction { } /// Get the account ID of the sender of this transaction. - pub fn sender(&self) -> Result { - self.inner.lock().as_ref().map(|i| i.signed.clone()).ok_or_else(|| ErrorKind::NotReady.into()) + pub fn sender(&self) -> Option { + self.sender } /// Get the account ID of the sender of this transaction. @@ -153,22 +103,27 @@ impl VerifiedTransaction { pub fn encoded_size(&self) -> usize { self.encoded_size } + + /// Returns `true` if the transaction is not yet fully verified. + pub fn is_fully_verified(&self) -> bool { + self.inner.is_some() + } } impl txpool::VerifiedTransaction for VerifiedTransaction { type Hash = Hash; - type Sender = Address; + type Sender = Option; fn hash(&self) -> &Self::Hash { &self.hash } fn sender(&self) -> &Self::Sender { - self.original.sender() + &self.sender } fn mem_usage(&self) -> usize { - 1 // TODO + self.encoded_size // TODO } } @@ -184,7 +139,19 @@ impl txpool::Scoring for Scoring { old.index().cmp(&other.index()) } - fn choose(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> Choice { + fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> Choice { + if old.is_fully_verified() { + assert!(new.is_fully_verified(), "Scoring::choose called with transactions from different senders"); + if old.index() == new.index() { + // TODO [ToDr] Do we allow replacement? If yes then it should be Choice::ReplaceOld + return Choice::RejectNew; + } + } + + // This will keep both transactions, even though they have the same indices. + // It's fine for not fully verified transactions, we might also allow it for + // verified transactions but it would mean that only one of the two is actually valid + // (most likely the first to be included in the block). Choice::InsertNew } @@ -195,33 +162,37 @@ impl txpool::Scoring for Scoring { _change: Change<()> ) { for i in 0..xts.len() { - // all the same score since there are no fees. - // TODO: prioritize things like misbehavior or fishermen reports - scores[i] = 1; + if !xts[i].is_fully_verified() { + scores[i] = 0; + } else { + // all the same score since there are no fees. + // TODO: prioritize things like misbehavior or fishermen reports + scores[i] = 1; + } } } - fn should_replace(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool { - false // no fees to determine which is better. + + fn should_replace(&self, old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool { + // Always replace not fully verified transactions. + !old.is_fully_verified() } } /// Readiness evaluator for polkadot transactions. -pub struct Ready<'a, T: 'a + PolkadotApi> { - at_block: T::CheckedBlockId, - api: &'a T, - known_nonces: HashMap, - known_indexes: HashMap, +pub struct Ready<'a, A: 'a + PolkadotApi> { + at_block: A::CheckedBlockId, + api: &'a A, + known_nonces: HashMap, } -impl<'a, T: 'a + PolkadotApi> Ready<'a, T> { +impl<'a, A: 'a + PolkadotApi> Ready<'a, A> { /// Create a new readiness evaluator at the given block. Requires that /// the ID has already been checked for local corresponding and available state. - pub fn create(at: T::CheckedBlockId, api: &'a T) -> Self { + fn create(at: A::CheckedBlockId, api: &'a A) -> Self { Ready { at_block: at, api, known_nonces: HashMap::new(), - known_indexes: HashMap::new(), } } } @@ -232,136 +203,214 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { at_block: self.at_block.clone(), api: self.api, known_nonces: self.known_nonces.clone(), - known_indexes: self.known_indexes.clone(), } } } -impl<'a, T: 'a + PolkadotApi> txpool::Ready for Ready<'a, T> +impl<'a, A: 'a + PolkadotApi> txpool::Ready for Ready<'a, A> { fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness { - if !xt.is_really_verified() { - let id = match xt.original.extrinsic.signed.clone() { - RawAddress::Id(id) => id.clone(), // should never happen, since we're not verified. - RawAddress::Index(i) => match self.known_indexes.entry(i) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let (api, at_block) = (&self.api, &self.at_block); - if let Some(id) = api.lookup(at_block, RawAddress::Index(i)) - .ok() - .and_then(|o| o) - { - e.insert(id.clone()); - id - } else { - // Invalid index. - // return stale in order to get the pool to throw it away. - return Readiness::Stale - } - } - }, - }; - if VerifiedTransaction::polish(xt, move |_| Ok(id)).is_err() { - // Invalid signature. - // return stale in order to get the pool to throw it away. - return Readiness::Stale - } - } - - // guaranteed to be properly verified at this point. + let sender = match xt.sender() { + Some(sender) => sender, + None => return Readiness::Future + }; - let sender = xt.sender().expect("only way to get here is `is_really_verified` or successful `polish`; either guarantees `is_really_verified`; `sender` is `Ok` if `is_really_verified`; qed"); trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender)); - let is_index_sender = match xt.original.extrinsic.signed { RawAddress::Index(_) => false, _ => true }; - // TODO: find a way to handle index error properly -- will need changes to // transaction-pool trait. let (api, at_block) = (&self.api, &self.at_block); - let get_nonce = || api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value); - let (next_nonce, was_index_sender) = self.known_nonces.entry(sender).or_insert_with(|| (get_nonce(), is_index_sender)); - - trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_nonce, xt.original.extrinsic.index); - - if *was_index_sender == is_index_sender || get_nonce() == *next_nonce { - match xt.original.extrinsic.index.cmp(&next_nonce) { - Ordering::Greater => Readiness::Future, - Ordering::Less => Readiness::Stale, - Ordering::Equal => { - // remember to increment `next_nonce` - // TODO: this won't work perfectly since accounts can now be killed, returning the nonce - // to zero. - *next_nonce = next_nonce.saturating_add(1); - Readiness::Ready - } - } - } else { - // ignore for now. - Readiness::Future - } + let next_index = self.known_nonces.entry(sender) + .or_insert_with(|| api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); + + trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index); + + let result = match xt.original.extrinsic.index.cmp(&next_index) { + // TODO: this won't work perfectly since accounts can now be killed, returning the nonce + // to zero. + // We should detect if the index was reset and mark all transactions as `Stale` for cull to work correctly. + // Otherwise those transactions will keep occupying the queue. + // Perhaps we could mark as stale if `index - state_index` > X? + Ordering::Greater => Readiness::Future, + Ordering::Equal => Readiness::Ready, + // TODO [ToDr] Should mark transactions referrencing too old blockhash as `Stale` as well. + Ordering::Less => Readiness::Stale, + }; + + // remember to increment `next_index` + *next_index = next_index.saturating_add(1); + + result } } -pub struct Verifier; +pub struct Verifier<'a, A: 'a, B> { + api: &'a A, + at_block: B, +} + +impl<'a, A> Verifier<'a, A, A::CheckedBlockId> where + A: 'a + PolkadotApi, +{ + const NO_ACCOUNT: &'static str = "Account not found."; + + fn lookup(&self, address: Address) -> ::std::result::Result { + // TODO [ToDr] Consider introducing a cache for this. + match self.api.lookup(&self.at_block, address.clone()) { + Ok(Some(address)) => Ok(address), + Ok(None) => Err(Self::NO_ACCOUNT.into()), + Err(e) => { + error!("Error looking up address: {:?}: {:?}", address, e); + Err("API error.") + }, + } + } +} -impl txpool::Verifier for Verifier { +impl<'a, A> txpool::Verifier for Verifier<'a, A, A::CheckedBlockId> where + A: 'a + PolkadotApi, +{ type VerifiedTransaction = VerifiedTransaction; type Error = Error; fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result { info!("Extrinsic Submitted: {:?}", uxt); - VerifiedTransaction::create(uxt) + + if !uxt.is_signed() { + bail!(ErrorKind::IsInherent(uxt)) + } + + let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e))); + let inner = match uxt.clone().check(|a| self.lookup(a)) { + Ok(xt) => Some(xt), + // keep the transaction around in the future pool and attempt to promote it later. + Err(Self::NO_ACCOUNT) => None, + Err(e) => bail!(e), + }; + let sender = inner.as_ref().map(|x| x.signed.clone()); + + Ok(VerifiedTransaction { + original: uxt, + inner, + sender, + hash, + encoded_size + }) } } /// The polkadot transaction pool. /// /// Wraps a `extrinsic_pool::Pool`. -pub struct TransactionPool { - inner: Pool, +pub struct TransactionPool { + inner: Pool, + api: Arc, } -impl TransactionPool { +impl TransactionPool where + A: PolkadotApi, +{ /// Create a new transaction pool. - pub fn new(options: Options) -> Self { + pub fn new(options: Options, api: Arc) -> Self { TransactionPool { - inner: Pool::new(options, Verifier, Scoring), + inner: Pool::new(options, Scoring), + api, } } - // TODO: remove. This is pointless - just use `submit()` directly. - pub fn import_unchecked_extrinsic(&self, uxt: UncheckedExtrinsic) -> Result> { - self.inner.submit(vec![uxt]).map(|mut v| v.swap_remove(0)) + /// Attempt to directly import `UncheckedExtrinsic` without going through serialization. + pub fn import_unchecked_extrinsic(&self, block: BlockId, uxt: UncheckedExtrinsic) -> Result> { + let verifier = Verifier { + api: &*self.api, + at_block: self.api.check_id(block)?, + }; + self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0)) + } + + /// Retry to import all semi-verified transactions (unknown account indices) + pub fn retry_verification(&self, block: BlockId) -> Result<()> { + let to_reverify = self.inner.remove_sender(None); + let verifier = Verifier { + api: &*self.api, + at_block: self.api.check_id(block)?, + }; + + self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?; + Ok(()) + } + + /// Reverify transaction that has been reported incorrect. + /// + /// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction + /// reference otherwise. + /// + /// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder + /// when we detect that particular transaction has failed. + /// In such case we will attempt to remove or re-verify it. + pub fn reverify_transaction(&self, block: BlockId, hash: Hash) -> Result>> { + let result = self.inner.remove(&[hash], false).pop().expect("One hash passed; one result received; qed"); + if let Some(tx) = result { + self.import_unchecked_extrinsic(block, tx.original.clone()).map(Some) + } else { + Ok(None) + } + } + + /// Cull old transactions from the queue. + pub fn cull(&self, block: BlockId) -> Result { + let id = self.api.check_id(block)?; + let ready = Ready::create(id, &*self.api); + Ok(self.inner.cull(None, ready)) + } + + /// Cull transactions from the queue and then compute the pending set. + pub fn cull_and_get_pending(&self, block: BlockId, f: F) -> Result where + F: FnOnce(txpool::PendingIterator, Scoring, Listener>) -> T, + { + let id = self.api.check_id(block)?; + let ready = Ready::create(id, &*self.api); + self.inner.cull(None, ready.clone()); + Ok(self.inner.pending(ready, f)) + } + + /// Remove a set of transactions idenitified by hashes. + pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { + self.inner.remove(hashes, is_valid) } } -impl Deref for TransactionPool { - type Target = Pool; +impl Deref for TransactionPool { + type Target = Pool; fn deref(&self) -> &Self::Target { &self.inner } } -impl ExtrinsicPool for TransactionPool { +impl ExtrinsicPool for TransactionPool where + A: Send + Sync + 'static, + A: PolkadotApi, +{ type Error = Error; - fn submit(&self, xts: Vec) -> Result> { + fn submit(&self, block: BlockId, xts: Vec) -> Result> { // TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions, // even when runtime is out of date. xts.into_iter() .map(|xt| xt.encode()) - .map(|encoded| UncheckedExtrinsic::decode(&mut &encoded[..])) - .map(|maybe_decoded| maybe_decoded.ok_or_else(|| ErrorKind::InvalidExtrinsicFormat.into())) - .map(|x| x.and_then(|x| self.import_unchecked_extrinsic(x))) - .map(|x| x.map(|x| x.hash().clone())) + .map(|encoded| { + let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?; + let tx = self.import_unchecked_extrinsic(block, decoded)?; + Ok(*tx.hash()) + }) .collect() } } #[cfg(test)] mod tests { - use super::{TransactionPool, Ready}; + use std::sync::{atomic::{self, AtomicBool}, Arc}; + use super::TransactionPool; use substrate_keyring::Keyring::{self, *}; use codec::Slicable; use polkadot_api::{PolkadotApi, BlockBuilder, CheckedBlockId, Result}; @@ -390,8 +439,23 @@ mod tests { } } - #[derive(Clone)] - struct TestPolkadotApi; + #[derive(Default, Clone)] + struct TestPolkadotApi { + no_lookup: Arc, + } + + impl TestPolkadotApi { + fn without_lookup() -> Self { + TestPolkadotApi { + no_lookup: Arc::new(AtomicBool::new(true)), + } + } + + pub fn enable_lookup(&self) { + self.no_lookup.store(false, atomic::Ordering::SeqCst); + } + } + impl PolkadotApi for TestPolkadotApi { type CheckedBlockId = TestCheckedBlockId; type BlockBuilder = TestBlockBuilder; @@ -415,6 +479,7 @@ mod tests { fn lookup(&self, _at: &TestCheckedBlockId, _address: RawAddress) -> Result> { match _address { RawAddress::Id(i) => Ok(Some(i)), + RawAddress::Index(_) if self.no_lookup.load(atomic::Ordering::SeqCst) => Ok(None), RawAddress::Index(i) => Ok(match (i < 8, i + (number_of(_at) as u64) % 8) { (false, _) => None, (_, 0) => Some(Alice.to_raw_public().into()), @@ -456,130 +521,168 @@ mod tests { }, MaybeUnsigned(sig.into())).using_encoded(|e| UncheckedExtrinsic::decode(&mut &e[..])).unwrap() } + fn pool(api: &TestPolkadotApi) -> TransactionPool { + TransactionPool::new(Default::default(), Arc::new(api.clone())) + } + #[test] fn id_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]); } #[test] fn index_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]); } #[test] fn multiple_id_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn multiple_index_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn id_based_early_nonce_should_be_culled() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 208, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); } #[test] fn index_based_early_nonce_should_be_culled() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 208, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); } #[test] fn id_based_late_nonce_should_be_queued() { - let pool = TransactionPool::new(Default::default()); - let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); + let api = TestPolkadotApi::default(); + let pool = pool(&api); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn index_based_late_nonce_should_be_queued() { - let pool = TransactionPool::new(Default::default()); - let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); + let api = TestPolkadotApi::default(); + let pool = pool(&api); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn index_then_id_submission_should_make_progress() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + api.enable_lookup(); + pool.retry_verification(BlockId::number(0)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ - (Some(Alice.to_raw_public().into()), 209) + (Some(Alice.to_raw_public().into()), 209), + (Some(Alice.to_raw_public().into()), 210) ]); } + #[test] + fn retrying_verification_might_not_change_anything() { + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); + + pool.retry_verification(BlockId::number(1)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); + } + #[test] fn id_then_index_submission_should_make_progress() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ (Some(Alice.to_raw_public().into()), 209) ]); + + // when + api.enable_lookup(); + pool.retry_verification(BlockId::number(0)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![ + (Some(Alice.to_raw_public().into()), 209), + (Some(Alice.to_raw_public().into()), 210) + ]); } #[test] fn index_change_should_result_in_second_tx_culled_or_future() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + let block = BlockId::number(0); + pool.import_unchecked_extrinsic(block, uxt(Alice, 209, false)).unwrap(); + let hash = *pool.import_unchecked_extrinsic(block, uxt(Alice, 210, false)).unwrap().hash(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(block, |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ (Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210) @@ -593,11 +696,14 @@ mod tests { // after this, a re-evaluation of the second's readiness should result in it being thrown // out (or maybe placed in future queue). -/* - // TODO: uncomment once the new queue design is in. - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(1)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let err = pool.reverify_transaction(BlockId::number(1), hash).unwrap_err(); + match *err.kind() { + ::error::ErrorKind::Msg(ref m) if m == "bad signature in extrinsic" => {}, + ref e => assert!(false, "The transaction should be rejected with BadSignature error, got: {:?}", e), + } + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(1), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); -*/ + } } diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 6604b3225dbee..cfca952570bbc 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -155,7 +155,7 @@ pub fn new_in_mem( Client::new(backend, executor, genesis_storage) } -impl Client where +impl Client where B: backend::Backend, E: CallExecutor, Block: BlockT, diff --git a/substrate/extrinsic-pool/src/api.rs b/substrate/extrinsic-pool/src/api.rs index ba76c45c046c5..31ec0f5346daa 100644 --- a/substrate/extrinsic-pool/src/api.rs +++ b/substrate/extrinsic-pool/src/api.rs @@ -16,9 +16,7 @@ //! External API for extrinsic pool. -use std::fmt; -use std::ops::Deref; -use txpool::{self, VerifiedTransaction}; +use txpool; /// Extrinsic pool error. pub trait Error: ::std::error::Error + Send + Sized { @@ -35,28 +33,10 @@ impl Error for txpool::Error { } /// Extrinsic pool. -pub trait ExtrinsicPool: Send + Sync + 'static { +pub trait ExtrinsicPool: Send + Sync + 'static { /// Error type type Error: Error; /// Submit a collection of extrinsics to the pool. - fn submit(&self, xt: Vec) -> Result, Self::Error>; -} - -// Blanket implementation for anything that `Derefs` to the pool. -impl ExtrinsicPool for T where - Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, - T: Deref> + Send + Sync + 'static, - V: txpool::Verifier, - S: txpool::Scoring, - V::VerifiedTransaction: txpool::VerifiedTransaction, - E: From, - E: From, - E: Error, -{ - type Error = E; - - fn submit(&self, xt: Vec) -> Result, Self::Error> { - self.deref().submit(xt).map(|result| result.into_iter().map(|xt| *xt.hash()).collect()) - } + fn submit(&self, block: BlockId, xt: Vec) -> Result, Self::Error>; } diff --git a/substrate/extrinsic-pool/src/lib.rs b/substrate/extrinsic-pool/src/lib.rs index c7b44a57096e9..9dfa8c961de3d 100644 --- a/substrate/extrinsic-pool/src/lib.rs +++ b/substrate/extrinsic-pool/src/lib.rs @@ -32,5 +32,6 @@ mod listener; mod pool; mod watcher; +pub use self::listener::Listener; pub use self::pool::Pool; pub use self::watcher::Watcher; diff --git a/substrate/extrinsic-pool/src/listener.rs b/substrate/extrinsic-pool/src/listener.rs index 106c1f23e62f2..6bf110e55f977 100644 --- a/substrate/extrinsic-pool/src/listener.rs +++ b/substrate/extrinsic-pool/src/listener.rs @@ -23,17 +23,22 @@ use txpool; use watcher; +/// Extrinsic pool default listener. #[derive(Default)] pub struct Listener { watchers: HashMap> } impl Listener { + /// Creates a new watcher for given verified extrinsic. + /// + /// The watcher can be used to subscribe to lifecycle events of that extrinsic. pub fn create_watcher>(&mut self, xt: Arc) -> watcher::Watcher { let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default); sender.new_watcher() } + /// Notify the listeners about extrinsic broadcast. pub fn broadcasted(&mut self, hash: &H, peers: Vec) { self.fire(hash, |watcher| watcher.broadcast(peers)); } diff --git a/substrate/extrinsic-pool/src/pool.rs b/substrate/extrinsic-pool/src/pool.rs index c0e44b3edb7cc..ca8df7c6acad3 100644 --- a/substrate/extrinsic-pool/src/pool.rs +++ b/substrate/extrinsic-pool/src/pool.rs @@ -29,41 +29,37 @@ use listener::Listener; use watcher::Watcher; /// Extrinsics pool. -pub struct Pool where +pub struct Pool where Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex, - V: txpool::Verifier, - S: txpool::Scoring, + S: txpool::Scoring, + VEx: txpool::VerifiedTransaction, { _error: Mutex>, pool: RwLock, >>, - verifier: V, - import_notification_sinks: Mutex>>>, + import_notification_sinks: Mutex>>>, } -impl Pool where +impl Pool where Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, - V: txpool::Verifier, - S: txpool::Scoring, - V::VerifiedTransaction: txpool::VerifiedTransaction, - E: From, + S: txpool::Scoring, + VEx: txpool::VerifiedTransaction, E: From, { /// Create a new transaction pool. - pub fn new(options: txpool::Options, verifier: V, scoring: S) -> Self { + pub fn new(options: txpool::Options, scoring: S) -> Self { Pool { _error: Default::default(), pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)), - verifier, import_notification_sinks: Default::default(), } } /// Imports a pre-verified extrinsic to the pool. - pub fn import(&self, xt: V::VerifiedTransaction) -> Result, E> { + pub fn import(&self, xt: VEx) -> Result, E> { let result = self.pool.write().import(xt)?; let weak = Arc::downgrade(&result); @@ -74,7 +70,7 @@ impl Pool where } /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver> { + pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver> { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); stream @@ -87,11 +83,15 @@ impl Pool where } } - /// Imports a bunch of extrinsics to the pool - pub fn submit(&self, xts: Vec) -> Result>, E> { + /// Imports a bunch of unverified extrinsics to the pool + pub fn submit(&self, verifier: V, xts: T) -> Result>, E> where + V: txpool::Verifier, + E: From, + T: IntoIterator + { xts .into_iter() - .map(|xt| self.verifier.verify_transaction(xt)) + .map(|xt| verifier.verify_transaction(xt)) .map(|xt| { Ok(self.pool.write().import(xt?)?) }) @@ -99,13 +99,16 @@ impl Pool where } /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, xt: Ex) -> Result, E> { - let xt = self.submit(vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed"); + pub fn submit_and_watch(&self, verifier: V, xt: Ex) -> Result, E> where + V: txpool::Verifier, + E: From, + { + let xt = self.submit(verifier, vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed"); Ok(self.pool.write().listener_mut().create_watcher(xt)) } /// Remove from the pool. - pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { + pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { let mut pool = self.pool.write(); let mut results = Vec::with_capacity(hashes.len()); for hash in hashes { @@ -115,24 +118,14 @@ impl Pool where } /// Cull transactions from the queue. - pub fn cull(&self, senders: Option<&[::Sender]>, ready: R) -> usize where - R: txpool::Ready, + pub fn cull(&self, senders: Option<&[::Sender]>, ready: R) -> usize where + R: txpool::Ready, { self.pool.write().cull(senders, ready) } - /// Cull transactions from the queue and then compute the pending set. - pub fn cull_and_get_pending(&self, ready: R, f: F) -> T where - R: txpool::Ready + Clone, - F: FnOnce(txpool::PendingIterator>) -> T, - { - let mut pool = self.pool.write(); - pool.cull(None, ready.clone()); - f(pool.pending(ready)) - } - /// Get the full status of the queue (including readiness) - pub fn status>(&self, ready: R) -> txpool::Status { + pub fn status>(&self, ready: R) -> txpool::Status { self.pool.read().status(ready) } @@ -140,4 +133,21 @@ impl Pool where pub fn light_status(&self) -> txpool::LightStatus { self.pool.read().light_status() } + + /// Removes all transactions from given sender + pub fn remove_sender(&self, sender: VEx::Sender) -> Vec> { + let mut pool = self.pool.write(); + let pending = pool.pending_from_sender(|_: &VEx| txpool::Readiness::Ready, &sender).collect(); + // remove all transactions from this sender + pool.cull(Some(&[sender]), |_: &VEx| txpool::Readiness::Stale); + pending + } + + /// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks. + pub fn pending(&self, ready: R, f: F) -> T where + R: txpool::Ready, + F: FnOnce(txpool::PendingIterator>) -> T, + { + f(self.pool.read().pending(ready)) + } } diff --git a/substrate/rpc/src/author/mod.rs b/substrate/rpc/src/author/mod.rs index 4ed850db55837..fd29fa8adc821 100644 --- a/substrate/rpc/src/author/mod.rs +++ b/substrate/rpc/src/author/mod.rs @@ -17,8 +17,13 @@ //! Substrate block-author/full-node API. use std::sync::Arc; + +use client::{self, Client}; use extrinsic_pool::api::{Error, ExtrinsicPool}; +use runtime_primitives::{generic, traits::Block as BlockT}; +use state_machine; + pub mod error; #[cfg(test)] @@ -35,13 +40,34 @@ build_rpc_trait! { } } -impl AuthorApi for Arc where - T: ExtrinsicPool, - T::Error: 'static, +/// Authoring API +pub struct Author { + /// Substrate client + client: Arc>, + /// Extrinsic pool + pool: Arc

, +} + +impl Author { + /// Create new instance of Authoring API. + pub fn new(client: Arc>, pool: Arc

) -> Self { + Author { client, pool } + } +} + + +impl AuthorApi for Author where + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + Block: BlockT + 'static, + client::error::Error: From<<>::State as state_machine::backend::Backend>::Error>, + P: ExtrinsicPool, Hash>, + P::Error: 'static, { fn submit_extrinsic(&self, xt: Ex) -> Result { - self - .submit(vec![xt]) + let best_block_hash = self.client.info().unwrap().chain.best_hash; + self.pool + .submit(generic::BlockId::hash(best_block_hash), vec![xt]) .map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed")) .map_err(|e| e.into_pool_error() .map(Into::into) diff --git a/substrate/rpc/src/author/tests.rs b/substrate/rpc/src/author/tests.rs index 8e9963613d66e..08757957226b2 100644 --- a/substrate/rpc/src/author/tests.rs +++ b/substrate/rpc/src/author/tests.rs @@ -18,6 +18,7 @@ use super::*; use std::{fmt, sync::Arc}; use extrinsic_pool::api; +use test_client; use parking_lot::Mutex; type Extrinsic = u64; @@ -40,11 +41,11 @@ impl fmt::Display for Error { } } -impl api::ExtrinsicPool for DummyTxPool { +impl api::ExtrinsicPool for DummyTxPool { type Error = Error; /// Submit extrinsic for inclusion in block. - fn submit(&self, xt: Vec) -> ::std::result::Result, Self::Error> { + fn submit(&self, _block: BlockHash, xt: Vec) -> ::std::result::Result, Self::Error> { let mut submitted = self.submitted.lock(); if submitted.len() < 1 { let hashes = xt.iter().map(|_xt| 1).collect(); @@ -58,7 +59,10 @@ impl api::ExtrinsicPool for DummyTxPool { #[test] fn submit_transaction_should_not_cause_error() { - let p = Arc::new(DummyTxPool::default()); + let p = Author { + client: Arc::new(test_client::new()), + pool: Arc::new(DummyTxPool::default()), + }; assert_matches!( AuthorApi::submit_extrinsic(&p, 5), diff --git a/substrate/runtime/primitives/src/generic.rs b/substrate/runtime/primitives/src/generic.rs index ba11ab310a360..982bf52b54572 100644 --- a/substrate/runtime/primitives/src/generic.rs +++ b/substrate/runtime/primitives/src/generic.rs @@ -115,7 +115,7 @@ where } fn check(self, lookup: ThisLookup) -> Result where - ThisLookup: FnOnce(Address) -> Result + Send + Sync, + ThisLookup: FnOnce(Address) -> Result, { if !self.is_signed() { Ok(CheckedExtrinsic(Extrinsic { diff --git a/substrate/runtime/primitives/src/testing.rs b/substrate/runtime/primitives/src/testing.rs index a3d58d12e73cb..7c712b40105e6 100644 --- a/substrate/runtime/primitives/src/testing.rs +++ b/substrate/runtime/primitives/src/testing.rs @@ -162,7 +162,7 @@ impl &u64 { &(self.0).0 } - fn check Result + Send + Sync>(self, _lookup: ThisLookup) -> Result { Ok(self) } + fn check Result>(self, _lookup: ThisLookup) -> Result { Ok(self) } } impl + Slicable + Sized + Send + Sync + Serialize + DeserializeOwned + Clone + Eq + Debug> Applyable for TestXt { type AccountId = u64; diff --git a/substrate/runtime/primitives/src/traits.rs b/substrate/runtime/primitives/src/traits.rs index c204b5ebda049..996235e06bd36 100644 --- a/substrate/runtime/primitives/src/traits.rs +++ b/substrate/runtime/primitives/src/traits.rs @@ -376,13 +376,13 @@ pub trait Checkable: Sized + Send + Sync { type AccountId: Member + MaybeDisplay; type Checked: Member; fn sender(&self) -> &Self::Address; - fn check Result + Send + Sync>(self, lookup: ThisLookup) -> Result; + fn check Result>(self, lookup: ThisLookup) -> Result; } /// A "checkable" piece of information, used by the standard Substrate Executive in order to /// check the validity of a piece of extrinsic information, usually by verifying the signature. /// -/// This does that checking without requiring a lookup argument. +/// This does that checking without requiring a lookup argument. pub trait BlindCheckable: Sized + Send + Sync { type Address: Member + MaybeDisplay; type Checked: Member; @@ -395,7 +395,7 @@ impl Checkable for T { type AccountId = ::Address; type Checked = ::Checked; fn sender(&self) -> &Self::Address { BlindCheckable::sender(self) } - fn check Result + Send + Sync>(self, _: ThisLookup) -> Result { BlindCheckable::check(self) } + fn check Result>(self, _: ThisLookup) -> Result { BlindCheckable::check(self) } } /// An "executable" piece of information, used by the standard Substrate Executive in order to