Skip to content

Commit

Permalink
Add trivial improvements to transaction pool (paritytech#8572)
Browse files Browse the repository at this point in the history
* Add trival improvements to transaction pool

* .

* Add trival improvements to transaction pool

* Update client/transaction-pool/graph/src/future.rs

* Update client/transaction-pool/graph/src/base_pool.rs

* Fix transaction_debug test

Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
2 people authored and KalitaAlexey committed Jul 9, 2021
1 parent 3304e6b commit 21caaf8
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 102 deletions.
41 changes: 13 additions & 28 deletions client/transaction-pool/graph/src/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
/// every reason to be commented. That's why we `Transaction` is not `Clone`,
/// but there's explicit `duplicate` method.
pub fn duplicate(&self) -> Self {
Transaction {
Self {
data: self.data.clone(),
bytes: self.bytes.clone(),
bytes: self.bytes,
hash: self.hash.clone(),
priority: self.priority.clone(),
priority: self.priority,
source: self.source,
valid_till: self.valid_till.clone(),
valid_till: self.valid_till,
requires: self.requires.clone(),
provides: self.provides.clone(),
propagate: self.propagate,
Expand All @@ -174,16 +174,9 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
Extrinsic: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fn print_tags(fmt: &mut fmt::Formatter, tags: &[Tag]) -> fmt::Result {
let mut it = tags.iter();
if let Some(t) = it.next() {
write!(fmt, "{}", HexDisplay::from(t))?;
}
for t in it {
write!(fmt, ",{}", HexDisplay::from(t))?;
}
Ok(())
}
let join_tags = |tags: &[Tag]| {
tags.iter().map(|tag| HexDisplay::from(tag).to_string()).collect::<Vec<_>>().join(", ")
};

write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.hash)?;
Expand All @@ -192,11 +185,8 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
write!(fmt, "bytes: {:?}, ", &self.bytes)?;
write!(fmt, "propagate: {:?}, ", &self.propagate)?;
write!(fmt, "source: {:?}, ", &self.source)?;
write!(fmt, "requires: [")?;
print_tags(fmt, &self.requires)?;
write!(fmt, "], provides: [")?;
print_tags(fmt, &self.provides)?;
write!(fmt, "], ")?;
write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
write!(fmt, "data: {:?}", &self.data)?;
write!(fmt, "}}")?;
Ok(())
Expand Down Expand Up @@ -239,7 +229,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for Bas
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
/// Create new pool given reject_future_transactions flag.
pub fn new(reject_future_transactions: bool) -> Self {
BasePool {
Self {
reject_future_transactions,
future: Default::default(),
ready: Default::default(),
Expand Down Expand Up @@ -320,13 +310,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
let mut first = true;
let mut to_import = vec![tx];

loop {
// take first transaction from the list
let tx = match to_import.pop() {
Some(tx) => tx,
None => break,
};

// take first transaction from the list
while let Some(tx) = to_import.pop() {
// find transactions in Future that it unlocks
to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));

Expand Down Expand Up @@ -1087,7 +1072,7 @@ mod tests {
}),
"Transaction { \
hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
source: TransactionSource::External, requires: [03,02], provides: [04], data: [4]}".to_owned()
source: TransactionSource::External, requires: [03, 02], provides: [04], data: [4]}".to_owned()
);
}

Expand Down
24 changes: 11 additions & 13 deletions client/transaction-pool/graph/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,22 @@ impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, E
write!(fmt, "WaitingTransaction {{ ")?;
write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
write!(fmt, "transaction: {:?}, ", self.transaction)?;
write!(fmt, "missing_tags: {{")?;
let mut it = self.missing_tags.iter().map(|tag| HexDisplay::from(tag));
if let Some(tag) = it.next() {
write!(fmt, "{}", tag)?;
}
for tag in it {
write!(fmt, ", {}", tag)?;
}
write!(fmt, " }}}}")
write!(
fmt,
"missing_tags: {{{}}}",
self.missing_tags.iter()
.map(|tag| HexDisplay::from(tag).to_string()).collect::<Vec<_>>().join(", "),
)?;
write!(fmt, "}}")
}
}

impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
fn clone(&self) -> Self {
WaitingTransaction {
Self {
transaction: self.transaction.clone(),
missing_tags: self.missing_tags.clone(),
imported_at: self.imported_at.clone(),
imported_at: self.imported_at,
}
}
}
Expand All @@ -90,7 +88,7 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
.cloned()
.collect();

WaitingTransaction {
Self {
transaction: Arc::new(transaction),
missing_tags,
imported_at: Instant::now(),
Expand Down Expand Up @@ -123,7 +121,7 @@ pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {

impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
fn default() -> Self {
FutureTransactions {
Self {
wanted_tags: Default::default(),
waiting: Default::default(),
}
Expand Down
8 changes: 5 additions & 3 deletions client/transaction-pool/graph/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
use std::{
collections::HashMap, hash, fmt::Debug,
};

use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
use log::{debug, trace, warn};
use sp_runtime::traits;

use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};

/// Extrinsic pool default listener.
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
Expand All @@ -37,7 +39,7 @@ const MAX_FINALITY_WATCHERS: usize = 512;

impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
fn default() -> Self {
Listener {
Self {
watchers: Default::default(),
finality_watchers: Default::default(),
}
Expand Down Expand Up @@ -115,7 +117,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
if let Some((hash, txs)) = self.finality_watchers.pop_front() {
for tx in txs {
self.fire(&tx, |s| s.finality_timeout(hash.clone()));
self.fire(&tx, |s| s.finality_timeout(hash));
}
}
}
Expand Down
27 changes: 13 additions & 14 deletions client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use std::{
sync::Arc,
};

use crate::{base_pool as base, watcher::Watcher};

use futures::Future;
use sp_runtime::{
generic::BlockId,
Expand All @@ -35,6 +33,7 @@ use sp_transaction_pool::error;
use wasm_timer::Instant;
use futures::channel::mpsc::Receiver;

use crate::{base_pool as base, watcher::Watcher};
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::{IsValidator, ValidatedTransaction};

Expand Down Expand Up @@ -111,7 +110,7 @@ pub struct Options {

impl Default for Options {
fn default() -> Self {
Options {
Self {
ready: base::Limit {
count: 8192,
total_bytes: 20 * 1024 * 1024,
Expand Down Expand Up @@ -151,7 +150,7 @@ where
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Pool {
Self {
validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)),
}
}
Expand Down Expand Up @@ -193,7 +192,7 @@ impl<B: ChainApi> Pool<B> {
res.expect("One extrinsic passed; one result returned; qed")
}

/// Import a single extrinsic and starts to watch their progress in the pool.
/// Import a single extrinsic and starts to watch its progress in the pool.
pub async fn submit_and_watch(
&self,
at: &BlockId<B::Block>,
Expand Down Expand Up @@ -242,8 +241,8 @@ impl<B: ChainApi> Pool<B> {

// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
let pruned_transactions = hashes.into_iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
let pruned_transactions = hashes.iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash));
self.validated_pool.fire_pruned(at, pruned_transactions)
}

Expand Down Expand Up @@ -337,7 +336,7 @@ impl<B: ChainApi> Pool<B> {
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let pruned_hashes = prune_status.pruned
.iter()
.map(|tx| tx.hash.clone()).collect::<Vec<_>>();
.map(|tx| tx.hash).collect::<Vec<_>>();
let pruned_transactions = prune_status.pruned
.into_iter()
.map(|tx| (tx.source, tx.data.clone()));
Expand Down Expand Up @@ -402,7 +401,7 @@ impl<B: ChainApi> Pool<B> {

let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
return (hash, ValidatedTransaction::Invalid(hash, err))
}

let validation_result = self.validated_pool.api().validate_transaction(
Expand All @@ -413,17 +412,17 @@ impl<B: ChainApi> Pool<B> {

let status = match validation_result {
Ok(status) => status,
Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
};

let validity = match status {
Ok(validity) => {
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
hash,
source,
xt,
bytes,
Expand All @@ -432,9 +431,9 @@ impl<B: ChainApi> Pool<B> {
}
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
};

(hash, validity)
Expand Down
8 changes: 4 additions & 4 deletions client/transaction-pool/graph/src/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct TransactionRef<Hash, Ex> {

impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
fn clone(&self) -> Self {
TransactionRef {
Self {
transaction: self.transaction.clone(),
insertion_id: self.insertion_id,
}
Expand Down Expand Up @@ -93,7 +93,7 @@ pub struct ReadyTx<Hash, Ex> {

impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
fn clone(&self) -> Self {
ReadyTx {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {

impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
ReadyTransactions {
Self {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
Expand Down Expand Up @@ -259,7 +259,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let to_remove = hashes.iter().cloned().collect::<Vec<_>>();
let to_remove = hashes.to_vec();
self.remove_subtree_with_tag_filter(to_remove, None)
}

Expand Down
3 changes: 1 addition & 2 deletions client/transaction-pool/graph/src/rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct PoolRotator<Hash> {

impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
PoolRotator {
Self {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
Expand Down Expand Up @@ -78,7 +78,6 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
}
}


/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
Expand Down
14 changes: 7 additions & 7 deletions client/transaction-pool/graph/src/tracked_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
};
use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard};

/// Something that can report it's size.
/// Something that can report its size.
pub trait Size {
fn size(&self) -> usize;
}
Expand Down Expand Up @@ -64,14 +64,14 @@ impl<K, V> TrackedMap<K, V> {
}

/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.index.read(),
}
}

/// Lock map for write.
pub fn write<'a>(&'a self) -> TrackedMapWriteAccess<'a, K, V> {
pub fn write(&self) -> TrackedMapWriteAccess<K, V> {
TrackedMapWriteAccess {
inner_guard: self.index.write(),
bytes: &self.bytes,
Expand All @@ -90,7 +90,7 @@ where
K: Eq + std::hash::Hash
{
/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.0.read(),
}
Expand Down Expand Up @@ -136,10 +136,10 @@ where
let new_bytes = val.size();
self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed);
self.length.fetch_add(1, AtomicOrdering::Relaxed);
self.inner_guard.insert(key, val).and_then(|old_val| {
self.inner_guard.insert(key, val).map(|old_val| {
self.bytes.fetch_sub(old_val.size() as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
Some(old_val)
old_val
})
}

Expand Down Expand Up @@ -186,4 +186,4 @@ mod tests {
assert_eq!(map.bytes(), 1);
assert_eq!(map.len(), 1);
}
}
}
Loading

0 comments on commit 21caaf8

Please sign in to comment.