diff --git a/crates/net/eth-wire-types/src/primitives.rs b/crates/net/eth-wire-types/src/primitives.rs index ff7ab1c801bd..1b0c16c0622d 100644 --- a/crates/net/eth-wire-types/src/primitives.rs +++ b/crates/net/eth-wire-types/src/primitives.rs @@ -21,6 +21,7 @@ pub trait NetworkPrimitives: + PartialEq + Eq + 'static; + /// The block body type. type BlockBody: Encodable + Decodable @@ -32,6 +33,7 @@ pub trait NetworkPrimitives: + PartialEq + Eq + 'static; + /// Full block type. type Block: Block
+ Encodable @@ -58,8 +60,10 @@ pub trait NetworkPrimitives: + PartialEq + Eq + 'static; + /// The transaction type which peers return in `PooledTransactions` messages. - type PooledTransaction: Encodable + type PooledTransaction: TryFrom + + Encodable + Decodable + Send + Sync diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index b352dfe31366..4a7167a8064f 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -48,7 +48,10 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; -use reth_primitives::{PooledTransactionsElement, TransactionSigned}; +use reth_primitives::{ + transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx, + TransactionSigned, +}; use reth_primitives_traits::{SignedTransaction, TxType}; use reth_tokio_util::EventStream; use reth_transaction_pool::{ @@ -272,7 +275,7 @@ pub struct TransactionsManager, /// Incoming events from the [`NetworkManager`](crate::NetworkManager). - transaction_events: UnboundedMeteredReceiver, + transaction_events: UnboundedMeteredReceiver>, /// How the `TransactionsManager` is configured. config: TransactionsManagerConfig, /// `TransactionsManager` metrics @@ -697,13 +700,15 @@ where impl TransactionsManager where - Pool: TransactionPool, + Pool: TransactionPool + 'static, N: NetworkPrimitives< BroadcastedTransaction: SignedTransaction, PooledTransaction: SignedTransaction, >, - Pool::Transaction: - PoolTransaction>, + Pool::Transaction: PoolTransaction< + Consensus = N::BroadcastedTransaction, + Pooled: Into + From>, + >, { /// Invoked when transactions in the local mempool are considered __pending__. /// @@ -1099,16 +1104,9 @@ where _ => {} } } -} -impl TransactionsManager -where - Pool: TransactionPool + 'static, - Pool::Transaction: - PoolTransaction>, -{ /// Handles dedicated transaction events related to the `eth` protocol. - fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { + fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { // ensure we didn't receive any blob transactions as these are disallowed to be @@ -1119,7 +1117,7 @@ where let non_blob_txs = msg .0 .into_iter() - .map(PooledTransactionsElement::try_from_broadcast) + .map(N::PooledTransaction::try_from) .filter_map(Result::ok) .collect(); @@ -1146,7 +1144,7 @@ where fn import_transactions( &mut self, peer_id: PeerId, - transactions: PooledTransactions, + transactions: PooledTransactions, source: TransactionSource, ) { // If the node is pipeline syncing, ignore transactions @@ -1162,7 +1160,7 @@ where // mark the transactions as received self.transaction_fetcher - .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.hash())); + .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash())); // track that the peer knows these transaction, but only if this is a new broadcast. // If we received the transactions as the response to our `GetPooledTransactions`` @@ -1170,7 +1168,7 @@ where // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`. let mut num_already_seen_by_peer = 0; for tx in &transactions { - if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) { + if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) { num_already_seen_by_peer += 1; } } @@ -1199,7 +1197,7 @@ where Err(badtx) => { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=%badtx.hash(), + hash=%badtx.tx_hash(), client_version=%peer.client_version, "failed ecrecovery for transaction" ); @@ -1208,22 +1206,23 @@ where } }; - match self.transactions_by_peers.entry(*tx.hash()) { + match self.transactions_by_peers.entry(*tx.tx_hash()) { Entry::Occupied(mut entry) => { // transaction was already inserted entry.get_mut().insert(peer_id); } Entry::Vacant(entry) => { - if self.bad_imports.contains(tx.hash()) { + if self.bad_imports.contains(tx.tx_hash()) { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=%tx.hash(), + hash=%tx.tx_hash(), client_version=%peer.client_version, "received a known bad transaction from peer" ); has_bad_transactions = true; } else { // this is a new transaction that should be imported into the pool + let pool_transaction = Pool::Transaction::from_pooled(tx.into()); new_txs.push(pool_transaction); @@ -1285,7 +1284,7 @@ where } /// Processes a [`FetchEvent`]. - fn on_fetch_event(&mut self, fetch_event: FetchEvent) { + fn on_fetch_event(&mut self, fetch_event: FetchEvent) { match fetch_event { FetchEvent::TransactionsFetched { peer_id, transactions } => { self.import_transactions(peer_id, transactions, TransactionSource::Response);