diff --git a/.github/workflows/ci_integration_tests_windows.yaml b/.github/workflows/ci_integration_tests_windows.yaml index c59c12078c..2ee12d3e9e 100644 --- a/.github/workflows/ci_integration_tests_windows.yaml +++ b/.github/workflows/ci_integration_tests_windows.yaml @@ -51,7 +51,7 @@ jobs: ci_integration_tests_windows: name: ci_integration_tests_windows needs: prologue - runs-on: ${{ needs.prologue.outputs.windows_runner_label }} + runs-on: windows-latest timeout-minutes: 140 steps: - uses: actions/checkout@v3 diff --git a/chain/src/init.rs b/chain/src/init.rs index 4dc9d2d919..e352ed43ab 100644 --- a/chain/src/init.rs +++ b/chain/src/init.rs @@ -2,11 +2,11 @@ //! Bootstrap InitLoadUnverified, PreloadUnverifiedBlock, ChainService and ConsumeUnverified threads. use crate::chain_service::ChainService; -use crate::consume_unverified::ConsumeUnverifiedBlocks; use crate::init_load_unverified::InitLoadUnverified; use crate::orphan_broker::OrphanBroker; use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel; use crate::utils::orphan_block_pool::OrphanBlockPool; +use crate::verify::ConsumeUnverifiedBlocks; use crate::{chain_controller::ChainController, LonelyBlockHash, UnverifiedBlock}; use ckb_channel::{self as channel, SendError}; use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW; @@ -37,7 +37,7 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let is_pending_verify: Arc> = Arc::new(DashSet::new()); let consumer_unverified_thread = thread::Builder::new() - .name("consume_unverified_blocks".into()) + .name("verify_blocks".into()) .spawn({ let shared = builder.shared.clone(); let is_pending_verify = Arc::clone(&is_pending_verify); diff --git a/chain/src/init_load_unverified.rs b/chain/src/init_load_unverified.rs index e2c4ebae00..30b0247cdb 100644 --- a/chain/src/init_load_unverified.rs +++ b/chain/src/init_load_unverified.rs @@ -93,6 +93,15 @@ impl InitLoadUnverified { let unverified_hashes: Vec = 
self.find_unverified_block_hashes(check_unverified_number); + if check_unverified_number > tip_number && unverified_hashes.is_empty() { + info!( + "no unverified blocks found after tip, current tip: {}-{}", + tip_number, + self.shared.snapshot().tip_hash() + ); + return; + } + for unverified_hash in unverified_hashes { f(&unverified_hash); } diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 5ffd268222..34b61ef562 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -16,7 +16,6 @@ use std::sync::Arc; mod chain_controller; mod chain_service; -pub mod consume_unverified; mod init; mod init_load_unverified; mod orphan_broker; @@ -24,6 +23,7 @@ mod preload_unverified_blocks_channel; #[cfg(test)] mod tests; mod utils; +pub mod verify; pub use chain_controller::ChainController; use ckb_logger::{error, info}; diff --git a/chain/src/tests/find_fork.rs b/chain/src/tests/find_fork.rs index 93fa67f118..2c39cfd0ca 100644 --- a/chain/src/tests/find_fork.rs +++ b/chain/src/tests/find_fork.rs @@ -1,5 +1,5 @@ -use crate::consume_unverified::ConsumeUnverifiedBlockProcessor; use crate::utils::forkchanges::ForkChanges; +use crate::verify::ConsumeUnverifiedBlockProcessor; use crate::{start_chain_services, UnverifiedBlock}; use ckb_chain_spec::consensus::{Consensus, ProposalWindow}; use ckb_proposal_table::ProposalTable; diff --git a/chain/src/consume_unverified.rs b/chain/src/verify.rs similarity index 99% rename from chain/src/consume_unverified.rs rename to chain/src/verify.rs index cf09abdaba..1b2a007aa1 100644 --- a/chain/src/consume_unverified.rs +++ b/chain/src/verify.rs @@ -103,7 +103,7 @@ impl ConsumeUnverifiedBlocks { }, }, recv(self.stop_rx) -> _ => { - info!("consume_unverified_blocks thread received exit signal, exit now"); + info!("verify_blocks thread received exit signal, exit now"); break; } diff --git a/deny.toml b/deny.toml index aadefc53d2..fabff48ef2 100644 --- a/deny.toml +++ b/deny.toml @@ -70,6 +70,10 @@ feature-depth = 1 # A list of advisory IDs to 
ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ +# https://rustsec.org/advisories/RUSTSEC-2024-0363 +# https://github.com/launchbadge/sqlx/issues/3440 +# The queries for the rich indexer receive input parameters via RPC, and the data size is far less than 4GB, so this issue can be temporarily ignored while waiting for sqlx to be fixed. + "RUSTSEC-2024-0363", # https://rustsec.org/advisories/RUSTSEC-2022-0090 # It was sometimes possible for SQLite versions >= 1.0.12, < 3.39.2 to allow an array-bounds overflow when large string were input into SQLite's `printf` function. "RUSTSEC-2022-0090", diff --git a/devtools/ci/check-relaxed.sh b/devtools/ci/check-relaxed.sh new file mode 100755 index 0000000000..8f5cab29c4 --- /dev/null +++ b/devtools/ci/check-relaxed.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +set -euo pipefail + +case "$OSTYPE" in + darwin*) + if ! type gsed &>/dev/null || ! type ggrep &>/dev/null; then + echo "GNU sed and grep not found! 
You can install via Homebrew" >&2 + echo >&2 + echo " brew install grep gnu-sed" >&2 + exit 1 + fi + + SED=gsed + GREP=ggrep + ;; + *) + SED=sed + GREP=grep + ;; +esac + +function main() { + local res=$(find ./ -not -path '*/target/*' -type f -name "*.rs" | xargs "${GREP}" -H "Relaxed") + + if [ -z "${res}" ]; then + echo "ok" + exit 0 + else + echo "find use Relaxed on code, please check" + + # Print the grep matches verbatim, one per line, instead of word-splitting + # them and passing source text to printf as a format string. + printf '%s\n' "${res}" + + exit 1 + fi +} + +main "$@" diff --git a/devtools/ci/ci_main.sh b/devtools/ci/ci_main.sh index f3ff03a86b..33242b3035 100755 --- a/devtools/ci/ci_main.sh +++ b/devtools/ci/ci_main.sh @@ -60,6 +60,7 @@ case $GITHUB_WORKFLOW in make check-dirty-rpc-doc make check-dirty-hashes-toml devtools/ci/check-cyclic-dependencies.py + devtools/ci/check-relaxed.sh ;; ci_aarch64_build*) echo "ci_aarch64_build" diff --git a/network/src/network.rs b/network/src/network.rs index ea7a135eee..b5814d3482 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -485,7 +485,7 @@ impl NetworkState { /// Network message processing controller, default is true, if false, discard any received messages pub fn is_active(&self) -> bool { - self.active.load(Ordering::Relaxed) + self.active.load(Ordering::Acquire) } } @@ -1368,7 +1368,7 @@ impl NetworkController { /// Change active status, if set false discard any received messages pub fn set_active(&self, active: bool) { - self.network_state.active.store(active, Ordering::Relaxed); + self.network_state.active.store(active, Ordering::Release); } /// Return all connected peers' protocols info diff --git a/rpc/README.md b/rpc/README.md index b769033c7d..c94b79d98e 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -109,6 +109,7 @@ The crate `ckb-rpc`'s minimum supported rustc version is 1.71.1. 
* [Method `remove_transaction`](#pool-remove_transaction) * [Method `tx_pool_info`](#pool-tx_pool_info) * [Method `clear_tx_pool`](#pool-clear_tx_pool) + * [Method `clear_tx_verify_queue`](#pool-clear_tx_verify_queue) * [Method `get_raw_tx_pool`](#pool-get_raw_tx_pool) * [Method `get_pool_tx_detail_info`](#pool-get_pool_tx_detail_info) * [Method `tx_pool_ready`](#pool-tx_pool_ready) @@ -4748,6 +4749,37 @@ Response } ``` + +#### Method `clear_tx_verify_queue` +* `clear_tx_verify_queue()` + +* result: `null` + +Removes all transactions from the verification queue. + +###### Examples + +Request + +```json +{ + "id": 42, + "jsonrpc": "2.0", + "method": "clear_tx_verify_queue", + "params": [] +} +``` + +Response + +```json +{ + "id": 42, + "jsonrpc": "2.0", + "result": null +} +``` + #### Method `get_raw_tx_pool` * `get_raw_tx_pool(verbose)` diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index 9fe30b357e..a9e65da1c3 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -322,6 +322,33 @@ pub trait PoolRpc { #[rpc(name = "clear_tx_pool")] fn clear_tx_pool(&self) -> Result<()>; + /// Removes all transactions from the verification queue. + /// + /// ## Examples + /// + /// Request + /// + /// ```json + /// { + /// "id": 42, + /// "jsonrpc": "2.0", + /// "method": "clear_tx_verify_queue", + /// "params": [] + /// } + /// ``` + /// + /// Response + /// + /// ```json + /// { + /// "id": 42, + /// "jsonrpc": "2.0", + /// "result": null + /// } + /// ``` + #[rpc(name = "clear_tx_verify_queue")] + fn clear_tx_verify_queue(&self) -> Result<()>; + /// Returns all transaction ids in tx pool as a json array of string transaction ids. 
/// ## Params /// @@ -662,6 +689,15 @@ impl PoolRpc for PoolRpcImpl { Ok(()) } + fn clear_tx_verify_queue(&self) -> Result<()> { + let tx_pool = self.shared.tx_pool_controller(); + tx_pool + .clear_verify_queue() + .map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?; + + Ok(()) + } + fn get_raw_tx_pool(&self, verbose: Option) -> Result { let tx_pool = self.shared.tx_pool_controller(); diff --git a/script/src/verify/tests/ckb_latest/features_since_v2021.rs b/script/src/verify/tests/ckb_latest/features_since_v2021.rs index 5ffea363ce..cb70fb0b4c 100644 --- a/script/src/verify/tests/ckb_latest/features_since_v2021.rs +++ b/script/src/verify/tests/ckb_latest/features_since_v2021.rs @@ -835,10 +835,12 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_state(step_cycles: Cycle let mut cycles = 0; let verifier = TransactionScriptsVerifierWithEnv::new(); let result = verifier.verify_map(script_version, &rtx, |verifier| { + #[allow(unused_assignments)] let mut init_state: Option = None; - if let VerifyResult::Suspended(state) = verifier.resumable_verify(step_cycles).unwrap() { - init_state = Some(state); + match verifier.resumable_verify(step_cycles).unwrap() { + VerifyResult::Suspended(state) => init_state = Some(state), + VerifyResult::Completed(cycle) => return Ok(cycle), } loop { @@ -948,12 +950,12 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_snap(step_cycles: Cycle) if script_version == crate::ScriptVersion::V2 { assert!( cycles >= TWO_IN_TWO_OUT_CYCLES - V2_CYCLE_BOUND, - "step_cycles {step_cycles}" + "cycles {cycles} step_cycles {step_cycles}" ); } else { assert!( cycles >= TWO_IN_TWO_OUT_CYCLES - CYCLE_BOUND, - "step_cycles {step_cycles}" + "cycles {cycles} step_cycles {step_cycles}" ); } assert_eq!(cycles, cycles_once, "step_cycles {step_cycles}"); diff --git a/shared/src/shared.rs b/shared/src/shared.rs index a6141e041d..2939230a5f 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -378,14 +378,14 @@ impl Shared 
{ /// Return whether chain is in initial block download pub fn is_initial_block_download(&self) -> bool { // Once this function has returned false, it must remain false. - if self.ibd_finished.load(Ordering::Relaxed) { + if self.ibd_finished.load(Ordering::Acquire) { false } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp()) > MAX_TIP_AGE { true } else { - self.ibd_finished.store(true, Ordering::Relaxed); + self.ibd_finished.store(true, Ordering::Release); false } } diff --git a/shared/src/types/header_map/kernel_lru.rs b/shared/src/types/header_map/kernel_lru.rs index 46dba8eb35..c82404658e 100644 --- a/shared/src/types/header_map/kernel_lru.rs +++ b/shared/src/types/header_map/kernel_lru.rs @@ -157,7 +157,7 @@ where self.stats().tick_primary_delete(); } // If IBD is not finished, don't shrink memory map - let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Relaxed); + let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Acquire); self.memory.remove(hash, allow_shrink_to_fit); if self.backend.is_empty() { return; @@ -175,7 +175,7 @@ where }); // If IBD is not finished, don't shrink memory map - let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Relaxed); + let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Acquire); self.memory .remove_batch(values.iter().map(|value| value.hash()), allow_shrink_to_fit); } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index aa3f2cc181..83aa1213c3 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -680,6 +680,11 @@ impl Synchronizer { return; } + if ckb_stop_handler::has_received_stop_signal() { + info!("received stop signal, stop find_blocks_to_fetch"); + return; + } + let unverified_tip = self.shared.active_chain().unverified_tip_number(); let disconnect_list = { diff --git a/sync/src/tests/types.rs b/sync/src/tests/types.rs index a6f11e9b44..e41c0827d1 100644 --- a/sync/src/tests/types.rs +++ 
b/sync/src/tests/types.rs @@ -8,7 +8,10 @@ use ckb_types::{ use rand::{thread_rng, Rng}; use std::{ collections::{BTreeMap, HashMap}, - sync::atomic::{AtomicUsize, Ordering::Relaxed}, + sync::atomic::{ + AtomicUsize, + Ordering::{Acquire, SeqCst}, + }, }; use crate::types::{TtlFilter, FILTER_TTL}; @@ -64,7 +67,7 @@ fn test_get_ancestor_use_skip_list() { 0, b, |hash, _| { - count.fetch_add(1, Relaxed); + count.fetch_add(1, SeqCst); header_map.get(hash).cloned() }, |_, _| None, @@ -72,7 +75,7 @@ fn test_get_ancestor_use_skip_list() { .unwrap(); // Search must finished in steps - assert!(count.load(Relaxed) <= limit); + assert!(count.load(Acquire) <= limit); header }; diff --git a/test/src/main.rs b/test/src/main.rs index d6a8187e70..038b947452 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -491,6 +491,7 @@ fn all_specs() -> Vec> { Box::new(RbfReplaceProposedSuccess), Box::new(RbfConcurrency), Box::new(RbfCellDepsCheck), + Box::new(RbfCyclingAttack), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), Box::new(CompactBlockPrefilled), diff --git a/test/src/node.rs b/test/src/node.rs index 32ba61baea..ad25d7fe89 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -32,15 +32,22 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::{Duration, Instant}; -#[cfg(target_os = "windows")] -use windows_sys::Win32::System::Console::{GenerateConsoleCtrlEvent, CTRL_C_EVENT}; - pub(crate) struct ProcessGuard { pub name: String, pub child: Child, pub killed: bool, } +impl ProcessGuard { + pub(crate) fn is_alive(&mut self) -> bool { + let try_wait = self.child.try_wait(); + match try_wait { + Ok(status_op) => status_op.is_none(), + Err(_err) => false, + } + } +} + impl Drop for ProcessGuard { fn drop(&mut self) { if !self.killed { @@ -361,7 +368,7 @@ impl Node { let timestamp = block.timestamp(); let uncle = block .as_advanced_builder() - .timestamp((timestamp + 1).pack()) + .timestamp((timestamp - 1).pack()) .build(); (block, uncle) } 
@@ -701,6 +708,7 @@ impl Node { } }; + self.wait_find_unverified_blocks_finished(); self.wait_tx_pool_ready(); self.set_process_guard(ProcessGuard { @@ -740,6 +748,15 @@ impl Node { g.take() } + pub(crate) fn is_alive(&mut self) -> bool { + let mut g = self.inner.guard.write().unwrap(); + if let Some(guard) = g.as_mut() { + guard.is_alive() + } else { + false + } + } + pub fn stop(&mut self) { drop(self.take_guard()); } @@ -844,28 +861,19 @@ impl Node { info!("accessed db done"); } + #[allow(unused_mut)] pub fn stop_gracefully(&mut self) { let guard = self.take_guard(); if let Some(mut guard) = guard { if !guard.killed { // on nix: send SIGINT to the child - // on windows: use taskkill to kill the child gracefully - Self::kill_gracefully(guard.child.id()); - let _ = guard.child.wait(); - guard.killed = true; - } - } - } - - #[cfg(target_os = "windows")] - fn kill_gracefully(pid: u32) { - unsafe { - let ret = GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid); - if ret == 0 { - let err = std::io::Error::last_os_error(); - error!("GenerateConsoleCtrlEvent failed: {}", err); - } else { - info!("GenerateConsoleCtrlEvent success"); + // on windows: don't kill gracefully..... 
fix later + #[cfg(not(target_os = "windows"))] + { + Self::kill_gracefully(guard.child.id()); + let _ = guard.child.wait(); + guard.killed = true; + } } } } diff --git a/test/src/rpc.rs b/test/src/rpc.rs index 2f91be7e83..bb4b9a81dc 100644 --- a/test/src/rpc.rs +++ b/test/src/rpc.rs @@ -174,13 +174,24 @@ impl RpcClient { } pub fn wait_rpc_ready(&self) { + self.wait_rpc_ready_internal(|| { + panic!("wait rpc ready timeout"); + }); + } + + pub fn wait_rpc_ready_internal(&self, fail: F) -> bool + where + F: Fn(), + { let now = std::time::Instant::now(); while self.inner.get_tip_block_number().is_err() { std::thread::sleep(std::time::Duration::from_millis(100)); if now.elapsed() > std::time::Duration::from_secs(60) { - panic!("wait rpc ready timeout"); + fail(); + return false; } } + true } pub fn get_block_template( diff --git a/test/src/specs/fault_injection/randomly_kill.rs b/test/src/specs/fault_injection/randomly_kill.rs index 358c81bd9d..e3f42a67d0 100644 --- a/test/src/specs/fault_injection/randomly_kill.rs +++ b/test/src/specs/fault_injection/randomly_kill.rs @@ -11,8 +11,23 @@ impl Spec for RandomlyKill { fn run(&self, nodes: &mut Vec) { let mut rng = thread_rng(); let node = &mut nodes[0]; - for _ in 0..rng.gen_range(10..20) { - node.rpc_client().wait_rpc_ready(); + let max_restart_times = rng.gen_range(10..20); + + let mut node_crash_times = 0; + + let mut randomly_kill_times = 0; + while randomly_kill_times < max_restart_times { + node.rpc_client().wait_rpc_ready_internal(|| {}); + + if !node.is_alive() { + node.start(); + node_crash_times += 1; + + if node_crash_times > 3 { + panic!("Node crash too many times"); + } + } + let n = rng.gen_range(0..10); // TODO: the kill of child process and mining are actually sequential here // We need to find some way to so these two things in parallel. 
@@ -24,7 +39,8 @@ impl Spec for RandomlyKill { node.mine(n); } info!("Stop the node"); - node.stop(); + node.stop_gracefully(); + randomly_kill_times += 1; info!("Start the node"); node.start(); } diff --git a/test/src/specs/sync/sync_churn.rs b/test/src/specs/sync/sync_churn.rs index 554f580ef6..4770947053 100644 --- a/test/src/specs/sync/sync_churn.rs +++ b/test/src/specs/sync/sync_churn.rs @@ -60,6 +60,11 @@ impl Spec for SyncChurn { if too_many_blocks || restart_stopped_rx.try_recv().is_ok() { break; } + info!( + "mining_node {}, tip: {}", + mining_node.node_id(), + mining_node.get_tip_block_number() + ); waiting_for_sync(&mining_nodes); } }); diff --git a/test/src/specs/tx_pool/replace.rs b/test/src/specs/tx_pool/replace.rs index 3d5a61c6bc..b6ef175906 100644 --- a/test/src/specs/tx_pool/replace.rs +++ b/test/src/specs/tx_pool/replace.rs @@ -1013,6 +1013,142 @@ impl Spec for RbfCellDepsCheck { } } +pub struct RbfCyclingAttack; +impl Spec for RbfCyclingAttack { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + let initial_inputs = gen_spendable(node0, 3); + let input_a = &initial_inputs[0]; + let input_b = &initial_inputs[1]; + let input_c = &initial_inputs[2]; + + let input_c: CellInput = CellInput::new_builder() + .previous_output(input_c.out_point.clone()) + .build(); + + // Commit transaction root + let tx_a = { + let tx_a = always_success_transaction(node0, input_a); + node0.submit_transaction(&tx_a); + tx_a + }; + + let tx_b = { + let tx_b = always_success_transaction(node0, input_b); + node0.submit_transaction(&tx_b); + tx_b + }; + + let mut prev = tx_a.clone(); + // Create transaction chain, A0 -> A1 -> A2 + let mut txs_chain_a = vec![tx_a]; + for _i in 0..2 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_a.push(cur.clone()); + let _ = 
node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + + // Create transaction chain, B0 -> B1 + let mut txs_chain_b = vec![tx_b.clone()]; + let mut prev = tx_b; + for _i in 0..1 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_b.push(cur.clone()); + let _ = node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + let tx_b1 = txs_chain_b[1].clone(); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // Create a child transaction consume B0 and A1 + // A0 ---> A1 ---> A2 + // | + // ----------> B2 + // | + // B0 ---> B1 + let tx_a1 = &txs_chain_a[1]; + let tx_b0 = &txs_chain_b[0]; + + let input_a1: CellInput = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let input_b0 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_b0.hash(), 0)) + .build(); + + let tx_b2_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .build(); + let tx_b2 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_b0]) + .set_outputs(vec![tx_b2_output]) + .build(); + let res = node0.rpc_client().send_transaction(tx_b2.data().into()); + eprintln!("tx_b2 {:?}", res); + + // after A2 and B1 is replaced by B2 + // A0 ---> A1 + // | + // ----------> B2 + // | + // B0 + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(txs_chain_a[2].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + let res = node0.rpc_client().get_transaction(txs_chain_b[1].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + // tx_b1 is still rejected + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + 
// Create a new transaction A3 that consumes A1; it will replace B2 + let input_a1 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let tx_a3_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(100).pack()) + .build(); + let tx_a3 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_c]) + .set_outputs(vec![tx_a3_output]) + .build(); + let _res = node0.rpc_client().send_transaction(tx_a3.data().into()); + + // now the result is: + // A0 ---> A1 -> A3 + // + // B0 -> B1 (B1 is recovered back) + // + let res = node0.rpc_client().get_transaction(tx_a3.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // B1 is expected to be recovered back + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + fn run_spec_send_conflict_relay(nodes: &mut [Node]) { let node0 = &nodes[0]; let node1 = &nodes[1]; diff --git a/test/src/specs/tx_pool/send_large_cycles_tx.rs b/test/src/specs/tx_pool/send_large_cycles_tx.rs index 2ddf972629..ce4d4ec09e 100644 --- a/test/src/specs/tx_pool/send_large_cycles_tx.rs +++ b/test/src/specs/tx_pool/send_large_cycles_tx.rs @@ -100,6 +100,7 @@ impl Spec for SendLargeCyclesTxToRelay { let node0 = &nodes[0]; let node1 = &nodes[1]; + node0.mine_until_out_bootstrap_period(); node1.mine_until_out_bootstrap_period(); node0.connect(node1); info!("Generate large cycles tx"); @@ -116,7 +117,7 @@ impl Spec for SendLargeCyclesTxToRelay { }); assert!(result, "node0 can't sync with node1"); - let result = wait_until(60, || { + let result = wait_until(120, || { node0 .rpc_client() .get_transaction(tx.hash()) diff --git 
a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 54b2395e4c..da259d413c 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -28,6 +28,7 @@ use std::sync::Arc; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; const CONFLICTES_CACHE_SIZE: usize = 10_000; +const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000; const MAX_REPLACEMENT_CANDIDATES: usize = 100; /// Tx-pool implementation @@ -44,6 +45,8 @@ pub struct TxPool { pub(crate) expiry: u64, // conflicted transaction cache pub(crate) conflicts_cache: lru::LruCache, + // conflicted transaction outputs cache, input -> tx_short_id + pub(crate) conflicts_outputs_cache: lru::LruCache, } impl TxPool { @@ -59,6 +62,7 @@ impl TxPool { recent_reject, expiry, conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE), + conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE), } } @@ -158,6 +162,9 @@ impl TxPool { pub(crate) fn record_conflict(&mut self, tx: TransactionView) { let short_id = tx.proposal_short_id(); + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.put(inputs, short_id.clone()); + } self.conflicts_cache.put(short_id.clone(), tx); debug!( "record_conflict {:?} now cache size: {}", @@ -167,7 +174,11 @@ impl TxPool { } pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) { - self.conflicts_cache.pop(short_id); + if let Some(tx) = self.conflicts_cache.pop(short_id) { + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.pop(&inputs); + } + } debug!( "remove_conflict {:?} now cache size: {}", short_id, @@ -175,6 +186,19 @@ impl TxPool { ); } + pub(crate) fn get_conflicted_txs_from_inputs( + &self, + inputs: impl Iterator, + ) -> Vec { + inputs + .filter_map(|input| { + self.conflicts_outputs_cache + .peek(&input) + .and_then(|id| self.conflicts_cache.peek(id).cloned()) + }) + .collect() + } + /// Returns tx with cycles corresponding to the id. 
pub(crate) fn get_tx_with_cycles( &self, @@ -493,6 +517,7 @@ impl TxPool { self.snapshot = snapshot; self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE); self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE); + self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE); } pub(crate) fn package_proposals( diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index b249be78d7..2594901784 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -132,26 +132,11 @@ impl TxPoolService { time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?; } - // try to remove conflicted tx here - for id in conflicts.iter() { - let removed = tx_pool.pool_map.remove_entry_and_descendants(id); - for old in removed { - debug!( - "remove conflict tx {} for RBF by new tx {}", - old.transaction().hash(), - entry.transaction().hash() - ); - let reject = Reject::RBFRejected(format!( - "replaced by tx {}", - entry.transaction().hash() - )); - // RBF replace successfully, put old transactions into conflicts pool - tx_pool.record_conflict(old.transaction().clone()); - // after removing old tx from tx_pool, we call reject callbacks manually - self.callbacks.call_reject(tx_pool, &old, reject); - } - } + let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts); let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?; + + // in a corner case, a tx with lower fee rate may be rejected immediately + // after inserting into pool, return proper reject error here for evict in evicted { let reject = Reject::Invalidated(format!( "invalidated by tx {}", @@ -159,12 +144,23 @@ impl TxPoolService { )); self.callbacks.call_reject(tx_pool, &evict, reject); } + tx_pool.remove_conflict(&entry.proposal_short_id()); - // in a corner case, a tx with lower fee rate may be rejected immediately - // after inserting into pool, return proper reject error here tx_pool .limit_size(&self.callbacks, 
Some(&entry.proposal_short_id())) .map_or(Ok(()), Err)?; + + if !may_recovered_txs.is_empty() { + let self_clone = self.clone(); + tokio::spawn(async move { + // push the recovered txs back to verify queue, so that they can be verified and submitted again + let mut queue = self_clone.verify_queue.write().await; + for tx in may_recovered_txs { + debug!("recover back: {:?}", tx.proposal_short_id()); + let _ = queue.add_tx(tx, None); + } + }); + } Ok(()) }) .await; @@ -200,6 +196,55 @@ impl TxPoolService { } } + // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted + // since they maybe not conflicted anymore + fn process_rbf( + &self, + tx_pool: &mut TxPool, + entry: &TxEntry, + conflicts: &HashSet, + ) -> Vec { + let mut may_recovered_txs = vec![]; + let mut available_inputs = HashSet::new(); + + if conflicts.is_empty() { + return may_recovered_txs; + } + + let all_removed: Vec<_> = conflicts + .iter() + .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id)) + .collect(); + + available_inputs.extend( + all_removed + .iter() + .flat_map(|removed| removed.transaction().input_pts_iter()), + ); + + for input in entry.transaction().input_pts_iter() { + available_inputs.remove(&input); + } + + may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter()); + for old in all_removed { + debug!( + "remove conflict tx {} for RBF by new tx {}", + old.transaction().hash(), + entry.transaction().hash() + ); + let reject = + Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash())); + + // RBF replace successfully, put old transactions into conflicts pool + tx_pool.record_conflict(old.transaction().clone()); + // after removing old tx from tx_pool, we call reject callbacks manually + self.callbacks.call_reject(tx_pool, &old, reject); + } + assert!(!may_recovered_txs.contains(entry.transaction())); + may_recovered_txs + } + pub(crate) async fn verify_queue_contains(&self, tx: 
&TransactionView) -> bool { let queue = self.verify_queue.read().await; queue.contains_key(&tx.proposal_short_id()) @@ -448,7 +493,7 @@ impl TxPoolService { self.process_orphan_tx(&tx).await; } Err(reject) => { - debug!( + info!( "after_process {} {} remote reject: {} ", tx_hash, peer, reject ); @@ -686,6 +731,10 @@ impl TxPoolService { if let Some(declared) = declared_cycles { if declared != verified.cycles { + info!( + "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}", + declared, verified.cycles, tx + ); return Some(( Err(Reject::DeclaredWrongCycles(declared, verified.cycles)), snapshot, diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index cb9c3939f7..6645550af8 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -110,6 +110,7 @@ pub(crate) enum Message { GetTransactionWithStatus(Request), NewUncle(Notify), ClearPool(Request, ()>), + ClearVerifyQueue(Request<(), ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), SavePool(Request<(), ()>), @@ -176,13 +177,13 @@ macro_rules! send_notify { impl TxPoolController { /// Return whether tx-pool service is started pub fn service_started(&self) -> bool { - self.started.load(Ordering::Relaxed) + self.started.load(Ordering::Acquire) } /// Set tx-pool service started, should only used for test #[cfg(feature = "internal")] pub fn set_service_started(&self, v: bool) { - self.started.store(v, Ordering::Relaxed); + self.started.store(v, Ordering::Release); } /// Return reference of tokio runtime handle @@ -322,6 +323,11 @@ impl TxPoolController { send_message!(self, ClearPool, new_snapshot) } + /// Clears the tx-verify-queue. 
+ pub fn clear_verify_queue(&self) -> Result<(), AnyError> { + send_message!(self, ClearVerifyQueue, ()) + } + /// TODO(doc): @zhangsoledad pub fn get_all_entry_info(&self) -> Result { send_message!(self, GetAllEntryInfo, ()) @@ -652,7 +658,7 @@ impl TxPoolServiceBuilder { } } }); - self.started.store(true, Ordering::Relaxed); + self.started.store(true, Ordering::Release); if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) { error!("Failed to import persistent txs, cause: {}", err); } @@ -914,6 +920,12 @@ async fn process(mut service: TxPoolService, message: Message) { error!("Responder sending clear_pool failed {:?}", e) }; } + Message::ClearVerifyQueue(Request { responder, .. }) => { + service.verify_queue.write().await.clear(); + if let Err(e) = responder.send(()) { + error!("Responder sending clear_verify_queue failed {:?}", e) + }; + } Message::GetPoolTxDetails(Request { responder, arguments: tx_hash, @@ -1105,10 +1117,10 @@ impl TxPoolService { } pub fn after_delay(&self) -> bool { - self.after_delay.load(Ordering::Relaxed) + self.after_delay.load(Ordering::Acquire) } pub fn set_after_delay_true(&self) { - self.after_delay.store(true, Ordering::Relaxed); + self.after_delay.store(true, Ordering::Release); } } diff --git a/tx-pool/src/verify_mgr.rs b/tx-pool/src/verify_mgr.rs index 1e0a9b5a70..0734ee8ab5 100644 --- a/tx-pool/src/verify_mgr.rs +++ b/tx-pool/src/verify_mgr.rs @@ -76,6 +76,10 @@ impl Worker { async fn process_inner(&mut self) { loop { + if self.exit_signal.is_cancelled() { + info!("Verify worker::process_inner exit_signal is cancelled"); + return; + } if self.status != ChunkCommand::Resume { return; } @@ -170,7 +174,6 @@ impl VerifyMgr { } fn send_child_command(&self, command: ChunkCommand) { - //info!("[verify-test] verify-mgr send child command: {:?}", command); for w in &self.workers { if let Err(err) = w.0.send(command.clone()) { info!("send worker command failed, error: {}", err); diff --git 
a/util/app-config/src/tests/bats_tests/cli_test.sh b/util/app-config/src/tests/bats_tests/cli_test.sh index 25ee673c26..bd5fc1318c 100755 --- a/util/app-config/src/tests/bats_tests/cli_test.sh +++ b/util/app-config/src/tests/bats_tests/cli_test.sh @@ -9,6 +9,25 @@ function cleanup { rm -rf ${CKB_BATS_TESTBED} } +git_clone_repo_with_retry() { + local branch=$1 + local repo_address=$2 + local dir_name=$3 + local retry_count=5 + local retry_delay=5 + + for i in $(seq 1 $retry_count); do + git clone --depth 1 --branch "$branch" "$repo_address" "$dir_name" && break + echo "Attempt $i failed. Retrying in $retry_delay seconds..." + sleep $retry_delay + done + + if [ $i -eq $retry_count ]; then + echo "Failed to clone repository after $retry_count attempts." + exit 1 + fi +} + trap cleanup EXIT cp target/prod/ckb ${CKB_BATS_TESTBED} @@ -17,24 +36,25 @@ cp -r util/app-config/src/tests/bats_tests/later_bats_job ${CKB_BATS_TESTBED} cp util/app-config/src/tests/bats_tests/*.sh ${CKB_BATS_TESTBED} if [ ! -d "/tmp/ckb_bats_assets/" ]; then - git clone --depth=1 https://github.com/nervosnetwork/ckb-assets /tmp/ckb_bats_assets + git_clone_repo_with_retry "main" "https://github.com/nervosnetwork/ckb-assets" "/tmp/ckb_bats_assets" fi cp /tmp/ckb_bats_assets/cli_bats_env/ckb_mainnet_4000.json ${CKB_BATS_TESTBED} CKB_BATS_CORE_DIR=/tmp/ckb_bats_core if [ ! -d "${CKB_BATS_CORE_DIR}/bats" ]; then - git clone --depth 1 --branch v1.9.0 https://github.com/bats-core/bats-core.git ${CKB_BATS_CORE_DIR}/bats - ${CKB_BATS_CORE_DIR}/bats/install.sh /tmp/ckb_bats_bin/tmp_install + git_clone_repo_with_retry "v1.9.0" "https://github.com/bats-core/bats-core.git" "${CKB_BATS_CORE_DIR}/bats" + ${CKB_BATS_CORE_DIR}/bats/install.sh /tmp/ckb_bats_bin/tmp_install fi if [ ! 
-d "${CKB_BATS_CORE_DIR}/bats-support" ]; then - git clone --depth 1 --branch v0.3.0 https://github.com/bats-core/bats-support.git ${CKB_BATS_CORE_DIR}/bats-support + git_clone_repo_with_retry "v0.3.0" "https://github.com/bats-core/bats-support.git" "${CKB_BATS_CORE_DIR}/bats-support" fi bash ${CKB_BATS_CORE_DIR}/bats-support/load.bash if [ ! -d "${CKB_BATS_CORE_DIR}/bats-assert" ]; then - git clone --depth 1 --branch v2.1.0 https://github.com/bats-core/bats-assert.git ${CKB_BATS_CORE_DIR}/bats-assert + git_clone_repo_with_retry "v2.1.0" "https://github.com/bats-core/bats-assert.git" "${CKB_BATS_CORE_DIR}/bats-assert" fi + bash ${CKB_BATS_CORE_DIR}/bats-assert/load.bash cd ${CKB_BATS_TESTBED} diff --git a/util/systemtime/src/lib.rs b/util/systemtime/src/lib.rs index 3f845bb11b..aed840fa23 100644 --- a/util/systemtime/src/lib.rs +++ b/util/systemtime/src/lib.rs @@ -58,14 +58,14 @@ impl FaketimeGuard { /// Set faketime #[cfg(feature = "enable_faketime")] pub fn set_faketime(&self, time: u64) { - FAKETIME.store(time, Ordering::Relaxed); + FAKETIME.store(time, Ordering::Release); FAKETIME_ENABLED.store(true, Ordering::SeqCst); } /// Disable faketime #[cfg(feature = "enable_faketime")] pub fn disable_faketime(&self) { - FAKETIME_ENABLED.store(false, Ordering::Relaxed); + FAKETIME_ENABLED.store(false, Ordering::Release); } }