diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 439f92bada9a..b6e589c6ff79 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -416,6 +416,7 @@ where fn propagate_all(&mut self, hashes: Vec) { let propagated = self.propagate_transactions( self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(), + PropagationMode::Basic, ); // notify pool so events get fired @@ -431,6 +432,7 @@ where fn propagate_transactions( &mut self, to_propagate: Vec, + propagation_mode: PropagationMode, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); if self.network.tx_gossip_disabled() { @@ -449,14 +451,18 @@ where PropagateTransactionsBuilder::full(peer.version) }; - // Iterate through the transactions to propagate and fill the hashes and full - // transaction lists, before deciding whether or not to send full transactions to the - // peer. - for tx in &to_propagate { - // Only proceed if the transaction is not in the peer's list of seen transactions - if !peer.seen_transactions.contains(&tx.hash()) { - // add transaction to the list of hashes to propagate - builder.push(tx); + if propagation_mode.is_forced() { + builder.extend(to_propagate.iter()); + } else { + // Iterate through the transactions to propagate and fill the hashes and full + // transaction lists, before deciding whether or not to send full transactions to + // the peer. + for tx in &to_propagate { + // Only proceed if the transaction is not in the peer's list of seen + // transactions + if !peer.seen_transactions.contains(&tx.hash()) { + builder.push(tx); + } } } @@ -514,6 +520,7 @@ where &mut self, txs: Vec, peer_id: PeerId, + propagation_mode: PropagationMode, ) -> Option { trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer"); @@ -525,10 +532,17 @@ where let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new); - // Iterate through the transactions to propagate and fill the hashes and full transaction - for tx in to_propagate { - if !peer.seen_transactions.contains(&tx.hash()) { - full_transactions.push(&tx); + if propagation_mode.is_forced() { + // skip cache check if forced + full_transactions.extend(to_propagate); + } else { + // Iterate through the transactions to propagate and fill the hashes and full + // transaction + for tx in to_propagate { + if !peer.seen_transactions.contains(&tx.hash()) { + // Only include if the peer hasn't seen the transaction + full_transactions.push(&tx); + } } } @@ -546,6 +560,7 @@ where // mark transaction as seen by peer peer.seen_transactions.insert(hash); } + // send hashes of transactions self.network.send_transactions_hashes(peer_id, new_pooled_hashes); } @@ -557,6 +572,7 @@ where // mark transaction as seen by peer peer.seen_transactions.insert(tx.hash()); } + // send full transactions self.network.send_transactions(peer_id, new_full_transactions); } @@ -570,7 +586,12 @@ where /// Propagate the transaction hashes to the given peer /// /// Note: This will only send the hashes for transactions that exist in the pool. - fn propagate_hashes_to(&mut self, hashes: Vec, peer_id: PeerId) { + fn propagate_hashes_to( + &mut self, + hashes: Vec, + peer_id: PeerId, + propagation_mode: PropagationMode, + ) { trace!(target: "net::tx", "Start propagating transactions as hashes"); // This fetches a transactions from the pool, including the blob transactions, which are @@ -589,9 +610,14 @@ where // check if transaction is known to peer let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); - for tx in to_propagate { - if !peer.seen_transactions.insert(tx.hash()) { - hashes.push(&tx); + if propagation_mode.is_forced() { + hashes.extend(to_propagate) + } else { + for tx in to_propagate { + if !peer.seen_transactions.contains(&tx.hash()) { + // Include if the peer hasn't seen it + hashes.push(&tx); + } } } @@ -880,14 +906,16 @@ where self.on_new_pending_transactions(vec![hash]) } TransactionsCommand::PropagateHashesTo(hashes, peer) => { - self.propagate_hashes_to(hashes, peer) + self.propagate_hashes_to(hashes, peer, PropagationMode::Forced) } TransactionsCommand::GetActivePeers(tx) => { let peers = self.peers.keys().copied().collect::>(); tx.send(peers).ok(); } TransactionsCommand::PropagateTransactionsTo(txs, peer) => { - if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, peer) { + if let Some(propagated) = + self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced) + { self.pool.on_propagated(propagated); } } @@ -1395,6 +1423,29 @@ where } } +/// Represents the different modes of transaction propagation. +/// +/// This enum is used to determine how transactions are propagated to peers in the network. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +enum PropagationMode { + /// Default propagation mode. + /// + /// Transactions are only sent to peers that haven't seen them yet. + Basic, + /// Forced propagation mode. + /// + /// Transactions are sent to all peers regardless of whether they have been sent or received + /// before. + Forced, +} + +impl PropagationMode { + /// Returns `true` if the propagation kind is `Forced`. + const fn is_forced(self) -> bool { + matches!(self, Self::Forced) + } +} + /// A transaction that's about to be propagated to multiple peers. #[derive(Debug, Clone)] struct PropagateTransaction { @@ -1441,6 +1492,13 @@ impl PropagateTransactionsBuilder { Self::Full(FullTransactionsBuilder::new(version)) } + /// Appends all transactions + fn extend<'a>(&mut self, txs: impl IntoIterator) { + for tx in txs { + self.push(tx); + } + } + /// Appends a transaction to the list. fn push(&mut self, transaction: &PropagateTransaction) { match self { @@ -1502,6 +1560,13 @@ impl FullTransactionsBuilder { } } + /// Appends all transactions. + fn extend(&mut self, txs: impl IntoIterator) { + for tx in txs { + self.push(&tx) + } + } + /// Append a transaction to the list of full transaction if the total message bytes size doesn't /// exceed the soft maximum target byte size. The limit is soft, meaning if one single /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`] @@ -1581,6 +1646,13 @@ impl PooledTransactionsHashesBuilder { } } + /// Appends all hashes + fn extend(&mut self, txs: impl IntoIterator) { + for tx in txs { + self.push(&tx); + } + } + fn push(&mut self, tx: &PropagateTransaction) { match self { Self::Eth66(msg) => msg.0.push(tx.hash()), @@ -2388,7 +2460,8 @@ mod tests { let eip4844_tx = Arc::new(factory.create_eip4844()); propagate.push(PropagateTransaction::new(eip4844_tx.clone())); - let propagated = tx_manager.propagate_transactions(propagate.clone()); + let propagated = + tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic); assert_eq!(propagated.0.len(), 2); let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap(); assert_eq!(prop_txs.len(), 1); @@ -2404,7 +2477,7 @@ mod tests { peer.seen_transactions.contains(eip4844_tx.transaction.hash()); // propagate again - let propagated = tx_manager.propagate_transactions(propagate); + let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic); assert!(propagated.0.is_empty()); } }