Skip to content

Commit

Permalink
feat: make import_transactions use network generics (#13110)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Dec 3, 2024
1 parent 0fcc6cf commit 35cfd41
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
6 changes: 5 additions & 1 deletion crates/net/eth-wire-types/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub trait NetworkPrimitives:
+ PartialEq
+ Eq
+ 'static;

/// The block body type.
type BlockBody: Encodable
+ Decodable
Expand All @@ -32,6 +33,7 @@ pub trait NetworkPrimitives:
+ PartialEq
+ Eq
+ 'static;

/// Full block type.
type Block: Block<Header = Self::BlockHeader, Body = Self::BlockBody>
+ Encodable
Expand All @@ -58,8 +60,10 @@ pub trait NetworkPrimitives:
+ PartialEq
+ Eq
+ 'static;

/// The transaction type which peers return in `PooledTransactions` messages.
type PooledTransaction: Encodable
type PooledTransaction: TryFrom<Self::BroadcastedTransaction>
+ Encodable
+ Decodable
+ Send
+ Sync
Expand Down
43 changes: 21 additions & 22 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx,
TransactionSigned,
};
use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
Expand Down Expand Up @@ -272,7 +275,7 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
/// - account has enough balance to cover the transaction's gas
pending_transactions: ReceiverStream<TxHash>,
/// Incoming events from the [`NetworkManager`](crate::NetworkManager).
transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent>,
transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
/// How the `TransactionsManager` is configured.
config: TransactionsManagerConfig,
/// `TransactionsManager` metrics
Expand Down Expand Up @@ -697,13 +700,15 @@ where

impl<Pool, N> TransactionsManager<Pool, N>
where
Pool: TransactionPool,
Pool: TransactionPool + 'static,
N: NetworkPrimitives<
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction:
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled: Into<N::PooledTransaction>>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{
/// Invoked when transactions in the local mempool are considered __pending__.
///
Expand Down Expand Up @@ -1099,16 +1104,9 @@ where
_ => {}
}
}
}

impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
Pool::Transaction:
PoolTransaction<Consensus = TransactionSigned, Pooled: Into<PooledTransactionsElement>>,
{
/// Handles dedicated transaction events related to the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
// ensure we didn't receive any blob transactions as these are disallowed to be
Expand All @@ -1119,7 +1117,7 @@ where
let non_blob_txs = msg
.0
.into_iter()
.map(PooledTransactionsElement::try_from_broadcast)
.map(N::PooledTransaction::try_from)
.filter_map(Result::ok)
.collect();

Expand All @@ -1146,7 +1144,7 @@ where
fn import_transactions(
&mut self,
peer_id: PeerId,
transactions: PooledTransactions,
transactions: PooledTransactions<N::PooledTransaction>,
source: TransactionSource,
) {
// If the node is pipeline syncing, ignore transactions
Expand All @@ -1162,15 +1160,15 @@ where

// mark the transactions as received
self.transaction_fetcher
.remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.hash()));
.remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));

// track that the peer knows these transaction, but only if this is a new broadcast.
// If we received the transactions as the response to our `GetPooledTransactions``
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
let mut num_already_seen_by_peer = 0;
for tx in &transactions {
if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) {
if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
num_already_seen_by_peer += 1;
}
}
Expand Down Expand Up @@ -1199,7 +1197,7 @@ where
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.hash(),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
Expand All @@ -1208,22 +1206,23 @@ where
}
};

match self.transactions_by_peers.entry(*tx.hash()) {
match self.transactions_by_peers.entry(*tx.tx_hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().insert(peer_id);
}
Entry::Vacant(entry) => {
if self.bad_imports.contains(tx.hash()) {
if self.bad_imports.contains(tx.tx_hash()) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%tx.hash(),
hash=%tx.tx_hash(),
client_version=%peer.client_version,
"received a known bad transaction from peer"
);
has_bad_transactions = true;
} else {
// this is a new transaction that should be imported into the pool

let pool_transaction = Pool::Transaction::from_pooled(tx.into());
new_txs.push(pool_transaction);

Expand Down Expand Up @@ -1285,7 +1284,7 @@ where
}

/// Processes a [`FetchEvent`].
fn on_fetch_event(&mut self, fetch_event: FetchEvent) {
fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
self.import_transactions(peer_id, transactions, TransactionSource::Response);
Expand Down

0 comments on commit 35cfd41

Please sign in to comment.