diff --git a/Cargo.lock b/Cargo.lock index 9514d361fbfa..973ad4718720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7800,6 +7800,7 @@ dependencies = [ "reth-dns-discovery", "reth-ecies", "reth-eth-wire", + "reth-eth-wire-types", "reth-fs-util", "reth-metrics", "reth-net-banlist", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 09f81e63e545..dde0b4a0b230 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -24,6 +24,7 @@ reth-discv4.workspace = true reth-discv5.workspace = true reth-dns-discovery.workspace = true reth-eth-wire.workspace = true +reth-eth-wire-types.workspace = true reth-ecies.workspace = true reth-tasks.workspace = true reth-transaction-pool.workspace = true @@ -111,6 +112,7 @@ serde = [ "reth-dns-discovery/serde", "reth-eth-wire/serde", "reth-provider?/serde", + "reth-eth-wire-types/serde", "alloy-consensus/serde", "alloy-eips/serde", "alloy-primitives/serde", @@ -118,7 +120,7 @@ serde = [ "parking_lot/serde", "rand/serde", "smallvec/serde", - "url/serde" + "url/serde", ] test-utils = [ "dep:reth-provider", diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index e6a5d9566419..31038906b25f 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -1,5 +1,6 @@ //! Builder support for configuring the entire setup. +use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandleProvider; use reth_transaction_pool::TransactionPool; use tokio::sync::mpsc; @@ -16,8 +17,8 @@ pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256; /// A builder that can configure all components of the network. #[allow(missing_debug_implementations)] -pub struct NetworkBuilder { - pub(crate) network: NetworkManager, +pub struct NetworkBuilder { + pub(crate) network: NetworkManager, pub(crate) transactions: Tx, pub(crate) request_handler: Eth, } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 96aef249d9f9..db7b384c2b34 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -6,7 +6,9 @@ use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks}; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS}; use reth_discv5::NetworkStackId; use reth_dns_discovery::DnsDiscoveryConfig; -use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status}; +use reth_eth_wire::{ + EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status, +}; use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer}; use reth_network_types::{PeersConfig, SessionsConfig}; use reth_primitives::{ForkFilter, Head}; @@ -32,7 +34,7 @@ pub fn rng_secret_key() -> SecretKey { /// All network related initialization settings. #[derive(Debug)] -pub struct NetworkConfig { +pub struct NetworkConfig { /// The client type that can interact with the chain. /// /// This type is used to fetch the block number after we established a session and received the @@ -66,7 +68,7 @@ pub struct NetworkConfig { /// first hardfork, `Frontier` for mainnet. pub fork_filter: ForkFilter, /// The block importer type. - pub block_import: Box, + pub block_import: Box>, /// The default mode of the network. pub network_mode: NetworkMode, /// The executor to use for spawning tasks. @@ -87,9 +89,9 @@ pub struct NetworkConfig { // === impl NetworkConfig === -impl NetworkConfig<()> { +impl NetworkConfig<(), N> { /// Convenience method for creating the corresponding builder type - pub fn builder(secret_key: SecretKey) -> NetworkConfigBuilder { + pub fn builder(secret_key: SecretKey) -> NetworkConfigBuilder { NetworkConfigBuilder::new(secret_key) } @@ -99,7 +101,7 @@ impl NetworkConfig<()> { } } -impl NetworkConfig { +impl NetworkConfig { /// Create a new instance with all mandatory fields set, rest is field with defaults. pub fn new(client: C, secret_key: SecretKey) -> Self where @@ -134,12 +136,13 @@ impl NetworkConfig { } } -impl NetworkConfig +impl NetworkConfig where C: BlockNumReader + 'static, + N: NetworkPrimitives, { /// Convenience method for calling [`NetworkManager::new`]. - pub async fn manager(self) -> Result { + pub async fn manager(self) -> Result, NetworkError> { NetworkManager::new(self).await } } @@ -164,7 +167,7 @@ where /// Builder for [`NetworkConfig`](struct.NetworkConfig.html). #[derive(Debug)] -pub struct NetworkConfigBuilder { +pub struct NetworkConfigBuilder { /// The node's secret key, from which the node's identity is derived. secret_key: SecretKey, /// How to configure discovery over DNS. @@ -196,7 +199,7 @@ pub struct NetworkConfigBuilder { /// Whether tx gossip is disabled tx_gossip_disabled: bool, /// The block importer type - block_import: Option>, + block_import: Option>>, /// How to instantiate transactions manager. transactions_manager_config: TransactionsManagerConfig, /// The NAT resolver for external IP @@ -206,7 +209,7 @@ pub struct NetworkConfigBuilder { // === impl NetworkConfigBuilder === #[allow(missing_docs)] -impl NetworkConfigBuilder { +impl NetworkConfigBuilder { /// Create a new builder instance with a random secret key. pub fn with_rng_secret_key() -> Self { Self::new(rng_secret_key()) @@ -480,7 +483,7 @@ impl NetworkConfigBuilder { } /// Sets the block import type. - pub fn block_import(mut self, block_import: Box) -> Self { + pub fn block_import(mut self, block_import: Box>) -> Self { self.block_import = Some(block_import); self } @@ -490,7 +493,7 @@ impl NetworkConfigBuilder { pub fn build_with_noop_provider( self, chain_spec: Arc, - ) -> NetworkConfig> + ) -> NetworkConfig, N> where ChainSpec: EthChainSpec + Hardforks + 'static, { @@ -509,7 +512,7 @@ impl NetworkConfigBuilder { /// The given client is to be used for interacting with the chain, for example fetching the /// corresponding block for a given block hash we receive from a peer in the status message when /// establishing a connection. - pub fn build(self, client: C) -> NetworkConfig + pub fn build(self, client: C) -> NetworkConfig where C: ChainSpecProvider, { diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 1f20be53967d..8121b9675ed3 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -12,8 +12,8 @@ use alloy_eips::BlockHashOrNumber; use alloy_rlp::Encodable; use futures::StreamExt; use reth_eth_wire::{ - BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, - HeadersDirection, NodeData, Receipts, + BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData, + GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::error::RequestResult; @@ -54,7 +54,7 @@ const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024; /// This can be spawned to another task and is supposed to be run as background service. #[derive(Debug)] #[must_use = "Manager does nothing unless polled."] -pub struct EthRequestHandler { +pub struct EthRequestHandler { /// The client type that can interact with the chain. client: C, /// Used for reporting peers. @@ -62,15 +62,15 @@ pub struct EthRequestHandler { #[allow(dead_code)] peers: PeersHandle, /// Incoming request from the [`NetworkManager`](crate::NetworkManager). - incoming_requests: ReceiverStream, + incoming_requests: ReceiverStream>, /// Metrics for the eth request handler. metrics: EthRequestHandlerMetrics, } // === impl EthRequestHandler === -impl EthRequestHandler { +impl EthRequestHandler { /// Create a new instance - pub fn new(client: C, peers: PeersHandle, incoming: Receiver) -> Self { + pub fn new(client: C, peers: PeersHandle, incoming: Receiver>) -> Self { Self { client, peers, @@ -148,7 +148,7 @@ where &self, _peer_id: PeerId, request: GetBlockHeaders, - response: oneshot::Sender>, + response: oneshot::Sender>>, ) { self.metrics.eth_headers_requests_received_total.increment(1); let headers = self.get_headers_response(request); @@ -159,7 +159,7 @@ where &self, _peer_id: PeerId, request: GetBlockBodies, - response: oneshot::Sender>, + response: oneshot::Sender>>, ) { self.metrics.eth_bodies_requests_received_total.increment(1); let mut bodies = Vec::new(); @@ -272,7 +272,7 @@ where /// All `eth` request related to blocks delegated by the network. #[derive(Debug)] -pub enum IncomingEthRequest { +pub enum IncomingEthRequest { /// Request Block headers from the peer. /// /// The response should be sent through the channel. @@ -282,7 +282,7 @@ pub enum IncomingEthRequest { /// The specific block headers requested. request: GetBlockHeaders, /// The channel sender for the response containing block headers. - response: oneshot::Sender>, + response: oneshot::Sender>>, }, /// Request Block bodies from the peer. /// @@ -293,7 +293,7 @@ pub enum IncomingEthRequest { /// The specific block bodies requested. request: GetBlockBodies, /// The channel sender for the response containing block bodies. - response: oneshot::Sender>, + response: oneshot::Sender>>, }, /// Request Node Data from the peer. /// diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 0e433a388628..0eae99e7c50a 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -46,7 +46,9 @@ //! //! ``` //! # async fn launch() { -//! use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager}; +//! use reth_network::{ +//! config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager, +//! }; //! use reth_network_peers::mainnet_nodes; //! use reth_provider::test_utils::NoopProvider; //! @@ -59,7 +61,7 @@ //! let config = NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client); //! //! // create the network instance -//! let network = NetworkManager::new(config).await.unwrap(); +//! let network = NetworkManager::::new(config).await.unwrap(); //! //! // keep a handle to the network and spawn it //! let handle = network.handle().clone(); @@ -138,6 +140,7 @@ mod state; mod swarm; pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; +pub use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives}; pub use reth_network_api::{ BlockDownloaderProvider, DiscoveredEvent, DiscoveryEvent, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, PeerRequest, PeerRequestSender, Peers, PeersInfo, diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 3a7f94985fc9..0738be1bcac2 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -29,7 +29,10 @@ use std::{ use futures::{Future, StreamExt}; use parking_lot::Mutex; -use reth_eth_wire::{capability::CapabilityMessage, Capabilities, DisconnectReason}; +use reth_eth_wire::{ + capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives, + NetworkPrimitives, +}; use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_network_api::{ @@ -76,17 +79,17 @@ use crate::{ /// include_mmd!("docs/mermaid/network-manager.mmd") #[derive(Debug)] #[must_use = "The NetworkManager does nothing unless polled"] -pub struct NetworkManager { +pub struct NetworkManager { /// The type that manages the actual network part, which includes connections. - swarm: Swarm, + swarm: Swarm, /// Underlying network handle that can be shared. - handle: NetworkHandle, + handle: NetworkHandle, /// Receiver half of the command channel set up between this type and the [`NetworkHandle`] - from_handle_rx: UnboundedReceiverStream, + from_handle_rx: UnboundedReceiverStream>, /// Handles block imports according to the `eth` protocol. - block_import: Box, + block_import: Box>, /// Sender for high level network events. - event_sender: EventSender, + event_sender: EventSender>>, /// Sender half to send events to the /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured. to_transactions_manager: Option>, @@ -103,7 +106,7 @@ pub struct NetworkManager { /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with /// requests. This channel size is set at /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY) - to_eth_request_handler: Option>, + to_eth_request_handler: Option>>, /// Tracks the number of active session (connected peers). /// /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`] @@ -116,7 +119,7 @@ pub struct NetworkManager { } // === impl NetworkManager === -impl NetworkManager { +impl NetworkManager { /// Sets the dedicated channel for events indented for the /// [`TransactionsManager`](crate::transactions::TransactionsManager). pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender) { @@ -126,7 +129,7 @@ impl NetworkManager { /// Sets the dedicated channel for events indented for the /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler). - pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender) { + pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender>) { self.to_eth_request_handler = Some(tx); } @@ -138,7 +141,7 @@ impl NetworkManager { /// Returns the [`NetworkHandle`] that can be cloned and shared. /// /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`] - pub const fn handle(&self) -> &NetworkHandle { + pub const fn handle(&self) -> &NetworkHandle { &self.handle } @@ -165,7 +168,7 @@ impl NetworkManager { /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the /// state of the entire network. pub async fn new( - config: NetworkConfig, + config: NetworkConfig, ) -> Result { let NetworkConfig { client, @@ -253,7 +256,7 @@ impl NetworkManager { let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel(); - let event_sender: EventSender = Default::default(); + let event_sender: EventSender>> = Default::default(); let handle = NetworkHandle::new( Arc::clone(&num_active_peers), @@ -314,14 +317,14 @@ impl NetworkManager { /// } /// ``` pub async fn builder( - config: NetworkConfig, - ) -> Result, NetworkError> { + config: NetworkConfig, + ) -> Result, NetworkError> { let network = Self::new(config).await?; Ok(network.into_builder()) } /// Create a [`NetworkBuilder`] to configure all components of the network - pub const fn into_builder(self) -> NetworkBuilder<(), ()> { + pub const fn into_builder(self) -> NetworkBuilder<(), (), N> { NetworkBuilder { network: self, transactions: (), request_handler: () } } @@ -369,7 +372,7 @@ impl NetworkManager { /// Returns a new [`FetchClient`] that can be cloned and shared. /// /// The [`FetchClient`] is the entrypoint for sending requests to the network. - pub fn fetch_client(&self) -> FetchClient { + pub fn fetch_client(&self) -> FetchClient { self.swarm.state().fetch_client() } @@ -416,7 +419,7 @@ impl NetworkManager { /// Sends an event to the [`EthRequestManager`](crate::eth_requests::EthRequestHandler) if /// configured. - fn delegate_eth_request(&self, event: IncomingEthRequest) { + fn delegate_eth_request(&self, event: IncomingEthRequest) { if let Some(ref reqs) = self.to_eth_request_handler { let _ = reqs.try_send(event).map_err(|e| { if let TrySendError::Full(_) = e { @@ -428,7 +431,7 @@ impl NetworkManager { } /// Handle an incoming request from the peer - fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest) { + fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest) { match req { PeerRequest::GetBlockHeaders { request, response } => { self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders { @@ -469,7 +472,7 @@ impl NetworkManager { } /// Invoked after a `NewBlock` message from the peer was validated - fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { + fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { let BlockImportOutcome { peer, result } = outcome; match result { Ok(validated_block) => match validated_block { @@ -511,7 +514,7 @@ impl NetworkManager { } /// Handles a received Message from the peer's session. - fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { + fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { match msg { PeerMessage::NewBlockHashes(hashes) => { self.within_pow_or_disconnect(peer_id, |this| { @@ -551,7 +554,7 @@ impl NetworkManager { } /// Handler for received messages from a handle - fn on_handle_message(&mut self, msg: NetworkHandleMessage) { + fn on_handle_message(&mut self, msg: NetworkHandleMessage) { match msg { NetworkHandleMessage::DiscoveryListener(tx) => { self.swarm.state_mut().discovery_mut().add_listener(tx); @@ -646,7 +649,7 @@ impl NetworkManager { } } - fn on_swarm_event(&mut self, event: SwarmEvent) { + fn on_swarm_event(&mut self, event: SwarmEvent) { // handle event match event { SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message), @@ -981,7 +984,7 @@ impl NetworkManager { } } -impl Future for NetworkManager { +impl Future for NetworkManager { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 4175757e0cf1..1715fa63e2f4 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -11,7 +11,10 @@ use enr::Enr; use parking_lot::Mutex; use reth_discv4::{Discv4, NatResolver}; use reth_discv5::Discv5; -use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; +use reth_eth_wire::{ + DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock, + NewPooledTransactionHashes, SharedTransactions, +}; use reth_network_api::{ test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, @@ -39,20 +42,20 @@ use crate::{ /// /// See also [`NetworkManager`](crate::NetworkManager). #[derive(Clone, Debug)] -pub struct NetworkHandle { +pub struct NetworkHandle { /// The Arc'ed delegate that contains the state. - inner: Arc, + inner: Arc>, } // === impl NetworkHandle === -impl NetworkHandle { +impl NetworkHandle { /// Creates a single new instance. #[allow(clippy::too_many_arguments)] pub(crate) fn new( num_active_peers: Arc, listener_address: Arc>, - to_manager_tx: UnboundedSender, + to_manager_tx: UnboundedSender>, secret_key: SecretKey, local_peer_id: PeerId, peers: PeersHandle, @@ -61,7 +64,7 @@ impl NetworkHandle { tx_gossip_disabled: bool, discv4: Option, discv5: Option, - event_sender: EventSender, + event_sender: EventSender>>, nat: Option, ) -> Self { let inner = NetworkInner { @@ -89,7 +92,7 @@ impl NetworkHandle { &self.inner.local_peer_id } - fn manager(&self) -> &UnboundedSender { + fn manager(&self) -> &UnboundedSender> { &self.inner.to_manager_tx } @@ -99,7 +102,7 @@ impl NetworkHandle { } /// Sends a [`NetworkHandleMessage`] to the manager - pub(crate) fn send_message(&self, msg: NetworkHandleMessage) { + pub(crate) fn send_message(&self, msg: NetworkHandleMessage) { let _ = self.inner.to_manager_tx.send(msg); } @@ -113,12 +116,12 @@ impl NetworkHandle { /// Caution: in `PoS` this is a noop because new blocks are no longer announced over devp2p. /// Instead they are sent to the node by CL and can be requested over devp2p. /// Broadcasting new blocks is considered a protocol violation. - pub fn announce_block(&self, block: NewBlock, hash: B256) { + pub fn announce_block(&self, block: NewBlock, hash: B256) { self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash)) } /// Sends a [`PeerRequest`] to the given peer's session. - pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) { + pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) { self.send_message(NetworkHandleMessage::EthRequest { peer_id, request }) } @@ -186,8 +189,8 @@ impl NetworkHandle { // === API Implementations === -impl NetworkEventListenerProvider for NetworkHandle { - fn event_listener(&self) -> EventStream { +impl NetworkEventListenerProvider for NetworkHandle { + fn event_listener(&self) -> EventStream>> { self.inner.event_sender.new_listener() } @@ -198,13 +201,13 @@ impl NetworkEventListenerProvider for NetworkHandle { } } -impl NetworkProtocols for NetworkHandle { +impl NetworkProtocols for NetworkHandle { fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) { self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol)) } } -impl PeersInfo for NetworkHandle { +impl PeersInfo for NetworkHandle { fn num_connected_peers(&self) -> usize { self.inner.num_active_peers.load(Ordering::Relaxed) } @@ -337,13 +340,13 @@ impl Peers for NetworkHandle { } } -impl PeersHandleProvider for NetworkHandle { +impl PeersHandleProvider for NetworkHandle { fn peers_handle(&self) -> &PeersHandle { &self.inner.peers } } -impl NetworkInfo for NetworkHandle { +impl NetworkInfo for NetworkHandle { fn local_addr(&self) -> SocketAddr { *self.inner.listener_address.lock() } @@ -367,7 +370,7 @@ impl NetworkInfo for NetworkHandle { } } -impl SyncStateProvider for NetworkHandle { +impl SyncStateProvider for NetworkHandle { fn is_syncing(&self) -> bool { self.inner.is_syncing.load(Ordering::Relaxed) } @@ -380,7 +383,7 @@ impl SyncStateProvider for NetworkHandle { } } -impl NetworkSyncUpdater for NetworkHandle { +impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { let future_state = state.is_syncing(); let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed); @@ -396,8 +399,8 @@ impl NetworkSyncUpdater for NetworkHandle { } } -impl BlockDownloaderProvider for NetworkHandle { - type Client = FetchClient; +impl BlockDownloaderProvider for NetworkHandle { + type Client = FetchClient; async fn fetch_client(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -407,11 +410,11 @@ impl BlockDownloaderProvider for NetworkHandle { } #[derive(Debug)] -struct NetworkInner { +struct NetworkInner { /// Number of active peer sessions the node's currently handling. num_active_peers: Arc, /// Sender half of the message channel to the [`crate::NetworkManager`]. - to_manager_tx: UnboundedSender, + to_manager_tx: UnboundedSender>, /// The local address that accepts incoming connections. listener_address: Arc>, /// The secret key used for authenticating sessions. @@ -435,7 +438,7 @@ struct NetworkInner { /// The instance of the discv5 service discv5: Option, /// Sender for high level network events. - event_sender: EventSender, + event_sender: EventSender>>, /// The NAT resolver nat: Option, } @@ -448,7 +451,7 @@ pub trait NetworkProtocols: Send + Sync { /// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). #[derive(Debug)] -pub(crate) enum NetworkHandleMessage { +pub(crate) enum NetworkHandleMessage { /// Marks a peer as trusted. AddTrustedPeerId(PeerId), /// Adds an address for a peer, including its ID, kind, and socket address. @@ -458,7 +461,7 @@ pub(crate) enum NetworkHandleMessage { /// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason. DisconnectPeer(PeerId, Option), /// Broadcasts an event to announce a new block to all nodes. - AnnounceBlock(NewBlock, B256), + AnnounceBlock(NewBlock, B256), /// Sends a list of transactions to the given peer. SendTransaction { /// The ID of the peer to which the transactions are sent. @@ -478,12 +481,12 @@ pub(crate) enum NetworkHandleMessage { /// The peer to send the request to. peer_id: PeerId, /// The request to send to the peer's sessions. - request: PeerRequest, + request: PeerRequest, }, /// Applies a reputation change to the given peer. ReputationChange(PeerId, ReputationChangeKind), /// Returns the client that can be used to interact with the network. - FetchClient(oneshot::Sender), + FetchClient(oneshot::Sender>), /// Applies a status update. StatusUpdate { /// The head status to apply. diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4e23c8527b47..48ae61e0dd0d 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -34,9 +34,10 @@ use std::{ use alloy_primitives::{TxHash, B256}; use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ - DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, - NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, - PooledTransactions, RequestTxHashes, Transactions, + DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData, + HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes, + NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, + RequestTxHashes, Transactions, }; use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{ @@ -200,15 +201,15 @@ impl TransactionsHandle { /// propagate new transactions over the network. #[derive(Debug)] #[must_use = "Manager does nothing unless polled."] -pub struct TransactionsManager { +pub struct TransactionsManager { /// Access to the transaction pool. pool: Pool, /// Network access. - network: NetworkHandle, + network: NetworkHandle, /// Subscriptions to all network related events. /// /// From which we get all new incoming transaction related messages. - network_events: EventStream, + network_events: EventStream>>, /// Transaction fetcher to handle inflight and missing transaction requests. transaction_fetcher: TransactionFetcher, /// All currently pending transactions grouped by peers. diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index ec891e5b39a9..0a17cbd563ed 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -8,7 +8,7 @@ use alloy_provider::{ext::AdminApi, ProviderBuilder}; use futures::StreamExt; use reth_chainspec::MAINNET; use reth_discv4::Discv4Config; -use reth_eth_wire::{DisconnectReason, HeadersDirection}; +use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, HeadersDirection}; use reth_net_banlist::BanList; use reth_network::{ test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT}, @@ -204,8 +204,9 @@ async fn test_connect_with_boot_nodes() { let mut discv4 = Discv4Config::builder(); discv4.add_boot_nodes(mainnet_nodes()); - let config = - NetworkConfigBuilder::new(secret_key).discovery(discv4).build(NoopProvider::default()); + let config = NetworkConfigBuilder::::new(secret_key) + .discovery(discv4) + .build(NoopProvider::default()); let network = NetworkManager::new(config).await.unwrap(); let handle = network.handle().clone(); @@ -572,7 +573,7 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() { let secret_key = SecretKey::new(&mut rand::thread_rng()); let peers_config = PeersConfig::default().with_max_inbound(0); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .listener_port(0) .disable_discovery() .peer_config(peers_config) diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index d84ff492e5e7..862281ab1ffd 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -5,6 +5,7 @@ use std::{ use reth_chainspec::MAINNET; use reth_discv4::{Discv4Config, NatResolver}; +use reth_eth_wire::EthNetworkPrimitives; use reth_network::{ error::{NetworkError, ServiceKind}, Discovery, NetworkConfigBuilder, NetworkManager, @@ -26,7 +27,7 @@ fn is_addr_in_use_kind(err: &NetworkError, kind: ServiceKind) -> bool { #[tokio::test(flavor = "multi_thread")] async fn test_is_default_syncing() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .disable_discovery() .listener_port(0) .build(NoopProvider::default()); @@ -37,13 +38,13 @@ async fn test_is_default_syncing() { #[tokio::test(flavor = "multi_thread")] async fn test_listener_addr_in_use() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .disable_discovery() .listener_port(0) .build(NoopProvider::default()); let network = NetworkManager::new(config).await.unwrap(); let listener_port = network.local_addr().port(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .listener_port(listener_port) .disable_discovery() .build(NoopProvider::default()); @@ -72,7 +73,7 @@ async fn test_discovery_addr_in_use() { #[tokio::test(flavor = "multi_thread")] async fn test_tcp_port_node_record_no_discovery() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .listener_port(0) .disable_discovery() .build_with_noop_provider(MAINNET.clone()); @@ -90,7 +91,7 @@ async fn test_tcp_port_node_record_no_discovery() { #[tokio::test(flavor = "multi_thread")] async fn test_tcp_port_node_record_discovery() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .listener_port(0) .discovery_port(0) .disable_dns_discovery() @@ -109,7 +110,7 @@ async fn test_tcp_port_node_record_discovery() { #[tokio::test(flavor = "multi_thread")] async fn test_node_record_address_with_nat() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap()))) .disable_discv4_discovery() .disable_dns_discovery() @@ -125,7 +126,7 @@ async fn test_node_record_address_with_nat() { #[tokio::test(flavor = "multi_thread")] async fn test_node_record_address_with_nat_disable_discovery() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::::new(secret_key) .add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap()))) .disable_discovery() .listener_port(0) diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index e46ea4bec357..9e83f34e92f3 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -14,7 +14,9 @@ use chainspec::{boot_nodes, bsc_chain_spec}; use reth_discv4::Discv4ConfigBuilder; -use reth_network::{NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager}; +use reth_network::{ + EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, +}; use reth_network_api::PeersInfo; use reth_primitives::{ForkHash, ForkId}; use reth_tracing::{ @@ -62,7 +64,7 @@ async fn main() { // latest BSC forkId, we need to override this to allow connections from BSC nodes let fork_id = ForkId { hash: ForkHash([0x07, 0xb5, 0x43, 0x28]), next: 0 }; net_cfg.fork_filter.set_current_fork_id(fork_id); - let net_manager = NetworkManager::new(net_cfg).await.unwrap(); + let net_manager = NetworkManager::::new(net_cfg).await.unwrap(); // The network handle is our entrypoint into the network. let net_handle = net_manager.handle().clone(); diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index e16f71071c8c..702d0e8cf5ef 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -14,8 +14,8 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use reth::builder::NodeHandle; use reth_network::{ - config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager, - NetworkProtocols, + config::SecretKey, protocol::IntoRlpxSubProtocol, EthNetworkPrimitives, NetworkConfig, + NetworkManager, NetworkProtocols, }; use reth_network_api::{test_utils::PeersHandleProvider, NetworkInfo}; use reth_node_ethereum::EthereumNode; @@ -53,7 +53,7 @@ fn main() -> eyre::Result<()> { .build_with_noop_provider(node.chain_spec()); // spawn the second network instance - let subnetwork = NetworkManager::new(net_cfg).await?; + let subnetwork = NetworkManager::::new(net_cfg).await?; let subnetwork_peer_id = *subnetwork.peer_id(); let subnetwork_peer_addr = subnetwork.local_addr(); let subnetwork_handle = subnetwork.peers_handle(); diff --git a/examples/network/src/main.rs b/examples/network/src/main.rs index 1d8f436f318f..bd4f232a754c 100644 --- a/examples/network/src/main.rs +++ b/examples/network/src/main.rs @@ -8,7 +8,8 @@ use futures::StreamExt; use reth_network::{ - config::rng_secret_key, NetworkConfig, NetworkEventListenerProvider, NetworkManager, + config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkEventListenerProvider, + NetworkManager, }; use reth_provider::test_utils::NoopProvider; @@ -24,7 +25,7 @@ async fn main() -> eyre::Result<()> { let config = NetworkConfig::builder(local_key).mainnet_boot_nodes().build(client); // create the network instance - let network = NetworkManager::new(config).await?; + let network = NetworkManager::::new(config).await?; // get a handle to the network to interact with it let handle = network.handle().clone(); diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index 6078ae14cb85..bcc17a24f8d2 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -12,7 +12,8 @@ use chain_cfg::{boot_nodes, head, polygon_chain_spec}; use reth_discv4::Discv4ConfigBuilder; use reth_network::{ - config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, + config::NetworkMode, EthNetworkPrimitives, NetworkConfig, NetworkEvent, + NetworkEventListenerProvider, NetworkManager, }; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, @@ -57,7 +58,7 @@ async fn main() { discv4_cfg.add_boot_nodes(boot_nodes()).lookup_interval(interval); let net_cfg = net_cfg.set_discovery_v4(discv4_cfg.build()); - let net_manager = NetworkManager::new(net_cfg).await.unwrap(); + let net_manager = NetworkManager::::new(net_cfg).await.unwrap(); // The network handle is our entrypoint into the network. let net_handle = net_manager.handle();