Skip to content

Commit

Permalink
TransactionsHandle propagation commands should not adhere to caching (p…
Browse files Browse the repository at this point in the history
…aradigmxyz#12079)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
Parikalp-Bhardwaj and mattsse authored Oct 27, 2024
1 parent b7b3f81 commit 0c51609
Showing 1 changed file with 93 additions and 20 deletions.
113 changes: 93 additions & 20 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ where
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropagationMode::Basic,
);

// notify pool so events get fired
Expand All @@ -431,6 +432,7 @@ where
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
propagation_mode: PropagationMode,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
if self.network.tx_gossip_disabled() {
Expand All @@ -449,14 +451,18 @@ where
PropagateTransactionsBuilder::full(peer.version)
};

// Iterate through the transactions to propagate and fill the hashes and full
// transaction lists, before deciding whether or not to send full transactions to the
// peer.
for tx in &to_propagate {
// Only proceed if the transaction is not in the peer's list of seen transactions
if !peer.seen_transactions.contains(&tx.hash()) {
// add transaction to the list of hashes to propagate
builder.push(tx);
if propagation_mode.is_forced() {
builder.extend(to_propagate.iter());
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction lists, before deciding whether or not to send full transactions to
// the peer.
for tx in &to_propagate {
// Only proceed if the transaction is not in the peer's list of seen
// transactions
if !peer.seen_transactions.contains(&tx.hash()) {
builder.push(tx);
}
}
}

Expand Down Expand Up @@ -514,6 +520,7 @@ where
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

Expand All @@ -525,10 +532,17 @@ where

let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);

// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
full_transactions.push(&tx);
if propagation_mode.is_forced() {
// skip cache check if forced
full_transactions.extend(to_propagate);
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
}

Expand All @@ -546,6 +560,7 @@ where
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
Expand All @@ -557,6 +572,7 @@ where
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
Expand All @@ -570,7 +586,12 @@ where
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(&mut self, hashes: Vec<TxHash>, peer_id: PeerId) {
fn propagate_hashes_to(
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");

// This fetches a transactions from the pool, including the blob transactions, which are
Expand All @@ -589,9 +610,14 @@ where
// check if transaction is known to peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);

for tx in to_propagate {
if !peer.seen_transactions.insert(tx.hash()) {
hashes.push(&tx);
if propagation_mode.is_forced() {
hashes.extend(to_propagate)
} else {
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Include if the peer hasn't seen it
hashes.push(&tx);
}
}
}

Expand Down Expand Up @@ -880,14 +906,16 @@ where
self.on_new_pending_transactions(vec![hash])
}
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
}
TransactionsCommand::GetActivePeers(tx) => {
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, peer) {
if let Some(propagated) =
self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
{
self.pool.on_propagated(propagated);
}
}
Expand Down Expand Up @@ -1395,6 +1423,29 @@ where
}
}

/// Represents the different modes of transaction propagation.
///
/// This enum is used to determine how transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
/// Default propagation mode.
///
/// Transactions are only sent to peers that haven't seen them yet.
Basic,
/// Forced propagation mode.
///
/// Transactions are sent to all peers regardless of whether they have been sent or received
/// before.
Forced,
}

impl PropagationMode {
/// Returns `true` if the propagation kind is `Forced`.
const fn is_forced(self) -> bool {
matches!(self, Self::Forced)
}
}

/// A transaction that's about to be propagated to multiple peers.
#[derive(Debug, Clone)]
struct PropagateTransaction {
Expand Down Expand Up @@ -1441,6 +1492,13 @@ impl PropagateTransactionsBuilder {
Self::Full(FullTransactionsBuilder::new(version))
}

/// Appends all transactions
fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction>) {
for tx in txs {
self.push(tx);
}
}

/// Appends a transaction to the list.
fn push(&mut self, transaction: &PropagateTransaction) {
match self {
Expand Down Expand Up @@ -1502,6 +1560,13 @@ impl FullTransactionsBuilder {
}
}

/// Appends all transactions.
fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction>) {
for tx in txs {
self.push(&tx)
}
}

/// Append a transaction to the list of full transaction if the total message bytes size doesn't
/// exceed the soft maximum target byte size. The limit is soft, meaning if one single
/// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
Expand Down Expand Up @@ -1581,6 +1646,13 @@ impl PooledTransactionsHashesBuilder {
}
}

/// Appends all hashes
fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction>) {
for tx in txs {
self.push(&tx);
}
}

fn push(&mut self, tx: &PropagateTransaction) {
match self {
Self::Eth66(msg) => msg.0.push(tx.hash()),
Expand Down Expand Up @@ -2388,7 +2460,8 @@ mod tests {
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));

let propagated = tx_manager.propagate_transactions(propagate.clone());
let propagated =
tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
Expand All @@ -2404,7 +2477,7 @@ mod tests {
peer.seen_transactions.contains(eip4844_tx.transaction.hash());

// propagate again
let propagated = tx_manager.propagate_transactions(propagate);
let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
assert!(propagated.0.is_empty());
}
}

0 comments on commit 0c51609

Please sign in to comment.