Skip to content

Commit

Permalink
Merge pull request #4339 from nervosnetwork/yukang-conflicted-pool
Browse files Browse the repository at this point in the history
Add conflicts cache for tx_pool to record conflicted transactions
  • Loading branch information
chenyukang authored Mar 12, 2024
2 parents 0478dd8 + 39daa1f commit 0ad645f
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 43 deletions.
3 changes: 3 additions & 0 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4562,6 +4562,7 @@ Response
"timestamp": "0x17c983e6e44"
}
},
"conflicted": [],
"proposed": {}
}
}
Expand Down Expand Up @@ -6702,6 +6703,8 @@ Tx-pool entries object

`TxPoolEntries` is a JSON object with the following fields.

* `conflicted`: `Array<` [`H256`](#type-h256) `>` - Conflicted tx hash vec

* `pending`: - Pending tx verbose info

* `proposed`: - Proposed tx verbose info
Expand Down
1 change: 1 addition & 0 deletions rpc/src/module/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ pub trait PoolRpc {
/// "timestamp": "0x17c983e6e44"
/// }
/// },
/// "conflicted": [],
/// "proposed": {}
/// }
/// }
Expand Down
1 change: 0 additions & 1 deletion sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ impl Relayer {

if !short_ids_set.is_empty() {
let tx_pool = self.shared.shared().tx_pool_controller();

let fetch_txs = tx_pool.fetch_txs(short_ids_set);
if let Err(e) = fetch_txs {
return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
Expand Down
2 changes: 2 additions & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(LongForks),
Box::new(ForksContainSameTransactions),
Box::new(ForksContainSameUncle),
Box::new(SendConflictTxToRelay),
Box::new(SendConflictTxToRelayRBF),
Box::new(WithdrawDAO),
Box::new(WithdrawDAOWithOverflowCapacity),
Box::new(DAOWithSatoshiCellOccupied),
Expand Down
163 changes: 155 additions & 8 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::{
rpc::RpcClient,
util::{
cell::gen_spendable,
transaction::{always_success_transaction, always_success_transactions},
transaction::{
always_success_transaction, always_success_transactions, get_tx_pool_conflicts,
},
},
utils::wait_until,
Node, Spec,
Expand Down Expand Up @@ -85,7 +87,8 @@ impl Spec for RbfBasic {
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_ok(), "tx2 should replace old tx");
assert!(res.is_ok(), "tx2 should replace with old tx");
assert_eq!(get_tx_pool_conflicts(node0), vec![tx1.hash().unpack()]);

let ret = node0
.rpc_client()
Expand Down Expand Up @@ -138,6 +141,7 @@ impl Spec for RbfBasic {
assert!(ret.transaction.is_none());
assert!(matches!(ret.tx_status.status, Status::Rejected));
assert!(ret.tx_status.reason.unwrap().contains("RBFRejected"));
assert_eq!(get_tx_pool_conflicts(node0), vec![tx1.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -165,6 +169,9 @@ impl Spec for RbfSameInput {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
let message = res.err().unwrap().to_string();
assert!(message.contains("PoolRejectedDuplicatedTransaction"));
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -200,6 +207,7 @@ impl Spec for RbfOnlyForResolveDead {
.send_transaction_result(tx2.data().into());
let message = res.err().unwrap().to_string();
assert!(message.contains("TransactionFailedToResolve: Resolve failed Unknown"));
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -246,6 +254,9 @@ impl Spec for RbfSameInputwithLessFee {
assert!(message.contains(
"Tx's current fee is 1000000000, expect it to >= 2000000363 to replace old txs"
));

// local submit tx RBF check failed, will be added into conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -306,6 +317,9 @@ impl Spec for RbfTooManyDescendants {
.unwrap()
.to_string()
.contains("Tx conflict with too many txs"));

// local submit tx RBF check failed, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -378,6 +392,9 @@ impl Spec for RbfContainNewTx {
.unwrap()
.to_string()
.contains("new Tx contains unconfirmed inputs"));

// local submit tx RBF check failed, will be in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -450,6 +467,9 @@ impl Spec for RbfContainInvalidInput {
.unwrap()
.to_string()
.contains("new Tx contains inputs in descendants of to be replaced Tx"));

// local submit tx RBF check failed, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -502,12 +522,12 @@ impl Spec for RbfChildPayForParent {
}

let clone_tx = txs[2].clone();
// Set tx2 fee to a higher value, but not enough to pay for tx5
// Set tx2 fee to a higher value, but not enough to pay for tx4
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(70).pack())
.build();

let tx2 = clone_tx
let new_tx = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
Expand All @@ -519,19 +539,22 @@ impl Spec for RbfChildPayForParent {

let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
.send_transaction_result(new_tx.data().into());
assert!(res.is_err(), "tx2 should be rejected");
assert!(res
.err()
.unwrap()
.to_string()
.contains("RBF rejected: Tx's current fee is 3000000000, expect it to >= 5000000363 to replace old txs"));

// local submit tx RBF check failed, will be in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![new_tx.hash().unpack()]);

// let's try a new transaction with new higher fee
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(45).pack())
.build();
let tx2 = clone_tx
let new_tx_ok = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
Expand All @@ -542,8 +565,19 @@ impl Spec for RbfChildPayForParent {
.build();
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
.send_transaction_result(new_tx_ok.data().into());
assert!(res.is_ok());

// replaced txs are in conflicts pool
// tx2 tx3 tx4 is replaced
let mut expected: Vec<ckb_types::H256> = txs[2..=max_count - 1]
.iter()
.map(|tx| tx.hash().unpack())
.collect::<Vec<_>>();
expected.push(new_tx.hash().unpack());
expected.sort_unstable();
let conflicts = get_tx_pool_conflicts(node0);
assert_eq!(conflicts, expected);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -592,6 +626,8 @@ impl Spec for RbfContainInvalidCells {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
// script verification failed because of invalid cell dep, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -806,6 +842,14 @@ impl Spec for RbfReplaceProposedSuccess {
let tx1_status = node0.rpc_client().get_transaction(txs[2].hash()).tx_status;
assert_eq!(tx1_status.status, Status::Rejected);

let mut expected = [
txs[2].hash().unpack(),
txs[3].hash().unpack(),
txs[4].hash().unpack(),
];
expected.sort_unstable();
assert_eq!(get_tx_pool_conflicts(node0), expected);

let window_count = node0.consensus().tx_proposal_window().closest();
node0.mine(window_count);
// since old tx is already in BlockAssembler,
Expand Down Expand Up @@ -840,7 +884,7 @@ impl Spec for RbfConcurrency {
let tx1 = node0.new_transaction(tx_hash_0.clone());

let mut conflicts = vec![tx1];
// tx1 capacity is 100, set other txs to higer fee
// tx1 capacity is 100, set other txs to higher fee
let fees = [
capacity_bytes!(83),
capacity_bytes!(82),
Expand Down Expand Up @@ -886,6 +930,14 @@ impl Spec for RbfConcurrency {
for s in status.iter().take(4) {
assert_eq!(*s, Status::Rejected);
}

let mut expected: Vec<ckb_types::H256> = conflicts
.iter()
.take(4)
.map(|x| x.hash().unpack())
.collect();
expected.sort_unstable();
assert_eq!(get_tx_pool_conflicts(node0), expected);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -952,9 +1004,104 @@ impl Spec for RbfCellDepsCheck {
.unwrap()
.to_string()
.contains("new Tx contains cell deps from conflicts"));
assert_eq!(get_tx_pool_conflicts(node0), vec![new_tx.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

fn run_spec_send_conflict_relay(nodes: &mut [Node]) {
let node0 = &nodes[0];
let node1 = &nodes[1];

node1.mine_until_out_bootstrap_period();
node0.connect(node1);
info!("Generate large cycles tx");

node0.new_block_with_blocking(|template| template.number.value() != 13);
let tx_hash_0 = node0.generate_transaction();
info!("Generate 2 txs with same input");
let tx1 = node0.new_transaction(tx_hash_0.clone());

let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(90).pack())
.build();

let tx1 = tx1.as_advanced_builder().set_outputs(vec![output]).build();
node0.rpc_client().send_transaction(tx1.data().into());

let result = wait_until(60, || {
node1.get_tip_block_number() == node0.get_tip_block_number()
});
assert!(result, "node0 can't sync with node1");

let result = wait_until(60, || {
node1
.rpc_client()
.get_transaction(tx1.hash())
.transaction
.is_some()
});
assert!(result, "Node0 should accept tx");
// node0 remove tx1 from tx_pool
node0.remove_transaction(tx1.hash());

// a new tx with same input and lower fee
// node0 will accept it and node1 will reject it and put it in conflicts pool
let tx2_temp = node0.new_transaction(tx_hash_0);
let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(95).pack())
.build();

let tx2 = tx2_temp
.as_advanced_builder()
.set_outputs(vec![output])
.build();
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_ok(), "tx2 should be accepted by node0");

let _ = wait_until(60, || {
node1.get_tip_block_number() == node0.get_tip_block_number()
});

let _result = wait_until(60, || get_tx_pool_conflicts(node1).len() == 1);

let res = node1.get_transaction(tx2.hash());
assert_eq!(res.status, Status::Rejected);
let res = node1.get_transaction(tx1.hash());
assert_eq!(res.status, Status::Pending);
assert_eq!(get_tx_pool_conflicts(node1), vec![tx2.hash().unpack()]);
}

pub struct SendConflictTxToRelay;
impl Spec for SendConflictTxToRelay {
crate::setup!(num_nodes: 2, retry_failed: 5);

fn run(&self, nodes: &mut Vec<Node>) {
run_spec_send_conflict_relay(nodes);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.network.connect_outbound_interval_secs = 0;
config.tx_pool.min_fee_rate = ckb_types::core::FeeRate(1500);
}
}

pub struct SendConflictTxToRelayRBF;
impl Spec for SendConflictTxToRelayRBF {
crate::setup!(num_nodes: 2, retry_failed: 5);

fn run(&self, nodes: &mut Vec<Node>) {
run_spec_send_conflict_relay(nodes);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.network.connect_outbound_interval_secs = 0;
config.tx_pool.min_fee_rate = ckb_types::core::FeeRate(1000);
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}
14 changes: 14 additions & 0 deletions test/src/util/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::util::cell::{as_input, as_inputs, as_output, as_outputs};
use crate::{Net, Node};
use ckb_jsonrpc_types::{RawTxPool, TxPoolEntries};
use ckb_network::SupportProtocols;
use ckb_types::{
bytes::Bytes,
Expand Down Expand Up @@ -81,3 +82,16 @@ pub fn relay_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
.build();
net.send(node, SupportProtocols::RelayV3, tx_msg.as_bytes());
}

pub fn get_tx_pool_conflicts(node: &Node) -> Vec<ckb_types::H256> {
let tx_pool_raw = node.rpc_client().get_raw_tx_pool(Some(true));
match tx_pool_raw {
RawTxPool::Verbose(TxPoolEntries { mut conflicted, .. }) => {
conflicted.sort_unstable();
conflicted
}
_ => {
panic!("tx_pool_raw is None");
}
}
}
5 changes: 5 additions & 0 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ impl PoolMap {
.collect()
}

pub(crate) fn find_conflict_outpoint(&self, tx: &TransactionView) -> Option<OutPoint> {
tx.input_pts_iter()
.find_map(|out_point| self.edges.get_input_ref(&out_point).map(|_| out_point))
}

pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let mut conflicts = Vec::new();

Expand Down
Loading

0 comments on commit 0ad645f

Please sign in to comment.