From 08e50f2c28f64199c9210d8f59493909394df0f0 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 14 Mar 2024 11:28:20 +0800 Subject: [PATCH 1/4] fix the performance issue caused by track_entry_statics --- tx-pool/src/component/pool_map.rs | 64 +++++++++++++++++++------ tx-pool/src/component/tests/proposed.rs | 64 +++++++++++++++++++++++++ tx-pool/src/pool.rs | 8 ++++ tx-pool/src/process.rs | 26 +++++++++- tx-pool/src/service.rs | 12 +++-- 5 files changed, 155 insertions(+), 19 deletions(-) diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index 7be171cfd3..b095256630 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -19,6 +19,7 @@ use ckb_types::{ }; use multi_index_map::MultiIndexMap; use std::collections::HashSet; +use std::time::Instant; type ConflictEntry = (TxEntry, Reject); @@ -71,6 +72,9 @@ pub struct PoolMap { pub(crate) total_tx_size: usize, // sum of all tx_pool tx's cycles. pub(crate) total_tx_cycles: Cycle, + pub(crate) pending_count: usize, + pub(crate) gap_count: usize, + pub(crate) proposed_count: usize, } impl PoolMap { @@ -82,6 +86,9 @@ impl PoolMap { max_ancestors_count, total_tx_size: 0, total_tx_cycles: 0, + pending_count: 0, + gap_count: 0, + proposed_count: 0, } } @@ -124,11 +131,16 @@ impl PoolMap { } pub(crate) fn get_max_update_time(&self) -> u64 { - self.entries + let instant = Instant::now(); + let res = self + .entries .iter() .map(|(_, entry)| entry.inner.timestamp) .max() - .unwrap_or(0) + .unwrap_or(0); + let duration = instant.elapsed(); + debug!("[Perf] get_max_update_time duration: {:?}", duration); + res } pub(crate) fn get_by_id(&self, id: &ProposalShortId) -> Option<&PoolEntry> { @@ -144,12 +156,11 @@ impl PoolMap { } pub(crate) fn pending_size(&self) -> usize { - self.entries.get_by_status(&Status::Pending).len() - + self.entries.get_by_status(&Status::Gap).len() + self.pending_count + self.gap_count } pub(crate) fn proposed_size(&self) -> usize { - self.entries.get_by_status(&Status::Proposed).len() + self.proposed_count } pub(crate) fn sorted_proposed_iter(&self) -> impl Iterator { @@ -213,19 +224,21 @@ impl PoolMap { self.record_entry_edges(&entry)?; self.insert_entry(&entry, status); self.record_entry_descendants(&entry); - self.track_entry_statics(); + self.track_entry_statics(None, Some(status)); self.update_stat_for_add_tx(entry.size, entry.cycles); Ok((true, evicts)) } /// Change the status of the entry, only used for `gap_rtx` and `proposed_rtx` pub(crate) fn set_entry(&mut self, short_id: &ProposalShortId, status: Status) { + let mut old_status = None; self.entries .modify_by_id(short_id, |e| { + old_status = Some(e.status); e.status = status; }) .expect("unconsistent pool"); - self.track_entry_statics(); + self.track_entry_statics(old_status, Some(status)); } pub(crate) fn remove_entry(&mut self, id: &ProposalShortId) -> Option { @@ -239,6 +252,7 @@ impl PoolMap { self.update_descendants_index_key(&entry.inner, EntryOp::Remove); self.remove_entry_edges(&entry.inner); self.remove_entry_links(id); + self.track_entry_statics(Some(entry.status), None); self.update_stat_for_remove_tx(entry.inner.size, entry.inner.cycles); entry.inner }) @@ -625,20 +639,42 @@ impl PoolMap { }); } - fn track_entry_statics(&self) { + pub fn track_entry_statics(&mut self, remove: Option, add: Option) { + match remove { + Some(Status::Pending) => self.pending_count -= 1, + Some(Status::Gap) => self.gap_count -= 1, + Some(Status::Proposed) => self.proposed_count -= 1, + _ => {} + } + match add { + Some(Status::Pending) => self.pending_count += 1, + Some(Status::Gap) => self.gap_count += 1, + Some(Status::Proposed) => self.proposed_count += 1, + _ => {} + } + assert_eq!( + self.pending_count + self.gap_count + self.proposed_count, + self.entries.len() + ); + // let duration = instant.elapsed(); + // eprintln!( + // "pending: {}, gap: {}, proposed: {} => duration: {:?} total_entries_size: {}", + // self.pending_count, + // self.gap_count, + // self.proposed_count, + // duration, + // self.entries.len() + // ); if let Some(metrics) = ckb_metrics::handle() { metrics .ckb_tx_pool_entry .pending - .set(self.entries.get_by_status(&Status::Pending).len() as i64); - metrics - .ckb_tx_pool_entry - .gap - .set(self.entries.get_by_status(&Status::Gap).len() as i64); + .set(self.pending_count as i64); + metrics.ckb_tx_pool_entry.gap.set(self.gap_count as i64); metrics .ckb_tx_pool_entry .proposed - .set(self.proposed_size() as i64); + .set(self.proposed_count as i64); } } diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index 3536bdfcdc..13fffab30e 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -3,6 +3,12 @@ use crate::component::tests::util::{ build_tx, build_tx_with_dep, build_tx_with_header_dep, DEFAULT_MAX_ANCESTORS_COUNT, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE, }; +use ckb_types::core::capacity_bytes; +use ckb_types::core::ScriptHashType; +use ckb_types::packed::CellOutputBuilder; +use ckb_types::packed::ScriptBuilder; +use ckb_types::H256; +use std::time::Instant; use crate::component::{entry::TxEntry, pool_map::PoolMap}; use ckb_types::{ @@ -732,3 +738,61 @@ fn test_container_bench_add_limits() { pool.clear(); assert_eq!(pool.size(), 0); } + +#[test] +fn test_pool_map_bench() { + use rand::Rng; + let mut rng = rand::thread_rng(); + + let mut pool = PoolMap::new(150); + + let mut instant = Instant::now(); + for i in 0..500000 { + let lock_script1 = ScriptBuilder::default() + .code_hash(H256(rand::random()).pack()) + .hash_type(ScriptHashType::Data.into()) + .args(Bytes::from(b"lock_script1".to_vec()).pack()) + .build(); + + let type_script1 = ScriptBuilder::default() + .code_hash(H256(rand::random()).pack()) + .hash_type(ScriptHashType::Data.into()) + .args(Bytes::from(b"type_script1".to_vec()).pack()) + .build(); + + let tx = TransactionBuilder::default() + .output( + CellOutputBuilder::default() + .capacity(capacity_bytes!(1000).pack()) + .lock(lock_script1) + .type_(Some(type_script1).pack()) + .build(), + ) + .output_data(Default::default()) + .build(); + + let entry = TxEntry::dummy_resolve( + tx, + rng.gen_range(0..1000), + Capacity::shannons(200), + rng.gen_range(0..1000), + ); + let short_id = entry.proposal_short_id(); + if i % 1000 == 0 { + eprintln!("i: {}, time: {:?}", i, instant.elapsed()); + instant = Instant::now(); + } + let status = if rng.gen_range(0..100) >= 30 { + Status::Pending + } else { + Status::Gap + }; + pool.add_entry(entry, status); + } + // for _i in 0..100 { + // let instant = Instant::now(); + // let res = pool.track_entry_statics(None, None); + // let duration = instant.elapsed(); + // eprintln!("duration: {:?}", duration); + // } +} diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 23c0caf71a..a814e29799 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -25,6 +25,7 @@ use ckb_types::{ use lru::LruCache; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Instant; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; const CONFLICTES_CACHE_SIZE: usize = 10_000; @@ -254,6 +255,8 @@ impl TxPool { // Expire all transaction (and their dependencies) in the pool. pub(crate) fn remove_expired(&mut self, callbacks: &Callbacks) { let now_ms = ckb_systemtime::unix_time_as_millis(); + let instant = Instant::now(); + let removed: Vec<_> = self .pool_map .iter() @@ -268,10 +271,13 @@ impl TxPool { let reject = Reject::Expiry(entry.timestamp); callbacks.call_reject(self, &entry, reject); } + let duration = instant.elapsed(); + debug!("[Perf] remove_expired duration: {:?}", duration); } // Remove transactions from the pool until total size <= size_limit. pub(crate) fn limit_size(&mut self, callbacks: &Callbacks) { + let instant = Instant::now(); while self.pool_map.total_tx_size > self.config.max_tx_pool_size { let next_evict_entry = || { self.pool_map @@ -297,6 +303,8 @@ impl TxPool { } } self.pool_map.entries.shrink_to_fit(); + let duration = instant.elapsed(); + debug!("[Perf] limit_size duration: {:?}", duration); } // remove transaction with detached proposal from gap and proposed diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index a669b4881b..5831331af5 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -33,7 +33,7 @@ use ckb_verification::{ use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::task::block_in_place; const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks @@ -103,6 +103,7 @@ impl TxPoolService { entry: TxEntry, mut status: TxStatus, ) -> (Result<(), Reject>, Arc) { + let instant = Instant::now(); let (ret, snapshot) = self .with_tx_pool_write_lock(move |tx_pool, snapshot| { // check_rbf must be invoked in `write` lock to avoid concurrent issues. @@ -170,6 +171,8 @@ impl TxPoolService { }) .await; + let duration = instant.elapsed(); + debug!("[Perf] submit_entry: {:?}", duration); (ret, snapshot) } @@ -238,6 +241,7 @@ impl TxPoolService { tx: &TransactionView, ) -> (Result, Arc) { // Acquire read lock for cheap check + let instant = Instant::now(); let tx_size = tx.data().serialized_size_in_block(); let (ret, snapshot) = self @@ -279,6 +283,8 @@ impl TxPoolService { } }) .await; + let duration = instant.elapsed(); + debug!("[Perf] pre-check: {:?}", duration); (ret, snapshot) } @@ -1125,6 +1131,7 @@ fn _submit_entry( ) -> Result, Reject> { let tx_hash = entry.transaction().hash(); debug!("submit_entry {:?} {}", status, tx_hash); + let start = Instant::now(); let (succ, evicts) = match status { TxStatus::Fresh => tx_pool.add_pending(entry.clone())?, TxStatus::Gap => tx_pool.add_gap(entry.clone())?, @@ -1137,6 +1144,8 @@ fn _submit_entry( TxStatus::Proposed => callbacks.call_proposed(&entry), } } + let duration = start.elapsed(); + debug!("[Perf] Time elapsed in _submit_entry is: {:?}", duration); Ok(evicts) } @@ -1149,6 +1158,7 @@ fn _update_tx_pool_for_reorg( callbacks: &Callbacks, mine_mode: bool, ) { + let instant = Instant::now(); tx_pool.snapshot = Arc::clone(&snapshot); // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into @@ -1166,6 +1176,7 @@ fn _update_tx_pool_for_reorg( let mut proposals = Vec::new(); let mut gaps = Vec::new(); + let instant = Instant::now(); for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) { let short_id = entry.inner.proposal_short_id(); if snapshot.proposals().contains_proposed(&short_id) { @@ -1182,7 +1193,16 @@ fn _update_tx_pool_for_reorg( gaps.push(elem); } } + let duration = instant.elapsed(); + debug!("[Perf] reorg duration: {:?}", duration); + debug!( + "[Perf] reorg size: tx_pool size {:?} snapshot gap: {:?}, proposed: {:?}", + tx_pool.pool_map.entries.len(), + snapshot.proposals().gap().len(), + snapshot.proposals().set().len(), + ); + let instant = Instant::now(); for (id, entry) in proposals { debug!("begin to proposed: {:x}", id); if let Err(e) = tx_pool.proposed_rtx(&id) { @@ -1208,6 +1228,8 @@ fn _update_tx_pool_for_reorg( callbacks.call_reject(tx_pool, &entry, e.clone()); } } + let duration = instant.elapsed(); + debug!("[Perf] reorg setting: {:?}", duration); } // Remove expired transaction from pending @@ -1215,6 +1237,8 @@ fn _update_tx_pool_for_reorg( // Remove transactions from the pool until its size <= size_limit. tx_pool.limit_size(callbacks); + let duration = instant.elapsed(); + debug!("[Perf] reorg _update_tx_pool_for_reorg: {:?}", duration); } pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool { diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 8ad6300a5b..e3381f7403 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -14,7 +14,7 @@ use ckb_chain_spec::consensus::Consensus; use ckb_channel::oneshot; use ckb_error::AnyError; use ckb_jsonrpc_types::BlockTemplate; -use ckb_logger::{error, info}; +use ckb_logger::{debug, error, info}; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::Snapshot; use ckb_stop_handler::new_tokio_exit_rx; @@ -33,7 +33,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::watch; use tokio::sync::{mpsc, RwLock}; use tokio::task::block_in_place; @@ -920,7 +920,8 @@ impl TxPoolService { let tx_pool = self.tx_pool.read().await; let orphan = self.orphan.read().await; let tip_header = tx_pool.snapshot.tip_header(); - TxPoolInfo { + let instant = Instant::now(); + let res = TxPoolInfo { tip_hash: tip_header.hash(), tip_number: tip_header.number(), pending_size: tx_pool.pool_map.pending_size(), @@ -933,7 +934,10 @@ impl TxPoolService { last_txs_updated_at: tx_pool.pool_map.get_max_update_time(), tx_size_limit: TRANSACTION_SIZE_LIMIT, max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64, - } + }; + let duration = instant.elapsed(); + debug!("[Perf] tx_pool info: {:?}", duration); + res } pub fn should_notify_block_assembler(&self) -> bool { From 8463d7295ccc225c0d675b31942197848a763f73 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 14 Mar 2024 18:20:51 +0800 Subject: [PATCH 2/4] add bench test for pool map --- tx-pool/src/component/pool_map.rs | 13 ------------- tx-pool/src/component/tests/proposed.rs | 24 ++++++++++++++---------- tx-pool/src/pool.rs | 9 --------- 3 files changed, 14 insertions(+), 32 deletions(-) diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index b095256630..79e1046b13 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -151,10 +151,6 @@ impl PoolMap { self.get_by_id(id).expect("unconsistent pool") } - pub(crate) fn get_by_status(&self, status: Status) -> Vec<&PoolEntry> { - self.entries.get_by_status(&status) - } - pub(crate) fn pending_size(&self) -> usize { self.pending_count + self.gap_count } @@ -656,15 +652,6 @@ impl PoolMap { self.pending_count + self.gap_count + self.proposed_count, self.entries.len() ); - // let duration = instant.elapsed(); - // eprintln!( - // "pending: {}, gap: {}, proposed: {} => duration: {:?} total_entries_size: {}", - // self.pending_count, - // self.gap_count, - // self.proposed_count, - // duration, - // self.entries.len() - // ); if let Some(metrics) = ckb_metrics::handle() { metrics .ckb_tx_pool_entry diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index 13fffab30e..bf2f447886 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -747,7 +747,8 @@ fn test_pool_map_bench() { let mut pool = PoolMap::new(150); let mut instant = Instant::now(); - for i in 0..500000 { + let mut time_spend = vec![]; + for i in 0..20000 { let lock_script1 = ScriptBuilder::default() .code_hash(H256(rand::random()).pack()) .hash_type(ScriptHashType::Data.into()) @@ -777,9 +778,9 @@ fn test_pool_map_bench() { Capacity::shannons(200), rng.gen_range(0..1000), ); - let short_id = entry.proposal_short_id(); - if i % 1000 == 0 { + if i % 5000 == 0 && i != 0 { eprintln!("i: {}, time: {:?}", i, instant.elapsed()); + time_spend.push(instant.elapsed()); instant = Instant::now(); } let status = if rng.gen_range(0..100) >= 30 { @@ -787,12 +788,15 @@ fn test_pool_map_bench() { } else { Status::Gap }; - pool.add_entry(entry, status); + let _ = pool.add_entry(entry, status); } - // for _i in 0..100 { - // let instant = Instant::now(); - // let res = pool.track_entry_statics(None, None); - // let duration = instant.elapsed(); - // eprintln!("duration: {:?}", duration); - // } + let first = time_spend[0].as_millis(); + let last = time_spend.last().unwrap().as_millis(); + let diff = (last as i128 - first as i128).abs(); + let expect_diff_range = ((first as f64) * 0.15) as i128; + eprintln!( + "first: {} last: {}, diff: {}, range: {}", + first, last, diff, expect_diff_range + ); + assert!(diff < expect_diff_range); } diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index a814e29799..53fd572e99 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -73,15 +73,6 @@ impl TxPool { Arc::clone(&self.snapshot) } - fn get_by_status(&self, status: Status) -> Vec<&PoolEntry> { - self.pool_map.get_by_status(status) - } - - /// Get tx-pool size - pub fn status_size(&self, status: Status) -> usize { - self.get_by_status(status).len() - } - /// Check whether tx-pool enable RBF pub fn enable_rbf(&self) -> bool { self.config.min_rbf_rate > self.config.min_fee_rate From 96ccb0dcd19afcdf70999d78ec7c746f5577bb09 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 14 Mar 2024 21:32:56 +0800 Subject: [PATCH 3/4] cleanup perf logs --- tx-pool/src/component/pool_map.rs | 11 ++--------- tx-pool/src/pool.rs | 7 ------- tx-pool/src/process.rs | 26 +------------------------- tx-pool/src/service.rs | 12 ++++-------- 4 files changed, 7 insertions(+), 49 deletions(-) diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index 79e1046b13..184cc522ba 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -19,8 +19,6 @@ use ckb_types::{ }; use multi_index_map::MultiIndexMap; use std::collections::HashSet; -use std::time::Instant; - type ConflictEntry = (TxEntry, Reject); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -131,16 +129,11 @@ impl PoolMap { } pub(crate) fn get_max_update_time(&self) -> u64 { - let instant = Instant::now(); - let res = self - .entries + self.entries .iter() .map(|(_, entry)| entry.inner.timestamp) .max() - .unwrap_or(0); - let duration = instant.elapsed(); - debug!("[Perf] get_max_update_time duration: {:?}", duration); - res + .unwrap_or(0) } pub(crate) fn get_by_id(&self, id: &ProposalShortId) -> Option<&PoolEntry> { diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 53fd572e99..ee88211b7d 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -25,7 +25,6 @@ use ckb_types::{ use lru::LruCache; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::Instant; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; const CONFLICTES_CACHE_SIZE: usize = 10_000; @@ -246,7 +245,6 @@ impl TxPool { // Expire all transaction (and their dependencies) in the pool. pub(crate) fn remove_expired(&mut self, callbacks: &Callbacks) { let now_ms = ckb_systemtime::unix_time_as_millis(); - let instant = Instant::now(); let removed: Vec<_> = self .pool_map @@ -262,13 +260,10 @@ impl TxPool { let reject = Reject::Expiry(entry.timestamp); callbacks.call_reject(self, &entry, reject); } - let duration = instant.elapsed(); - debug!("[Perf] remove_expired duration: {:?}", duration); } // Remove transactions from the pool until total size <= size_limit. pub(crate) fn limit_size(&mut self, callbacks: &Callbacks) { - let instant = Instant::now(); while self.pool_map.total_tx_size > self.config.max_tx_pool_size { let next_evict_entry = || { self.pool_map @@ -294,8 +289,6 @@ impl TxPool { } } self.pool_map.entries.shrink_to_fit(); - let duration = instant.elapsed(); - debug!("[Perf] limit_size duration: {:?}", duration); } // remove transaction with detached proposal from gap and proposed diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 5831331af5..a669b4881b 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -33,7 +33,7 @@ use ckb_verification::{ use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::task::block_in_place; const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks @@ -103,7 +103,6 @@ impl TxPoolService { entry: TxEntry, mut status: TxStatus, ) -> (Result<(), Reject>, Arc) { - let instant = Instant::now(); let (ret, snapshot) = self .with_tx_pool_write_lock(move |tx_pool, snapshot| { // check_rbf must be invoked in `write` lock to avoid concurrent issues. @@ -171,8 +170,6 @@ impl TxPoolService { }) .await; - let duration = instant.elapsed(); - debug!("[Perf] submit_entry: {:?}", duration); (ret, snapshot) } @@ -241,7 +238,6 @@ impl TxPoolService { tx: &TransactionView, ) -> (Result, Arc) { // Acquire read lock for cheap check - let instant = Instant::now(); let tx_size = tx.data().serialized_size_in_block(); let (ret, snapshot) = self @@ -283,8 +279,6 @@ impl TxPoolService { } }) .await; - let duration = instant.elapsed(); - debug!("[Perf] pre-check: {:?}", duration); (ret, snapshot) } @@ -1131,7 +1125,6 @@ fn _submit_entry( ) -> Result, Reject> { let tx_hash = entry.transaction().hash(); debug!("submit_entry {:?} {}", status, tx_hash); - let start = Instant::now(); let (succ, evicts) = match status { TxStatus::Fresh => tx_pool.add_pending(entry.clone())?, TxStatus::Gap => tx_pool.add_gap(entry.clone())?, @@ -1144,8 +1137,6 @@ fn _submit_entry( TxStatus::Proposed => callbacks.call_proposed(&entry), } } - let duration = start.elapsed(); - debug!("[Perf] Time elapsed in _submit_entry is: {:?}", duration); Ok(evicts) } @@ -1158,7 +1149,6 @@ fn _update_tx_pool_for_reorg( callbacks: &Callbacks, mine_mode: bool, ) { - let instant = Instant::now(); tx_pool.snapshot = Arc::clone(&snapshot); // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into @@ -1176,7 +1166,6 @@ fn _update_tx_pool_for_reorg( let mut proposals = Vec::new(); let mut gaps = Vec::new(); - let instant = Instant::now(); for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) { let short_id = entry.inner.proposal_short_id(); if snapshot.proposals().contains_proposed(&short_id) { @@ -1193,16 +1182,7 @@ fn _update_tx_pool_for_reorg( gaps.push(elem); } } - let duration = instant.elapsed(); - debug!("[Perf] reorg duration: {:?}", duration); - debug!( - "[Perf] reorg size: tx_pool size {:?} snapshot gap: {:?}, proposed: {:?}", - tx_pool.pool_map.entries.len(), - snapshot.proposals().gap().len(), - snapshot.proposals().set().len(), - ); - let instant = Instant::now(); for (id, entry) in proposals { debug!("begin to proposed: {:x}", id); if let Err(e) = tx_pool.proposed_rtx(&id) { @@ -1228,8 +1208,6 @@ fn _update_tx_pool_for_reorg( callbacks.call_reject(tx_pool, &entry, e.clone()); } } - let duration = instant.elapsed(); - debug!("[Perf] reorg setting: {:?}", duration); } // Remove expired transaction from pending @@ -1237,8 +1215,6 @@ fn _update_tx_pool_for_reorg( // Remove transactions from the pool until its size <= size_limit. tx_pool.limit_size(callbacks); - let duration = instant.elapsed(); - debug!("[Perf] reorg _update_tx_pool_for_reorg: {:?}", duration); } pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool { diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index e3381f7403..8ad6300a5b 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -14,7 +14,7 @@ use ckb_chain_spec::consensus::Consensus; use ckb_channel::oneshot; use ckb_error::AnyError; use ckb_jsonrpc_types::BlockTemplate; -use ckb_logger::{debug, error, info}; +use ckb_logger::{error, info}; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::Snapshot; use ckb_stop_handler::new_tokio_exit_rx; @@ -33,7 +33,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::sync::watch; use tokio::sync::{mpsc, RwLock}; use tokio::task::block_in_place; @@ -920,8 +920,7 @@ impl TxPoolService { let tx_pool = self.tx_pool.read().await; let orphan = self.orphan.read().await; let tip_header = tx_pool.snapshot.tip_header(); - let instant = Instant::now(); - let res = TxPoolInfo { + TxPoolInfo { tip_hash: tip_header.hash(), tip_number: tip_header.number(), pending_size: tx_pool.pool_map.pending_size(), @@ -934,10 +933,7 @@ impl TxPoolService { last_txs_updated_at: tx_pool.pool_map.get_max_update_time(), tx_size_limit: TRANSACTION_SIZE_LIMIT, max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64, - }; - let duration = instant.elapsed(); - debug!("[Perf] tx_pool info: {:?}", duration); - res + } } pub fn should_notify_block_assembler(&self) -> bool { From 4b715ff9a3191d1bed8bef42dfae4df20115f2b6 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 15 Mar 2024 00:44:14 +0800 Subject: [PATCH 4/4] fix sepc RpcTruncate --- test/src/specs/rpc/truncate.rs | 1 + tx-pool/src/component/pool_map.rs | 5 ++++- tx-pool/src/component/tests/proposed.rs | 8 +++----- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/test/src/specs/rpc/truncate.rs b/test/src/specs/rpc/truncate.rs index a1002ff8c8..da2251972f 100644 --- a/test/src/specs/rpc/truncate.rs +++ b/test/src/specs/rpc/truncate.rs @@ -31,6 +31,7 @@ impl Spec for RpcTruncate { let tx_pool_info = node.get_tip_tx_pool_info(); assert!(tx_pool_info.total_tx_size.value() > 0, "tx-pool holds tx2"); + assert!(tx_pool_info.pending.value() > 0, "tx-pool hods tx2"); // Truncate from `to_truncate` diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index 184cc522ba..aea59e74bb 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -365,6 +365,9 @@ impl PoolMap { self.links.clear(); self.total_tx_size = 0; self.total_tx_cycles = 0; + self.pending_count = 0; + self.gap_count = 0; + self.proposed_count = 0; } pub(crate) fn score_sorted_iter_by( @@ -628,7 +631,7 @@ impl PoolMap { }); } - pub fn track_entry_statics(&mut self, remove: Option, add: Option) { + fn track_entry_statics(&mut self, remove: Option, add: Option) { match remove { Some(Status::Pending) => self.pending_count -= 1, Some(Status::Gap) => self.gap_count -= 1, diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index bf2f447886..236d4b10a9 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -3,10 +3,8 @@ use crate::component::tests::util::{ build_tx, build_tx_with_dep, build_tx_with_header_dep, DEFAULT_MAX_ANCESTORS_COUNT, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE, }; -use ckb_types::core::capacity_bytes; -use ckb_types::core::ScriptHashType; -use ckb_types::packed::CellOutputBuilder; -use ckb_types::packed::ScriptBuilder; +use ckb_types::core::{capacity_bytes, ScriptHashType}; +use ckb_types::packed::{CellOutputBuilder, ScriptBuilder}; use ckb_types::H256; use std::time::Instant; @@ -793,7 +791,7 @@ fn test_pool_map_bench() { let first = time_spend[0].as_millis(); let last = time_spend.last().unwrap().as_millis(); let diff = (last as i128 - first as i128).abs(); - let expect_diff_range = ((first as f64) * 0.15) as i128; + let expect_diff_range = ((first as f64) * 2.0) as i128; eprintln!( "first: {} last: {}, diff: {}, range: {}", first, last, diff, expect_diff_range