diff --git a/Cargo.lock b/Cargo.lock index 2c383b29a6ff..d7d0e3a4267c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,9 +297,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backon" -version = "0.2.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6cd1a59bc091e593ee9ed62df4e4a07115e00a0e0a52fd7e0e04540773939b80" +checksum = "f34fac4d7cdaefa2deded0eda2d5d59dbfd43370ff3f856209e72340ae84c294" dependencies = [ "futures", "pin-project", @@ -4369,6 +4369,7 @@ dependencies = [ "reth-rlp", "reth-rpc", "reth-rpc-builder", + "reth-rpc-engine-api", "reth-staged-sync", "reth-stages", "reth-tasks", @@ -4557,6 +4558,7 @@ name = "reth-eth-wire" version = "0.1.0" dependencies = [ "arbitrary", + "async-trait", "bytes", "ethers-core", "futures", @@ -4833,28 +4835,12 @@ dependencies = [ name = "reth-provider" version = "0.1.0" dependencies = [ - "arbitrary", - "async-trait", "auto_impl 1.0.1", - "bytes", - "futures", - "heapless", - "hex-literal", - "modular-bitfield", - "parity-scale-codec", "parking_lot 0.12.1", - "postcard", - "rand 0.8.5", - "reth-codecs", "reth-db", "reth-interfaces", "reth-primitives", - "reth-rpc-types", - "secp256k1 0.24.3", - "test-fuzz", "thiserror", - "tokio", - "tokio-stream", ] [[package]] @@ -4901,6 +4887,7 @@ dependencies = [ "jsonwebtoken", "pin-project", "rand 0.8.5", + "reth-executor", "reth-interfaces", "reth-network-api", "reth-primitives", @@ -4911,6 +4898,7 @@ dependencies = [ "reth-rpc-types", "reth-tasks", "reth-transaction-pool", + "revm", "secp256k1 0.26.0", "serde", "serde_json", @@ -4943,6 +4931,7 @@ dependencies = [ "reth-provider", "reth-rpc", "reth-rpc-api", + "reth-rpc-engine-api", "reth-rpc-types", "reth-tracing", "reth-transaction-pool", @@ -5098,6 +5087,7 @@ version = "0.1.0" dependencies = [ "aquamarine", "async-trait", + "auto_impl 1.0.1", "bitflags", "fnv", "futures-util", @@ -5107,6 +5097,7 @@ dependencies = [ "rand 0.8.5", "reth-metrics-derive", "reth-primitives", + "reth-rlp", "ruint", "serde", "thiserror", @@ -5242,8 +5233,7 @@ dependencies = [ [[package]] name = "ruint" version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad3a104dc8c3867f653b0fec89c65e00b0ceb752718ad282177a7e0f33257ac" +source = "git+https://github.com/mattsse/uint?branch=matt/skip-leading-zeros#7aa078a0d3ea5a05c09e7a2b42c06921d55e7c67" dependencies = [ "arbitrary", "derive_more", @@ -5259,8 +5249,7 @@ dependencies = [ [[package]] name = "ruint-macro" version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62cc5760263ea229d367e7dff3c0cbf09e4797a125bd87059a6c095804f3b2d1" +source = "git+https://github.com/mattsse/uint?branch=matt/skip-leading-zeros#7aa078a0d3ea5a05c09e7a2b42c06921d55e7c67" [[package]] name = "rustc-demangle" diff --git a/Cargo.toml b/Cargo.toml index 4fc4b060459f..bd29418d4b66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,3 +41,5 @@ default-members = ["bin/reth"] [patch.crates-io] revm = { git = "https://github.com/bluealloy/revm" } revm-primitives = { git = "https://github.com/bluealloy/revm" } +# patched for quantity U256 responses +ruint = { git = "https://github.com/mattsse/uint", branch = "matt/skip-leading-zeros" } diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 84bca20202f3..4e32f4e5a66a 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -19,6 +19,7 @@ reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["t reth-consensus = { path = "../../crates/consensus" } reth-executor = { path = "../../crates/executor" } reth-eth-wire = { path = "../../crates/net/eth-wire" } +reth-rpc-engine-api = { path = "../../crates/rpc/rpc-engine-api" } reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" } reth-rpc = { path = "../../crates/rpc/rpc" } reth-rlp = { path = "../../crates/rlp" } @@ -58,7 +59,7 @@ tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } tokio-stream = "0.1" futures = "0.3.25" tempfile = { version = "3.3.0" } -backon = "0.2.0" +backon = "0.4" comfy-table = "6.1.4" crossterm = "0.25.0" tui = "0.19.0" diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index acdfd0cad3fa..62d9961cbaf6 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -2,14 +2,15 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; -use jsonrpsee::core::Error as RpcError; +use jsonrpsee::{core::Error as RpcError, server::ServerHandle}; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{JwtError, JwtSecret}; use reth_rpc_builder::{ - IpcServerBuilder, RethRpcModule, RpcModuleSelection, RpcServerConfig, RpcServerHandle, - ServerBuilder, TransportRpcModuleConfig, DEFAULT_HTTP_RPC_PORT, DEFAULT_IPC_ENDPOINT, + constants, IpcServerBuilder, RethRpcModule, RpcModuleSelection, RpcServerConfig, + RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, }; +use reth_rpc_engine_api::EngineApiHandle; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -48,7 +49,7 @@ pub struct RpcServerArgs { #[arg(long = "ws.addr")] pub ws_addr: Option, - /// Http server port to listen on + /// Ws server port to listen on #[arg(long = "ws.port")] pub ws_port: Option, @@ -64,9 +65,17 @@ pub struct RpcServerArgs { #[arg(long)] pub ipcpath: Option, + /// Auth server address to listen on + #[arg(long = "authrpc.addr")] + pub auth_addr: Option, + + /// Auth server port to listen on + #[arg(long = "authrpc.port")] + pub auth_port: Option, + /// Path to a JWT secret to use for authenticated RPC endpoints #[arg(long = "authrpc.jwtsecret", value_name = "PATH", global = true, required = false)] - authrpc_jwtsecret: Option>, + auth_jwtsecret: Option>, } impl RpcServerArgs { @@ -81,7 +90,7 @@ impl RpcServerArgs { /// duration of the execution, and SHOULD store the hex-encoded secret as a jwt.hex file on /// the filesystem. This file can then be used to provision the counterpart client. pub(crate) fn jwt_secret(&self) -> Result { - let arg = self.authrpc_jwtsecret.as_ref(); + let arg = self.auth_jwtsecret.as_ref(); let path: Option<&Path> = arg.map(|p| p.as_ref()); match path { Some(fpath) => JwtSecret::from_file(fpath), @@ -94,7 +103,7 @@ impl RpcServerArgs { } /// Convenience function for starting a rpc server with configs which extracted from cli args. - pub(crate) async fn start_server( + pub(crate) async fn start_rpc_server( &self, client: Client, pool: Pool, @@ -115,6 +124,27 @@ impl RpcServerArgs { .await } + /// Create Engine API server. + pub(crate) async fn start_auth_server( + &self, + client: Client, + pool: Pool, + network: Network, + handle: EngineApiHandle, + ) -> Result + where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + { + let socket_address = SocketAddr::new( + self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), + self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT), + ); + let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?; + reth_rpc_builder::auth::launch(client, pool, network, handle, socket_address, secret).await + } + /// Creates the [TransportRpcModuleConfig] from cli args. fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig { let mut config = TransportRpcModuleConfig::default(); @@ -138,7 +168,7 @@ impl RpcServerArgs { if self.http { let socket_address = SocketAddr::new( self.http_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), - self.http_port.unwrap_or(DEFAULT_HTTP_RPC_PORT), + self.http_port.unwrap_or(constants::DEFAULT_HTTP_RPC_PORT), ); config = config .with_http_address(socket_address) @@ -149,7 +179,7 @@ impl RpcServerArgs { if self.ws { let socket_address = SocketAddr::new( self.ws_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), - self.ws_port.unwrap_or(DEFAULT_HTTP_RPC_PORT), + self.ws_port.unwrap_or(constants::DEFAULT_HTTP_RPC_PORT), ); config = config.with_ws_address(socket_address).with_http(ServerBuilder::new()); } @@ -157,7 +187,7 @@ impl RpcServerArgs { if !self.ipcdisable { let ipc_builder = IpcServerBuilder::default(); config = config.with_ipc(ipc_builder).with_ipc_endpoint( - self.ipcpath.as_ref().unwrap_or(&DEFAULT_IPC_ENDPOINT.to_string()), + self.ipcpath.as_ref().unwrap_or(&constants::DEFAULT_IPC_ENDPOINT.to_string()), ); } @@ -167,10 +197,9 @@ impl RpcServerArgs { #[cfg(test)] mod tests { - use std::net::SocketAddrV4; - use super::*; use clap::Parser; + use std::net::SocketAddrV4; /// A helper type to parse Args more easily #[derive(Parser)] @@ -227,12 +256,15 @@ mod tests { let config = args.rpc_server_config(); assert_eq!( config.http_address().unwrap(), - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_HTTP_RPC_PORT)) + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::UNSPECIFIED, + constants::DEFAULT_HTTP_RPC_PORT + )) ); assert_eq!( config.ws_address().unwrap(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8888)) ); - assert_eq!(config.ipc_endpoint().unwrap().path(), DEFAULT_IPC_ENDPOINT); + assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT); } } diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index a9499e65e77b..f5008f2da9a0 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -173,3 +173,17 @@ impl ImportCommand { confy::load_path::(&self.config).wrap_err("Could not load config") } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_common_import_command_chain_args() { + for chain in ["mainnet", "sepolia", "goerli"] { + let args: ImportCommand = + ImportCommand::parse_from(["reth", "--chain", chain, "--path", "."]); + assert_eq!(args.chain.chain, chain.parse().unwrap()); + } + } +} diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 79faa462250a..d22105b05064 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -20,3 +20,4 @@ pub mod runner; pub mod stage; pub mod test_eth_chain; pub mod test_vectors; +pub mod utils; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 2b0e22b3f77b..10a83e7c4fb5 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -6,6 +6,7 @@ use crate::{ dirs::{ConfigPath, DbPath, PlatformPath}, prometheus_exporter, runner::CliContext, + utils::get_single_header, }; use clap::{crate_version, Parser}; use eyre::Context; @@ -31,12 +32,12 @@ use reth_interfaces::{ sync::SyncStateUpdater, }; use reth_network::{ - error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, + error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, }; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockNumber, ChainSpec, Head, H256}; +use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; - +use reth_rpc_engine_api::{EngineApi, EngineApiHandle}; use reth_staged_sync::{ utils::{ chainspec::genesis_value_parser, @@ -51,7 +52,8 @@ use reth_stages::{ }; use reth_tasks::TaskExecutor; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use tracing::{debug, info, trace, warn}; +use tokio::sync::{mpsc::unbounded_channel, watch}; +use tracing::*; /// Start the node #[derive(Debug, Parser)] @@ -125,6 +127,7 @@ impl Command { info!(target: "reth::cli", path = %self.db, "Opening database"); let db = Arc::new(init_db(&self.db)?); + let shareable_db = ShareableDatabase::new(Arc::clone(&db), self.chain.clone()); info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint()?; @@ -133,7 +136,7 @@ impl Command { init_genesis(db.clone(), self.chain.clone())?; - let consensus = self.init_consensus()?; + let (consensus, forkchoice_state_tx) = self.init_consensus()?; info!(target: "reth::cli", "Consensus engine initialized"); self.init_trusted_nodes(&mut config); @@ -144,19 +147,29 @@ impl Command { let network = self.start_network(network_config, &ctx.task_executor, ()).await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); - // TODO: Use the resolved secret to spawn the Engine API server - // Look at `reth_rpc::AuthLayer` for integration hints - let _secret = self.rpc.jwt_secret(); + let test_transaction_pool = reth_transaction_pool::test_utils::testing_pool(); + info!(target: "reth::cli", "Test transaction pool initialized"); let _rpc_server = self .rpc - .start_server( - ShareableDatabase::new(db.clone(), self.chain.clone()), - reth_transaction_pool::test_utils::testing_pool(), + .start_rpc_server(shareable_db.clone(), test_transaction_pool.clone(), network.clone()) + .await?; + info!(target: "reth::cli", "Started RPC server"); + + let engine_api_handle = + self.init_engine_api(Arc::clone(&db), forkchoice_state_tx, &ctx.task_executor); + info!(target: "reth::cli", "Engine API handler initialized"); + + let _auth_server = self + .rpc + .start_auth_server( + shareable_db, + test_transaction_pool, network.clone(), + engine_api_handle, ) .await?; - info!(target: "reth::cli", "Started RPC server"); + info!(target: "reth::cli", "Started Auth server"); let (mut pipeline, events) = self .build_networked_pipeline( @@ -193,9 +206,18 @@ impl Command { task_executor: &TaskExecutor, ) -> eyre::Result<(Pipeline, impl SyncStateUpdater>, impl Stream)> { - // building network downloaders using the fetch client - let fetch_client = Arc::new(network.fetch_client().await?); + let fetch_client = network.fetch_client().await?; + let max_block = if let Some(block) = self.max_block { + Some(block) + } else if let Some(tip) = self.tip { + Some(self.lookup_or_fetch_tip(db.clone(), fetch_client.clone(), tip).await?) + } else { + None + }; + // TODO: remove Arc requirement from downloader builders. + // building network downloaders using the fetch client + let fetch_client = Arc::new(fetch_client); let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers) .build(fetch_client.clone(), consensus.clone()) .into_task_with(task_executor); @@ -205,7 +227,14 @@ impl Command { .into_task_with(task_executor); let mut pipeline = self - .build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus) + .build_pipeline( + config, + header_downloader, + body_downloader, + network.clone(), + consensus, + max_block, + ) .await?; let events = stream_select( @@ -239,7 +268,7 @@ impl Command { } } - fn init_consensus(&self) -> eyre::Result> { + fn init_consensus(&self) -> eyre::Result<(Arc, watch::Sender)> { let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); if let Some(tip) = self.tip { @@ -256,7 +285,24 @@ impl Command { warn!(target: "reth::cli", warn_msg); } - Ok(consensus) + Ok((consensus, notifier)) + } + + fn init_engine_api( + &self, + db: Arc>, + forkchoice_state_tx: watch::Sender, + task_executor: &TaskExecutor, + ) -> EngineApiHandle { + let (message_tx, message_rx) = unbounded_channel(); + let engine_api = EngineApi::new( + ShareableDatabase::new(db, self.chain.clone()), + self.chain.clone(), + message_rx, + forkchoice_state_tx, + ); + task_executor.spawn(engine_api); + message_tx } /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected @@ -287,7 +333,7 @@ impl Command { Ok(handle) } - fn fetch_head(&self, db: Arc>) -> Result { + fn lookup_head(&self, db: Arc>) -> Result { db.view(|tx| { let head = FINISH.get_progress(tx)?.unwrap_or_default(); let header = tx @@ -310,13 +356,42 @@ impl Command { .map_err(Into::into) } + /// Attempt to look up the block number for the tip hash in the database. + /// If it doesn't exist, download the header and return the block number. + /// + /// NOTE: The download is attempted with infinite retries. + async fn lookup_or_fetch_tip( + &self, + db: Arc>, + fetch_client: FetchClient, + tip: H256, + ) -> Result { + if let Some(number) = db.view(|tx| tx.get::(tip))?? { + debug!(target: "reth::cli", ?tip, number, "Successfully looked up tip in the database"); + return Ok(number) + } + + debug!(target: "reth::cli", ?tip, "Fetching tip header from the network."); + loop { + match get_single_header(fetch_client.clone(), BlockHashOrNumber::Hash(tip)).await { + Ok(tip_header) => { + debug!(target: "reth::cli", ?tip, number = tip_header.number, "Successfully fetched tip"); + return Ok(tip_header.number) + } + Err(error) => { + error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); + } + } + } + } + fn load_network_config( &self, config: &Config, db: Arc>, executor: TaskExecutor, ) -> NetworkConfig>>> { - let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing"); + let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing"); self.network .network_config(config, self.chain.clone()) @@ -332,6 +407,7 @@ impl Command { body_downloader: B, updater: U, consensus: &Arc, + max_block: Option, ) -> eyre::Result, U>> where H: HeaderDownloader + 'static, @@ -342,7 +418,7 @@ impl Command { let mut builder = Pipeline::builder(); - if let Some(max_block) = self.max_block { + if let Some(max_block) = max_block { debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); builder = builder.with_max_block(max_block) } @@ -500,3 +576,16 @@ pub async fn handle_events(mut events: impl Stream + Unpin) { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_common_node_command_chain_args() { + for chain in ["mainnet", "sepolia", "goerli"] { + let args: Command = Command::parse_from(["reth", "--chain", chain]); + assert_eq!(args.chain.chain, chain.parse().unwrap()); + } + } +} diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index bc036d7d7dfc..d8b27777ba55 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -2,17 +2,14 @@ use crate::{ args::DiscoveryArgs, dirs::{ConfigPath, PlatformPath}, + utils::get_single_header, }; -use backon::{ConstantBackoff, Retryable}; +use backon::{ConstantBuilder, Retryable}; use clap::{Parser, Subcommand}; use reth_db::mdbx::{Env, EnvKind, WriteMap}; use reth_discv4::NatResolver; -use reth_interfaces::p2p::{ - bodies::client::BodiesClient, - headers::client::{HeadersClient, HeadersRequest}, -}; -use reth_network::FetchClient; -use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader}; +use reth_interfaces::p2p::bodies::client::BodiesClient; +use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord}; use reth_provider::ShareableDatabase; use reth_staged_sync::{ utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser}, @@ -113,12 +110,12 @@ impl Command { let fetch_client = network.fetch_client().await?; let retries = self.retries.max(1); - let backoff = ConstantBackoff::default().with_max_times(retries); + let backoff = ConstantBuilder::default().with_max_times(retries); match self.command { Subcommands::Header { id } => { - let header = (move || self.get_single_header(fetch_client.clone(), id)) - .retry(backoff) + let header = (move || get_single_header(fetch_client.clone(), id)) + .retry(&backoff) .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) .await?; println!("Successfully downloaded header: {header:?}"); @@ -130,12 +127,9 @@ impl Command { println!("Block number provided. Downloading header first..."); let client = fetch_client.clone(); let header = (move || { - self.get_single_header( - client.clone(), - BlockHashOrNumber::Number(number), - ) + get_single_header(client.clone(), BlockHashOrNumber::Number(number)) }) - .retry(backoff.clone()) + .retry(&backoff) .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) .await?; header.hash() @@ -145,7 +139,7 @@ impl Command { let client = fetch_client.clone(); async move { client.get_block_bodies(vec![hash]).await } }) - .retry(backoff) + .retry(&backoff) .notify(|err, _| println!("Error requesting block: {err}. Retrying...")) .await? .split(); @@ -162,43 +156,4 @@ impl Command { Ok(()) } - - /// Get a single header from network - pub async fn get_single_header( - &self, - client: FetchClient, - id: BlockHashOrNumber, - ) -> eyre::Result { - let request = HeadersRequest { - direction: reth_primitives::HeadersDirection::Rising, - limit: 1, - start: id, - }; - - let (_, response) = client.get_headers(request).await?.split(); - - if response.len() != 1 { - eyre::bail!( - "Invalid number of headers received. Expected: 1. Received: {}", - response.len() - ) - } - - let header = response.into_iter().next().unwrap().seal_slow(); - - let valid = match id { - BlockHashOrNumber::Hash(hash) => header.hash() == hash, - BlockHashOrNumber::Number(number) => header.number == number, - }; - - if !valid { - eyre::bail!( - "Received invalid header. Received: {:?}. Expected: {:?}", - header.num_hash(), - id - ); - } - - Ok(header) - } } diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs new file mode 100644 index 000000000000..e163afa71cec --- /dev/null +++ b/bin/reth/src/utils.rs @@ -0,0 +1,43 @@ +//! Common CLI utility functions. + +use reth_interfaces::p2p::{ + download::DownloadClient, + headers::client::{HeadersClient, HeadersRequest}, + priority::Priority, +}; +use reth_network::FetchClient; +use reth_primitives::{BlockHashOrNumber, HeadersDirection, SealedHeader}; + +/// Get a single header from network +pub async fn get_single_header( + client: FetchClient, + id: BlockHashOrNumber, +) -> eyre::Result { + let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id }; + + let (peer_id, response) = + client.get_headers_with_priority(request, Priority::High).await?.split(); + + if response.len() != 1 { + client.report_bad_message(peer_id); + eyre::bail!("Invalid number of headers received. Expected: 1. Received: {}", response.len()) + } + + let header = response.into_iter().next().unwrap().seal_slow(); + + let valid = match id { + BlockHashOrNumber::Hash(hash) => header.hash() == hash, + BlockHashOrNumber::Number(number) => header.number == number, + }; + + if !valid { + client.report_bad_message(peer_id); + eyre::bail!( + "Received invalid header. Received: {:?}. Expected: {:?}", + header.num_hash(), + id + ); + } + + Ok(header) +} diff --git a/book/getting_started/installation.md b/book/getting_started/installation.md index 46a3c05b8cca..5f0ceebc06a8 100644 --- a/book/getting_started/installation.md +++ b/book/getting_started/installation.md @@ -15,7 +15,7 @@ Then clone the repository and build the binary: ```console git clone https://github.com/paradigmxyz/reth cd reth -cargo install --release --locked --path . --bin reth +cargo install --locked --path bin/reth --bin reth ``` The binary will now be in a platform specific folder, and should be accessible as `reth` via the command line. diff --git a/crates/net/eth-wire/Cargo.toml b/crates/net/eth-wire/Cargo.toml index 264c3a449ce1..fb76a352b665 100644 --- a/crates/net/eth-wire/Cargo.toml +++ b/crates/net/eth-wire/Cargo.toml @@ -15,12 +15,14 @@ serde = { version = "1", optional = true } # reth reth-codecs = { path = "../../storage/codecs" } reth-primitives = { path = "../../primitives" } +reth-ecies = { path = "../ecies" } reth-rlp = { path = "../../rlp", features = ["alloc", "derive", "std", "ethereum-types", "smol_str"] } # used for Chain and builders ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false } tokio = { version = "1.21.2", features = ["full"] } +tokio-util = { version = "0.7.4", features = ["io", "codec"] } futures = "0.3.24" tokio-stream = "0.1.11" pin-project = "1.0" @@ -28,6 +30,7 @@ tracing = "0.1.37" snap = "1.0.5" smol_str = "0.1" metrics = "0.20.1" +async-trait = "0.1" # arbitrary utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } @@ -36,7 +39,6 @@ proptest-derive = { version = "0.3", optional = true } [dev-dependencies] reth-primitives = { path = "../../primitives", features = ["arbitrary"] } -reth-ecies = { path = "../ecies" } reth-tracing = { path = "../../tracing" } ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false } diff --git a/crates/net/eth-wire/src/disconnect.rs b/crates/net/eth-wire/src/disconnect.rs index 6ddf4367f974..b72d7bf9a2bf 100644 --- a/crates/net/eth-wire/src/disconnect.rs +++ b/crates/net/eth-wire/src/disconnect.rs @@ -1,10 +1,15 @@ //! Disconnect +use bytes::Bytes; +use futures::{Sink, SinkExt}; use reth_codecs::derive_arbitrary; +use reth_ecies::stream::ECIESStream; use reth_primitives::bytes::{Buf, BufMut}; use reth_rlp::{Decodable, DecodeError, Encodable, Header}; use std::fmt::Display; use thiserror::Error; +use tokio::io::AsyncWrite; +use tokio_util::codec::{Encoder, Framed}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -143,6 +148,45 @@ impl Decodable for DisconnectReason { } } +/// This trait is meant to allow higher level protocols like `eth` to disconnect from a peer, using +/// lower-level disconnect functions (such as those that exist in the `p2p` protocol) if the +/// underlying stream supports it. +#[async_trait::async_trait] +pub trait CanDisconnect: Sink + Unpin + Sized { + /// Disconnects from the underlying stream, using a [`DisconnectReason`] as disconnect + /// information if the stream implements a protocol that can carry the additional disconnect + /// metadata. + async fn disconnect( + &mut self, + reason: DisconnectReason, + ) -> Result<(), >::Error>; +} + +// basic impls for things like Framed +#[async_trait::async_trait] +impl CanDisconnect for Framed +where + T: AsyncWrite + Unpin + Send, + U: Encoder + Send, +{ + async fn disconnect( + &mut self, + _reason: DisconnectReason, + ) -> Result<(), >::Error> { + self.close().await + } +} + +#[async_trait::async_trait] +impl CanDisconnect for ECIESStream +where + S: AsyncWrite + Unpin + Send, +{ + async fn disconnect(&mut self, _reason: DisconnectReason) -> Result<(), std::io::Error> { + self.close().await + } +} + #[cfg(test)] mod tests { use crate::{p2pstream::P2PMessage, DisconnectReason}; diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index c61113c75641..605b8e6a675f 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -2,7 +2,7 @@ use crate::{ errors::{EthHandshakeError, EthStreamError}, message::{EthBroadcastMessage, ProtocolBroadcastMessage}, types::{EthMessage, ProtocolMessage, Status}, - EthVersion, + CanDisconnect, DisconnectReason, EthVersion, }; use futures::{ready, Sink, SinkExt, StreamExt}; use pin_project::pin_project; @@ -43,8 +43,8 @@ impl UnauthedEthStream { impl UnauthedEthStream where - S: Stream> + Sink + Unpin, - EthStreamError: From, + S: Stream> + CanDisconnect + Unpin, + EthStreamError: From + From<>::Error>, { /// Consumes the [`UnauthedEthStream`] and returns an [`EthStream`] after the `Status` /// handshake is completed successfully. This also returns the `Status` message sent by the @@ -67,13 +67,18 @@ where self.inner.send(our_status_bytes).await?; tracing::trace!("waiting for eth status from peer"); - let their_msg = self - .inner - .next() - .await - .ok_or(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse))??; + let their_msg_res = self.inner.next().await; + + let their_msg = match their_msg_res { + Some(msg) => msg, + None => { + self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; + return Err(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse)) + } + }?; if their_msg.len() > MAX_MESSAGE_SIZE { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; return Err(EthStreamError::MessageTooBig(their_msg.len())) } @@ -82,6 +87,7 @@ where Ok(m) => m, Err(err) => { tracing::debug!("decode error in eth handshake: msg={their_msg:x}"); + self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; return Err(err) } }; @@ -95,6 +101,7 @@ where "validating incoming eth status from peer" ); if status.genesis != resp.genesis { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; return Err(EthHandshakeError::MismatchedGenesis { expected: status.genesis, got: resp.genesis, @@ -103,6 +110,7 @@ where } if status.version != resp.version { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; return Err(EthHandshakeError::MismatchedProtocolVersion { expected: status.version, got: resp.version, @@ -111,6 +119,7 @@ where } if status.chain != resp.chain { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; return Err(EthHandshakeError::MismatchedChain { expected: status.chain, got: resp.chain, @@ -121,6 +130,7 @@ where // TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times // larger, it will still fit within 100 bits if status.total_difficulty.bit_len() > 100 { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge { maximum: 100, got: status.total_difficulty.bit_len(), @@ -128,7 +138,12 @@ where .into()) } - fork_filter.validate(resp.forkid).map_err(EthHandshakeError::InvalidFork)?; + if let Err(err) = + fork_filter.validate(resp.forkid).map_err(EthHandshakeError::InvalidFork) + { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; + return Err(err.into()) + } // now we can create the `EthStream` because the peer has successfully completed // the handshake @@ -136,9 +151,12 @@ where Ok((stream, resp)) } - _ => Err(EthStreamError::EthHandshakeError( - EthHandshakeError::NonStatusMessageInHandshake, - )), + _ => { + self.inner.disconnect(DisconnectReason::ProtocolBreach).await?; + Err(EthStreamError::EthHandshakeError( + EthHandshakeError::NonStatusMessageInHandshake, + )) + } } } } @@ -239,10 +257,10 @@ where } } -impl Sink for EthStream +impl Sink for EthStream where - S: Sink + Unpin, - EthStreamError: From, + S: CanDisconnect + Unpin, + EthStreamError: From<>::Error>, { type Error = EthStreamError; @@ -252,6 +270,15 @@ where fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> { if matches!(item, EthMessage::Status(_)) { + // TODO: to disconnect here we would need to do something similar to P2PStream's + // start_disconnect, which would ideally be a part of the CanDisconnect trait, or at + // least similar. + // + // Other parts of reth do not need traits like CanDisconnect because they work + // exclusively with EthStream>, where the inner P2PStream is accessible, + // allowing for its start_disconnect method to be called. + // + // self.project().inner.start_disconnect(DisconnectReason::ProtocolBreach); return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake)) } @@ -273,6 +300,17 @@ where } } +#[async_trait::async_trait] +impl CanDisconnect for EthStream +where + S: CanDisconnect + Send, + EthStreamError: From<>::Error>, +{ + async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), EthStreamError> { + self.inner.disconnect(reason).await.map_err(Into::into) + } +} + #[cfg(test)] mod tests { use super::UnauthedEthStream; diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index 309f38a2269f..44715489b62c 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -24,7 +24,7 @@ pub use tokio_util::codec::{ }; pub use crate::{ - disconnect::DisconnectReason, + disconnect::{CanDisconnect, DisconnectReason}, ethstream::{EthStream, UnauthedEthStream, MAX_MESSAGE_SIZE}, hello::HelloMessage, p2pstream::{P2PMessage, P2PMessageID, P2PStream, ProtocolVersion, UnauthedP2PStream}, diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 6bbf31e231f0..25be3ae0d2b4 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -1,6 +1,7 @@ #![allow(dead_code, unreachable_pub, missing_docs, unused_variables)] use crate::{ capability::{Capability, SharedCapability}, + disconnect::CanDisconnect, errors::{P2PHandshakeError, P2PStreamError}, pinger::{Pinger, PingerEvent}, DisconnectReason, HelloMessage, @@ -72,25 +73,6 @@ impl UnauthedP2PStream { } } -impl UnauthedP2PStream -where - S: Sink + Unpin, -{ - /// Send a disconnect message during the handshake. This is sent without snappy compression. - pub async fn send_disconnect( - &mut self, - reason: DisconnectReason, - ) -> Result<(), P2PStreamError> { - let mut buf = BytesMut::new(); - P2PMessage::Disconnect(reason).encode(&mut buf); - tracing::trace!( - %reason, - "Sending disconnect message during the handshake", - ); - self.inner.send(buf.freeze()).await.map_err(P2PStreamError::Io) - } -} - impl UnauthedP2PStream where S: Stream> + Sink + Unpin, @@ -180,6 +162,35 @@ where } } +impl UnauthedP2PStream +where + S: Sink + Unpin, +{ + /// Send a disconnect message during the handshake. This is sent without snappy compression. + pub async fn send_disconnect( + &mut self, + reason: DisconnectReason, + ) -> Result<(), P2PStreamError> { + let mut buf = BytesMut::new(); + P2PMessage::Disconnect(reason).encode(&mut buf); + tracing::trace!( + %reason, + "Sending disconnect message during the handshake", + ); + self.inner.send(buf.freeze()).await.map_err(P2PStreamError::Io) + } +} + +#[async_trait::async_trait] +impl CanDisconnect for P2PStream +where + S: Sink + Unpin + Send + Sync, +{ + async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), P2PStreamError> { + self.disconnect(reason).await + } +} + /// A P2PStream wraps over any `Stream` that yields bytes and makes it compatible with `p2p` /// protocol messages. #[pin_project] @@ -284,13 +295,13 @@ impl P2PStream { impl P2PStream where - S: Sink + Unpin, + S: Sink + Unpin + Send, { /// Disconnects the connection by sending a disconnect message. /// /// This future resolves once the disconnect message has been sent and the stream has been /// closed. - pub async fn disconnect(mut self, reason: DisconnectReason) -> Result<(), P2PStreamError> { + pub async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), P2PStreamError> { self.start_disconnect(reason)?; self.close().await } @@ -821,7 +832,7 @@ mod tests { let (server_hello, _) = eth_hello(); - let (p2p_stream, _) = + let (mut p2p_stream, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); p2p_stream.disconnect(expected_disconnect).await.unwrap(); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index d1e99e6cd968..b1a715fafe72 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -753,9 +753,9 @@ mod tests { &self, local_addr: SocketAddr, f: F, - ) -> Pin + Send + Sync>> + ) -> Pin + Send>> where - F: FnOnce(EthStream>>) -> O + Send + Sync + 'static, + F: FnOnce(EthStream>>) -> O + Send + 'static, O: Future + Send + Sync, { let status = self.status; diff --git a/crates/primitives/src/jsonu256.rs b/crates/primitives/src/jsonu256.rs index 0332c2da1878..58c71580615f 100644 --- a/crates/primitives/src/jsonu256.rs +++ b/crates/primitives/src/jsonu256.rs @@ -111,9 +111,6 @@ mod test { let data = JsonU256(U256::from(16)); let serialized = serde_json::to_string(&data).unwrap(); - assert_eq!( - serialized, - r#""0x0000000000000000000000000000000000000000000000000000000000000010""# - ); + assert_eq!(serialized, r#""0x10""#); } } diff --git a/crates/rpc/rpc-api/src/eth.rs b/crates/rpc/rpc-api/src/eth.rs index 2b08e40f5e68..5b6553e73186 100644 --- a/crates/rpc/rpc-api/src/eth.rs +++ b/crates/rpc/rpc-api/src/eth.rs @@ -62,11 +62,11 @@ pub trait EthApi { /// Returns the number of uncles in a block from a block matching the given block hash. #[method(name = "eth_getUncleCountByBlockHash")] - async fn block_uncles_count_by_hash(&self, hash: H256) -> Result; + async fn block_uncles_count_by_hash(&self, hash: H256) -> Result>; /// Returns the number of uncles in a block with given block number. #[method(name = "eth_getUncleCountByBlockNumber")] - async fn block_uncles_count_by_number(&self, number: BlockNumberOrTag) -> Result; + async fn block_uncles_count_by_number(&self, number: BlockNumberOrTag) -> Result>; /// Returns an uncle block of the given block and index. #[method(name = "eth_getUncleByBlockHashAndIndex")] diff --git a/crates/rpc/rpc-api/src/eth_filter.rs b/crates/rpc/rpc-api/src/eth_filter.rs index addd8ab9e47a..e7bdbe200f03 100644 --- a/crates/rpc/rpc-api/src/eth_filter.rs +++ b/crates/rpc/rpc-api/src/eth_filter.rs @@ -1,5 +1,5 @@ use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; -use reth_primitives::rpc::Filter; +use reth_primitives::filter::Filter; use reth_rpc_types::{FilterChanges, FilterId, Log}; /// Rpc Interface for poll-based ethereum filter API. diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 9b1b144b1c4a..67e32a50adc0 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -13,12 +13,13 @@ reth-network-api = { path = "../../net/network-api" } reth-provider = { path = "../../storage/provider" } reth-rpc = { path = "../rpc" } reth-rpc-api = { path = "../rpc-api" } +reth-rpc-engine-api = { path = "../rpc-engine-api" } reth-rpc-types = { path = "../rpc-types" } reth-transaction-pool = { path = "../../transaction-pool" } jsonrpsee = { version = "0.16", features = ["server"] } tower-http = { version = "0.3", features = ["full"] } -tower = {version = "0.4" , features = ["full"] } +tower = { version = "0.4", features = ["full"] } hyper = "0.14" strum = { version = "0.24", features = ["derive"] } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs new file mode 100644 index 000000000000..9adaa96f8049 --- /dev/null +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -0,0 +1,76 @@ +use crate::{constants::DEFAULT_AUTH_PORT, RpcServerConfig}; +use hyper::{http::HeaderValue, Method}; +pub use jsonrpsee::server::ServerBuilder; +use jsonrpsee::{ + core::{ + server::{host_filtering::AllowHosts, rpc_module::Methods}, + Error as RpcError, + }, + server::{middleware, Server, ServerHandle}, + RpcModule, +}; +use reth_ipc::server::IpcServer; +pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; +use reth_network_api::{NetworkInfo, Peers}; +use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; +use reth_rpc::{ + AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, JwtSecret, NetApi, + TraceApi, Web3Api, +}; +use reth_rpc_api::servers::*; +use reth_rpc_engine_api::EngineApiHandle; +use reth_transaction_pool::TransactionPool; +use serde::{Deserialize, Serialize, Serializer}; +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; +use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; +use tower::layer::util::{Identity, Stack}; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; + +/// Configure and launch an auth server with `engine` and `eth` namespaces. +pub async fn launch( + client: Client, + pool: Pool, + network: Network, + handle: EngineApiHandle, + socket_addr: SocketAddr, + secret: JwtSecret, +) -> Result +where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, +{ + launch_with_eth_api(EthApi::new(client, pool, network), handle, socket_addr, secret).await +} + +/// Configure and launch an auth server with existing EthApi implementation. +pub async fn launch_with_eth_api( + eth_api: EthApi, + handle: EngineApiHandle, + socket_addr: SocketAddr, + secret: JwtSecret, +) -> Result +where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, +{ + // Configure the module and start the server. + let mut module = RpcModule::new(()); + module.merge(EngineApi::new(handle).into_rpc()); + module.merge(eth_api.into_rpc()); + + // Create auth middleware. + let middleware = + tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret))); + + // By default both http and ws are enabled. + let server = ServerBuilder::new().set_middleware(middleware).build(socket_addr).await?; + + server.start(module) +} diff --git a/crates/rpc/rpc-builder/src/constants.rs b/crates/rpc/rpc-builder/src/constants.rs new file mode 100644 index 000000000000..a1b2bc36a82d --- /dev/null +++ b/crates/rpc/rpc-builder/src/constants.rs @@ -0,0 +1,16 @@ +/// The default port for the http server +pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545; + +/// The default port for the ws server +pub const DEFAULT_WS_RPC_PORT: u16 = 8546; + +/// The default port for the auth server. +pub const DEFAULT_AUTH_PORT: u16 = 8551; + +/// The default IPC endpoint +#[cfg(windows)] +pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc"; + +/// The default IPC endpoint +#[cfg(not(windows))] +pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc"; diff --git a/crates/rpc/rpc-builder/src/cors.rs b/crates/rpc/rpc-builder/src/cors.rs new file mode 100644 index 000000000000..7b7492c86d17 --- /dev/null +++ b/crates/rpc/rpc-builder/src/cors.rs @@ -0,0 +1,44 @@ +use hyper::{http::HeaderValue, Method}; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; + +/// Error thrown when parsing cors domains went wrong +#[derive(Debug, thiserror::Error)] +pub(crate) enum CorsDomainError { + #[error("{domain} is an invalid header value")] + InvalidHeader { domain: String }, + #[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")] + WildCardNotAllowed { input: String }, +} + +/// Creates a [CorsLayer] from the given domains +pub(crate) fn create_cors_layer(http_cors_domains: &str) -> Result { + let cors = match http_cors_domains.trim() { + "*" => CorsLayer::new() + .allow_methods([Method::GET, Method::POST]) + .allow_origin(Any) + .allow_headers(Any), + _ => { + let iter = http_cors_domains.split(','); + if iter.clone().any(|o| o == "*") { + return Err(CorsDomainError::WildCardNotAllowed { + input: http_cors_domains.to_string(), + }) + } + + let origins = iter + .map(|domain| { + domain + .parse::() + .map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() }) + }) + .collect::, _>>()?; + + let origin = AllowOrigin::list(origins); + CorsLayer::new() + .allow_methods([Method::GET, Method::POST]) + .allow_origin(origin) + .allow_headers(Any) + } + }; + Ok(cors) +} diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 5c8659a2e152..41cbe57b0251 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -52,7 +52,6 @@ //! ``` use hyper::{http::HeaderValue, Method}; -pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::{ core::{ server::{host_filtering::AllowHosts, rpc_module::Methods}, @@ -62,7 +61,6 @@ use jsonrpsee::{ RpcModule, }; use reth_ipc::server::IpcServer; -pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{AdminApi, DebugApi, EthApi, NetApi, TraceApi, Web3Api}; @@ -77,21 +75,20 @@ use std::{ }; use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; use tower::layer::util::{Identity, Stack}; -use tower_http::cors::{AllowOrigin, Any, CorsLayer}; +use tower_http::cors::CorsLayer; -/// The default port for the http server -pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545; +pub use jsonrpsee::server::ServerBuilder; +pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; -/// The default port for the ws server -pub const DEFAULT_WS_RPC_PORT: u16 = 8546; +/// Auth server utilities. +pub mod auth; -/// The default IPC endpoint -#[cfg(windows)] -pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc"; +/// Common RPC constants. +pub mod constants; +use constants::*; -/// The default IPC endpoint -#[cfg(not(windows))] -pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc"; +/// Cors utilities. +mod cors; /// Convenience function for starting a server in one step. pub async fn launch( @@ -508,10 +505,10 @@ pub struct RpcServerConfig { http_server_config: Option, /// Cors Domains http_cors_domains: Option, - /// Configs for WS server - ws_server_config: Option, /// Address where to bind the http server to http_addr: Option, + /// Configs for WS server + ws_server_config: Option, /// Address where to bind the ws server to ws_addr: Option, /// Configs for JSON-RPC IPC server @@ -621,7 +618,7 @@ impl RpcServerConfig { ))); if let Some(builder) = self.http_server_config { - if let Some(cors) = self.http_cors_domains.as_deref().map(create_cors_layer) { + if let Some(cors) = self.http_cors_domains.as_deref().map(cors::create_cors_layer) { let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?; let middleware = tower::ServiceBuilder::new().layer(cors); let http_server = @@ -658,48 +655,6 @@ impl RpcServerConfig { } } -/// Error thrown when parsing cors domains went wrong -#[derive(Debug, thiserror::Error)] -enum CorsDomainError { - #[error("{domain} is an invalid header value")] - InvalidHeader { domain: String }, - #[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")] - WildCardNotAllowed { input: String }, -} - -/// Creates a [CorsLayer] from the given domains -fn create_cors_layer(http_cors_domains: &str) -> Result { - let cors = match http_cors_domains.trim() { - "*" => CorsLayer::new() - .allow_methods([Method::GET, Method::POST]) - .allow_origin(Any) - .allow_headers(Any), - _ => { - let iter = http_cors_domains.split(','); - if iter.clone().any(|o| o == "*") { - return Err(CorsDomainError::WildCardNotAllowed { - input: http_cors_domains.to_string(), - }) - } - - let origins = iter - .map(|domain| { - domain - .parse::() - .map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() }) - }) - .collect::, _>>()?; - - let origin = AllowOrigin::list(origins); - CorsLayer::new() - .allow_methods([Method::GET, Method::POST]) - .allow_origin(origin) - .allow_headers(Any) - } - }; - Ok(cors) -} - /// Holds modules to be installed per transport type /// /// # Example diff --git a/crates/rpc/rpc-builder/tests/it/http.rs b/crates/rpc/rpc-builder/tests/it/http.rs index 9bf2c9f455eb..f6eec08ba3cd 100644 --- a/crates/rpc/rpc-builder/tests/it/http.rs +++ b/crates/rpc/rpc-builder/tests/it/http.rs @@ -71,16 +71,12 @@ where EthApiClient::block_by_number(client, block_number, false).await.unwrap(); EthApiClient::block_transaction_count_by_number(client, block_number).await.unwrap(); EthApiClient::block_transaction_count_by_hash(client, hash).await.unwrap(); + EthApiClient::block_uncles_count_by_hash(client, hash).await.unwrap(); + EthApiClient::block_uncles_count_by_number(client, block_number).await.unwrap(); // Unimplemented assert!(is_unimplemented(EthApiClient::syncing(client).await.err().unwrap())); assert!(is_unimplemented(EthApiClient::author(client).await.err().unwrap())); - assert!(is_unimplemented( - EthApiClient::block_uncles_count_by_hash(client, hash).await.err().unwrap() - )); - assert!(is_unimplemented( - EthApiClient::block_uncles_count_by_number(client, block_number).await.err().unwrap() - )); assert!(is_unimplemented( EthApiClient::uncle_by_block_hash_and_index(client, hash, index).await.err().unwrap() )); diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 2fc4d789fb4f..a635a1f61e75 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -10,7 +10,7 @@ use reth_primitives::{ BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, H64, U256, }; -use reth_provider::{BlockProvider, HeaderProvider, StateProvider}; +use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rlp::Decodable; use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, @@ -19,13 +19,15 @@ use reth_rpc_types::engine::{ use std::{ future::Future, pin::Pin, - sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::{oneshot, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; -/// The Engine API response sender +/// The Engine API handle. +pub type EngineApiHandle = mpsc::UnboundedSender; + +/// The Engine API response sender. pub type EngineApiSender = oneshot::Sender>; /// The upper limit for payload bodies request. @@ -35,7 +37,7 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; /// functions in the Execution layer that are crucial for the consensus process. #[must_use = "EngineApi does nothing unless polled."] pub struct EngineApi { - client: Arc, + client: Client, /// Consensus configuration chain_spec: ChainSpec, message_rx: UnboundedReceiverStream, @@ -45,7 +47,22 @@ pub struct EngineApi { // remote_store: HashMap, } -impl EngineApi { +impl EngineApi { + /// Create new instance of [EngineApi]. + pub fn new( + client: Client, + chain_spec: ChainSpec, + message_rx: mpsc::UnboundedReceiver, + forkchoice_state_tx: watch::Sender, + ) -> Self { + Self { + client, + chain_spec, + message_rx: UnboundedReceiverStream::new(message_rx), + forkchoice_state_tx, + } + } + fn on_message(&mut self, msg: EngineApiMessage) { match msg { EngineApiMessage::GetPayload(payload_id, tx) => { @@ -285,14 +302,14 @@ impl EngineApi { })) } - let mut state_provider = SubState::new(State::new(&*self.client)); + let state_provider = self.client.latest()?; let total_difficulty = parent_td + block.header.difficulty; match executor::execute_and_verify_receipt( &block.unseal(), total_difficulty, None, &self.chain_spec, - &mut state_provider, + &mut SubState::new(State::new(&state_provider)), ) { Ok(_) => Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)), Err(err) => Ok(PayloadStatus::new( @@ -394,7 +411,7 @@ impl EngineApi { impl Future for EngineApi where - Client: HeaderProvider + BlockProvider + StateProvider + Unpin, + Client: HeaderProvider + BlockProvider + StateProviderFactory + Unpin, { type Output = (); @@ -419,12 +436,13 @@ mod tests { use reth_interfaces::test_utils::generators::random_block; use reth_primitives::{H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; + use std::sync::Arc; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedSender}, watch::Receiver as WatchReceiver, }; - fn setup_engine_api() -> (EngineApiTestHandle, EngineApi) { + fn setup_engine_api() -> (EngineApiTestHandle, EngineApi>) { let chain_spec = MAINNET.clone(); let client = Arc::new(MockEthProvider::default()); let (msg_tx, msg_rx) = unbounded_channel(); diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index daeab9bb2f88..b97f1b6facbb 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -17,6 +17,6 @@ mod message; /// Engine API error. mod error; -pub use engine_api::{EngineApi, EngineApiSender}; +pub use engine_api::{EngineApi, EngineApiHandle, EngineApiSender}; pub use error::*; pub use message::{EngineApiMessage, EngineApiMessageVersion}; diff --git a/crates/rpc/rpc-types/src/eth/block.rs b/crates/rpc/rpc-types/src/eth/block.rs index 3e408072215a..fc045e331e5a 100644 --- a/crates/rpc/rpc-types/src/eth/block.rs +++ b/crates/rpc/rpc-types/src/eth/block.rs @@ -73,16 +73,21 @@ pub struct Block { impl Block { /// Converts the given primitive block into a [Block] response with the given /// [BlockTransactionsKind] + /// + /// If a `block_hash` is provided, then this is used, otherwise the block hash is computed. pub fn from_block( block: PrimitiveBlock, total_difficulty: U256, kind: BlockTransactionsKind, + block_hash: Option, ) -> Result { match kind { BlockTransactionsKind::Hashes => { - Ok(Self::from_block_hashes_only(block, total_difficulty)) + Ok(Self::from_block_with_tx_hashes(block, total_difficulty, block_hash)) + } + BlockTransactionsKind::Full => { + Self::from_block_full(block, total_difficulty, block_hash) } - BlockTransactionsKind::Full => Self::from_block_full(block, total_difficulty), } } @@ -91,8 +96,12 @@ impl Block { /// /// This will populate the `transactions` field with only the hashes of the transactions in the /// block: [BlockTransactions::Hashes] - pub fn from_block_hashes_only(block: PrimitiveBlock, total_difficulty: U256) -> Self { - let block_hash = block.header.hash_slow(); + pub fn from_block_with_tx_hashes( + block: PrimitiveBlock, + total_difficulty: U256, + block_hash: Option, + ) -> Self { + let block_hash = block_hash.unwrap_or_else(|| block.header.hash_slow()); let transactions = block.body.iter().map(|tx| tx.hash).collect(); Self::from_block_with_transactions( @@ -111,8 +120,9 @@ impl Block { pub fn from_block_full( block: PrimitiveBlock, total_difficulty: U256, + block_hash: Option, ) -> Result { - let block_hash = block.header.hash_slow(); + let block_hash = block_hash.unwrap_or_else(|| block.header.hash_slow()); let block_number = block.number; let mut transactions = Vec::with_capacity(block.body.len()); for (idx, tx) in block.body.iter().enumerate() { diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 4719d79aebd4..35897baacf83 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -19,8 +19,12 @@ reth-provider = { path = "../../storage/provider", features = ["test-utils"] } reth-transaction-pool = { path = "../../transaction-pool", features = ["test-utils"]} reth-network-api = { path = "../../net/network-api", features = ["test-utils"] } reth-rpc-engine-api = { path = "../rpc-engine-api" } +reth-executor = { path = "../../executor" } reth-tasks = { path = "../../tasks" } +# eth +revm = "3.0.0" + # rpc jsonrpsee = { version = "0.16" } http = "0.2.8" diff --git a/crates/rpc/rpc/src/engine/mod.rs b/crates/rpc/rpc/src/engine/mod.rs index 677b658c4df2..2b917d394bcf 100644 --- a/crates/rpc/rpc/src/engine/mod.rs +++ b/crates/rpc/rpc/src/engine/mod.rs @@ -8,22 +8,26 @@ use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_api::EngineApiServer; use reth_rpc_engine_api::{ - EngineApiError, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, + EngineApiError, EngineApiHandle, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, REQUEST_TOO_LARGE_CODE, UNKNOWN_PAYLOAD_CODE, }; use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration, CAPABILITIES, }; -use tokio::sync::{ - mpsc::UnboundedSender, - oneshot::{self, Receiver}, -}; +use tokio::sync::oneshot::{self, Receiver}; /// The server implementation of Engine API pub struct EngineApi { /// Handle to the consensus engine - engine_tx: UnboundedSender, + engine_tx: EngineApiHandle, +} + +impl EngineApi { + /// Creates a new instance of [EngineApi]. + pub fn new(engine_tx: EngineApiHandle) -> Self { + Self { engine_tx } + } } impl std::fmt::Debug for EngineApi { diff --git a/crates/rpc/rpc/src/eth/api/block.rs b/crates/rpc/rpc/src/eth/api/block.rs index 62b77ea4d6ef..0863a48fdc05 100644 --- a/crates/rpc/rpc/src/eth/api/block.rs +++ b/crates/rpc/rpc/src/eth/api/block.rs @@ -12,6 +12,17 @@ impl EthApi where Client: BlockProvider + StateProviderFactory + 'static, { + /// Returns the uncle headers of the given block + /// + /// Returns an empty vec if there are none. + pub(crate) fn ommers( + &self, + block_id: impl Into, + ) -> EthResult>> { + let block_id = block_id.into(); + Ok(self.client().ommers(block_id)?) + } + pub(crate) async fn block_transaction_count( &self, block_id: impl Into, @@ -43,7 +54,7 @@ where .client() .header_td(&block_hash)? .ok_or_else(|| EthApiError::UnknownBlockNumber)?; - let block = Block::from_block(block, total_difficulty, full.into())?; + let block = Block::from_block(block, total_difficulty, full.into(), Some(block_hash))?; Ok(Some(block.into())) } else { Ok(None) diff --git a/crates/rpc/rpc/src/eth/api/call.rs b/crates/rpc/rpc/src/eth/api/call.rs new file mode 100644 index 000000000000..fad0984e45a7 --- /dev/null +++ b/crates/rpc/rpc/src/eth/api/call.rs @@ -0,0 +1,64 @@ +//! Contains RPC handler implementations specific to endpoints that call/execute within evm. + +#![allow(unused)] // TODO rm later + +use crate::{eth::error::EthResult, EthApi}; +use reth_executor::revm_wrap::{State, SubState}; +use reth_primitives::{BlockId, U256}; +use reth_provider::{BlockProvider, StateProvider, StateProviderFactory}; +use reth_rpc_types::CallRequest; +use revm::primitives::{BlockEnv, Env, ResultAndState}; + +impl EthApi +where + Client: BlockProvider + StateProviderFactory + 'static, +{ + /// Creates the [Env] used by the [EVM](revm::EVM) when executing a call. + fn build_call_env(&self, request: CallRequest, block_env: BlockEnv) -> Env { + todo!() + } + + /// Executes the call request against the given `state` without committing any changes to the + /// database + pub(crate) fn call_with( + &self, + request: CallRequest, + state: S, + block_env: BlockEnv, + // TODO add overrides + ) -> EthResult + where + S: StateProvider, + { + // the revm database impl + let mut db = SubState::new(State::new(state)); + let mut evm = revm::EVM::new(); + evm.env = self.build_call_env(request, block_env); + evm.database(db); + // TODO error conversion from EMVError to EthApiErr + let res = evm.transact_ref().unwrap(); + Ok(res) + } + + /// Estimate gas needed for execution of the `request` at the [BlockId] . + pub(crate) fn estimate_gas_at(&self, mut request: CallRequest, at: BlockId) -> EthResult { + // TODO get a StateProvider for the given blockId and BlockEnv + todo!() + } + + /// Estimates the gas usage of the `request` with the state. + fn estimate_gas_with( + &self, + mut request: CallRequest, + state: S, + block_env: BlockEnv, + ) -> EthResult + where + S: StateProvider, + { + // TODO: check if transfer + // binary search over range to find best gas + + todo!() + } +} diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index 176d40ce3e1d..4d635cf682c1 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -13,11 +13,14 @@ use reth_primitives::{ use reth_provider::{BlockProvider, StateProviderFactory}; use std::num::NonZeroUsize; +use crate::eth::error::EthResult; +use reth_provider::providers::ChainState; use reth_rpc_types::FeeHistoryCache; use reth_transaction_pool::TransactionPool; use std::sync::Arc; mod block; +mod call; mod server; mod state; mod transactions; @@ -100,6 +103,14 @@ where self.client().convert_block_number(num) } + /// Helper function to execute a closure with the database at a specific block. + pub(crate) fn with_state_at(&self, _at: BlockId, _f: F) -> EthResult + where + F: FnOnce(ChainState<'_>) -> T, + { + unimplemented!() + } + /// Returns the state at the given [BlockId] enum or the latest. pub(crate) fn state_at_block_id_or_latest( &self, diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 4a91e757f340..75edc235fc41 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -29,36 +29,44 @@ where Client: BlockProvider + HeaderProvider + StateProviderFactory + 'static, Network: 'static, { + /// Handler for: `eth_protocolVersion` async fn protocol_version(&self) -> Result { EthApiSpec::protocol_version(self).await.to_rpc_result() } + /// Handler for: `eth_syncing` fn syncing(&self) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_coinbase` async fn author(&self) -> Result
{ Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_accounts` async fn accounts(&self) -> Result> { Ok(EthApiSpec::accounts(self)) } + /// Handler for: `eth_blockNumber` fn block_number(&self) -> Result { Ok(U256::from( EthApiSpec::chain_info(self).with_message("failed to read chain info")?.best_number, )) } + /// Handler for: `eth_chainId` async fn chain_id(&self) -> Result> { Ok(Some(EthApiSpec::chain_id(self))) } + /// Handler for: `eth_getBlockByHash` async fn block_by_hash(&self, hash: H256, full: bool) -> Result> { Ok(EthApi::block(self, hash, full).await?) } + /// Handler for: `eth_getBlockByNumber` async fn block_by_number( &self, number: BlockNumberOrTag, @@ -67,10 +75,12 @@ where Ok(EthApi::block(self, number, full).await?) } + /// Handler for: `eth_getBlockTransactionCountByHash` async fn block_transaction_count_by_hash(&self, hash: H256) -> Result> { Ok(EthApi::block_transaction_count(self, hash).await?.map(U256::from)) } + /// Handler for: `eth_getBlockTransactionCountByNumber` async fn block_transaction_count_by_number( &self, number: BlockNumberOrTag, @@ -78,14 +88,17 @@ where Ok(EthApi::block_transaction_count(self, number).await?.map(U256::from)) } - async fn block_uncles_count_by_hash(&self, _hash: H256) -> Result { - Err(internal_rpc_err("unimplemented")) + /// Handler for: `eth_getUncleCountByBlockHash` + async fn block_uncles_count_by_hash(&self, hash: H256) -> Result> { + Ok(EthApi::ommers(self, hash)?.map(|ommers| U256::from(ommers.len()))) } - async fn block_uncles_count_by_number(&self, _number: BlockNumberOrTag) -> Result { - Err(internal_rpc_err("unimplemented")) + /// Handler for: `eth_getUncleCountByBlockNumber` + async fn block_uncles_count_by_number(&self, number: BlockNumberOrTag) -> Result> { + Ok(EthApi::ommers(self, number)?.map(|ommers| U256::from(ommers.len()))) } + /// Handler for: `eth_getUncleByBlockHashAndIndex` async fn uncle_by_block_hash_and_index( &self, _hash: H256, @@ -94,6 +107,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getUncleByBlockNumberAndIndex` async fn uncle_by_block_number_and_index( &self, _number: BlockNumberOrTag, @@ -102,6 +116,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getTransactionByHash` async fn transaction_by_hash( &self, _hash: H256, @@ -109,6 +124,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getTransactionByBlockHashAndIndex` async fn transaction_by_block_hash_and_index( &self, _hash: H256, @@ -117,6 +133,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getTransactionByBlockNumberAndIndex` async fn transaction_by_block_number_and_index( &self, _number: BlockNumberOrTag, @@ -125,14 +142,17 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getTransactionReceipt` async fn transaction_receipt(&self, _hash: H256) -> Result> { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getBalance` async fn balance(&self, address: Address, block_number: Option) -> Result { Ok(EthApi::balance(self, address, block_number)?) } + /// Handler for: `eth_getStorageAt` async fn storage_at( &self, address: Address, @@ -142,6 +162,7 @@ where Ok(EthApi::storage_at(self, address, index, block_number)?) } + /// Handler for: `eth_getTransactionCount` async fn transaction_count( &self, address: Address, @@ -150,14 +171,17 @@ where Ok(EthApi::get_transaction_count(self, address, block_number)?) } + /// Handler for: `eth_getCode` async fn get_code(&self, address: Address, block_number: Option) -> Result { Ok(EthApi::get_code(self, address, block_number)?) } + /// Handler for: `eth_call` async fn call(&self, _request: CallRequest, _block_number: Option) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_createAccessList` async fn create_access_list( &self, _request: CallRequest, @@ -166,6 +190,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_estimateGas` async fn estimate_gas( &self, _request: CallRequest, @@ -174,6 +199,7 @@ where Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_gasPrice` async fn gas_price(&self) -> Result { Err(internal_rpc_err("unimplemented")) } @@ -186,6 +212,7 @@ where // To minimize the number of database seeks required to query the missing data, we calculate the // first non-cached block number and last non-cached block number. After that, we query this // range of consecutive blocks from the database. + /// Handler for: `eth_feeHistory` async fn fee_history( &self, block_count: U64, @@ -283,50 +310,62 @@ where }) } + /// Handler for: `eth_maxPriorityFeePerGas` async fn max_priority_fee_per_gas(&self) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_mining` async fn is_mining(&self) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_hashrate` async fn hashrate(&self) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getWork` async fn get_work(&self) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_submitHashrate` async fn submit_hashrate(&self, _hashrate: U256, _id: H256) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_submitWork` async fn submit_work(&self, _nonce: H64, _pow_hash: H256, _mix_digest: H256) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_sendTransaction` async fn send_transaction(&self, _request: TransactionRequest) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_sendRawTransaction` async fn send_raw_transaction(&self, tx: Bytes) -> Result { Ok(EthApi::send_raw_transaction(self, tx).await?) } + /// Handler for: `eth_sign` async fn sign(&self, _address: Address, _message: Bytes) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_signTransaction` async fn sign_transaction(&self, _transaction: CallRequest) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_signTypedData` async fn sign_typed_data(&self, _address: Address, _data: Value) -> Result { Err(internal_rpc_err("unimplemented")) } + /// Handler for: `eth_getProof` async fn get_proof( &self, _address: Address, @@ -353,6 +392,7 @@ mod tests { use crate::EthApi; #[tokio::test] + /// Handler for: `eth_test_fee_history` async fn test_fee_history() { let eth_api = EthApi::new(NoopProvider::default(), testing_pool(), NoopNetwork::default()); diff --git a/crates/rpc/rpc/src/eth/error.rs b/crates/rpc/rpc/src/eth/error.rs index 1e704127496d..f28897434846 100644 --- a/crates/rpc/rpc/src/eth/error.rs +++ b/crates/rpc/rpc/src/eth/error.rs @@ -1,14 +1,36 @@ //! Error variants for the `eth_` namespace. use jsonrpsee::{core::Error as RpcError, types::error::INVALID_PARAMS_CODE}; -use reth_rpc_types::BlockError; -use reth_transaction_pool::error::PoolError; use crate::result::{internal_rpc_err, rpc_err}; +use reth_rpc_types::BlockError; +use reth_transaction_pool::error::PoolError; /// Result alias pub(crate) type EthResult = Result; +/// List of JSON-RPC error codes +#[derive(Debug, Copy, PartialEq, Eq, Clone)] +pub(crate) enum EthRpcErrorCode { + /// Failed to send transaction, See also + TransactionRejected, + /// Custom geth error code, + ExecutionError, + /// + InvalidInput, +} + +impl EthRpcErrorCode { + /// Returns the error code as `i32` + pub(crate) const fn code(&self) -> i32 { + match *self { + EthRpcErrorCode::TransactionRejected => -32003, + EthRpcErrorCode::ExecutionError => 3, + EthRpcErrorCode::InvalidInput => -32000, + } + } +} + /// Errors that can occur when interacting with the `eth_` namespace #[derive(Debug, thiserror::Error)] #[allow(missing_docs)] @@ -26,6 +48,8 @@ pub(crate) enum EthApiError { UnknownBlockNumber, #[error("Invalid block range")] InvalidBlockRange, + #[error(transparent)] + InvalidTransaction(#[from] InvalidTransactionError), /// Thrown when constructing an RPC block from a primitive block data failed. #[error(transparent)] InvalidBlockData(#[from] BlockError), @@ -42,11 +66,86 @@ impl From for RpcError { EthApiError::EmptyRawTransactionData | EthApiError::UnknownBlockNumber | EthApiError::InvalidBlockRange => rpc_err(INVALID_PARAMS_CODE, value.to_string(), None), + EthApiError::InvalidTransaction(err) => err.into(), err => internal_rpc_err(err.to_string()), } } } +/// An error due to invalid transaction +/// +/// These error variants can be thrown when the transaction is checked prior to execution +#[derive(thiserror::Error, Debug)] +pub enum InvalidTransactionError { + /// returned if the nonce of a transaction is lower than the one present in the local chain. + #[error("nonce too low")] + NonceTooLow, + /// returned if the nonce of a transaction is higher than the next one expected based on the + /// local chain. + #[error("Nonce too high")] + NonceTooHigh, + /// Returned if the nonce of a transaction is too high + /// Incrementing the nonce would lead to invalid state (overflow) + #[error("nonce has max value")] + NonceMaxValue, + /// thrown if the transaction sender doesn't have enough funds for a transfer + #[error("insufficient funds for transfer")] + InsufficientFundsForTransfer, + /// thrown if creation transaction provides the init code bigger than init code size limit. + #[error("max initcode size exceeded")] + MaxInitCodeSizeExceeded, + /// Represents the inability to cover max cost + value (account balance too low). + #[error("Insufficient funds for gas * price + value")] + InsufficientFunds, + /// Thrown when calculating gas usage + #[error("gas uint64 overflow")] + GasUintOverflow, + /// returned if the transaction is specified to use less gas than required to start the + /// invocation. + #[error("intrinsic gas too low")] + GasTooLow, + /// returned if the transaction gas exceeds the limit + #[error("intrinsic gas too high")] + GasTooHigh, + /// thrown if a transaction is not supported in the current network configuration. + #[error("transaction type not supported")] + TxTypeNotSupported, + /// Thrown to ensure no one is able to specify a transaction with a tip higher than the total + /// fee cap. + #[error("max priority fee per gas higher than max fee per gas")] + TipAboveFeeCap, + /// A sanity error to avoid huge numbers specified in the tip field. + #[error("max priority fee per gas higher than 2^256-1")] + TipVeryHigh, + /// A sanity error to avoid huge numbers specified in the fee cap field. + #[error("max fee per gas higher than 2^256-1")] + FeeCapVeryHigh, + /// Thrown post London if the transaction's fee is less than the base fee of the block + #[error("max fee per gas less than block base fee")] + FeeCapTooLow, + /// Thrown if the sender of a transaction is a contract. + #[error("sender not an eoa")] + SenderNoEOA, +} + +impl InvalidTransactionError { + /// Returns the rpc error code for this error. + fn error_code(&self) -> i32 { + match self { + InvalidTransactionError::GasTooLow | InvalidTransactionError::GasTooHigh => { + EthRpcErrorCode::InvalidInput.code() + } + _ => EthRpcErrorCode::TransactionRejected.code(), + } + } +} + +impl From for RpcError { + fn from(err: InvalidTransactionError) -> Self { + rpc_err(err.error_code(), err.to_string(), None) + } +} + /// A helper error type that mirrors `geth` Txpool's error messages #[derive(Debug, thiserror::Error)] pub(crate) enum GethTxPoolError { diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index d510d62996fa..3942a5fd45ab 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -8,8 +8,8 @@ use jsonrpsee::{ server::{IdProvider, RandomIntegerIdProvider}, }; use reth_primitives::{ - rpc::{Filter, FilterBlockOption, FilteredParams}, - U256, + filter::{Filter, FilterBlockOption, FilteredParams}, + Block, U256, }; use reth_provider::BlockProvider; use reth_rpc_api::EthFilterApiServer; @@ -197,44 +197,50 @@ where /// - underlying database error /// - amount of matches exceeds configured limit #[allow(dead_code)] - fn filter_logs( - &self, - filter: &Filter, - _from_block: u64, - _to_block: u64, - ) -> RpcResult> { - let logs = Vec::new(); - let topics = if filter.has_topics() { - let params = FilteredParams::new(Some(filter.clone())); - Some(params.flat_topics) - } else { - None - }; + fn filter_logs(&self, filter: &Filter, from_block: u64, to_block: u64) -> RpcResult> { + let mut logs = Vec::new(); + let filter_params = FilteredParams::new(Some(filter.clone())); + + let topics = + if filter.has_topics() { Some(filter_params.flat_topics.clone()) } else { None }; - let _address_filter = FilteredParams::address_filter(&filter.address); - let _topics_filter = FilteredParams::topics_filter(&topics); - - // TODO blocked by - // let block_number = from_block; - // - // while block_number <= to_block { - // let _block = self - // .client - // .block_by_number(BlockNumberOrTag::Number(block_number.into())) - // .to_rpc_result()? - // .ok_or(EthApiError::UnknownBlockNumber)?; - // - // - // - // if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && - // FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) - // { - // // TODO filter logs from transactions - // } - // } + // derive bloom filters from filter input + let address_filter = FilteredParams::address_filter(&filter.address); + let topics_filter = FilteredParams::topics_filter(&topics); + + for block_number in from_block..=to_block { + if let Some(block) = self.client.block_by_number(block_number).to_rpc_result()? { + // only if filter matches + if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && + FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) + { + self.append_matching_block_logs(&mut logs, &filter_params, block); + + // TODO size check + } + } + } Ok(logs) } + + /// Appends all logs emitted in the `block` that match the `filter` to the `logs` vector. + #[allow(clippy::ptr_arg)] + fn append_matching_block_logs( + &self, + _logs: &mut Vec, + _filter: &FilteredParams, + block: Block, + ) { + let _block_log_index: u32 = 0; + let _block_hash = block.hash_slow(); + + // loop over all transactions in the block + for tx in block.body { + let _transaction_log_index: u32 = 0; + let _transaction_hash = tx.hash; + } + } } /// All active filters diff --git a/crates/staged-sync/src/utils/chainspec.rs b/crates/staged-sync/src/utils/chainspec.rs index 76a6401d29eb..27151bbbd9d6 100644 --- a/crates/staged-sync/src/utils/chainspec.rs +++ b/crates/staged-sync/src/utils/chainspec.rs @@ -29,3 +29,16 @@ pub fn genesis_value_parser(s: &str) -> Result { } }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_chain_spec() { + for chain in ["mainnet", "sepolia", "goerli"] { + chain_spec_value_parser(chain).unwrap(); + genesis_value_parser(chain).unwrap(); + } + } +} diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 5fd1259cfef4..2223527f46d0 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -15,11 +15,14 @@ pub(crate) enum ControlFlow { /// The progress of the last stage progress: BlockNumber, }, - NoProgress, + NoProgress { + /// The current stage progress. + stage_progress: Option, + }, } impl ControlFlow { pub(crate) fn should_continue(&self) -> bool { - matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress) + matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. }) } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 797c9b33639d..88712737cd8f 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -145,6 +145,13 @@ impl Pipeline { .zip(self.max_block) .map_or(false, |(progress, target)| progress >= target) { + trace!( + target: "sync::pipeline", + ?next_action, + minimum_progress = ?self.progress.minimum_progress, + max_block = ?self.max_block, + "Terminating pipeline." + ); return Ok(()) } } @@ -168,18 +175,18 @@ impl Pipeline { updater.update_sync_state(state); } - trace!( - target: "sync::pipeline", - stage = %stage_id, - "Executing stage" - ); + trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); let next = self .execute_stage_to_completion(db, previous_stage, stage_index) .instrument(info_span!("execute", stage = %stage_id)) .await?; match next { - ControlFlow::NoProgress => {} // noop + ControlFlow::NoProgress { stage_progress } => { + if let Some(progress) = stage_progress { + self.progress.update(progress); + } + } ControlFlow::Continue { progress } => self.progress.update(progress), ControlFlow::Unwind { target, bad_block } => { // reset the sync state @@ -277,7 +284,7 @@ impl Pipeline { self.listeners.notify(PipelineEvent::Skipped { stage_id }); // We reached the maximum block, so we skip the stage - return Ok(ControlFlow::NoProgress) + return Ok(ControlFlow::NoProgress { stage_progress: prev_progress }) } self.listeners @@ -308,7 +315,7 @@ impl Pipeline { return Ok(if made_progress { ControlFlow::Continue { progress: stage_progress } } else { - ControlFlow::NoProgress + ControlFlow::NoProgress { stage_progress: Some(stage_progress) } }) } } @@ -405,7 +412,7 @@ impl PipelineProgress { fn next_ctrl(&self) -> ControlFlow { match self.progress { Some(progress) => ControlFlow::Continue { progress }, - None => ControlFlow::NoProgress, + None => ControlFlow::NoProgress { stage_progress: None }, } } } @@ -458,7 +465,7 @@ mod tests { fn progress_ctrl_flow() { let mut progress = PipelineProgress::default(); - assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress); + assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None }); progress.update(1); assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 }); diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index b71b70ce6bef..ce809e54c1c9 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -8,52 +8,22 @@ readme = "README.md" description = "Reth storage provider." [dependencies] -#reth -reth-codecs = { path = "../codecs" } +# reth reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } -reth-rpc-types = { path = "../../rpc/rpc-types" } reth-db = { path = "../db" } -# codecs -postcard = { version = "1.0.2", features = ["alloc"] } -parity-scale-codec = { version = "3.2.1", features = ["bytes"] } - # misc -async-trait = "0.1.57" thiserror = "1.0.37" auto_impl = "1.0" -tokio = { version = "1.21.2", features = ["sync"] } -bytes = "1.4" -futures = "0.3.25" -tokio-stream = "0.1.11" -rand = "0.8.5" -modular-bitfield = "0.11.2" -heapless = "0.7.16" # feature test-utils -arbitrary = { version = "1.1.7", features = ["derive"], optional = true } -secp256k1 = { version = "0.24.2", default-features = false, features = [ - "alloc", - "recovery", - "rand", -], optional = true } parking_lot = { version = "0.12", optional = true } [dev-dependencies] reth-db = { path = "../db", features = ["test-utils"] } -test-fuzz = "3.0.4" -tokio = { version = "1.21.2", features = ["full"] } -tokio-stream = { version = "0.1.11", features = ["sync"] } -arbitrary = { version = "1.1.7", features = ["derive"] } -hex-literal = "0.3" -secp256k1 = { version = "0.24.2", default-features = false, features = [ - "alloc", - "recovery", - "rand", -] } parking_lot = "0.12" [features] bench = [] -test-utils = ["tokio-stream/sync", "secp256k1", "parking_lot"] +test-utils = ["parking_lot"] diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index ff56d4cea2f1..5660770a91eb 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -1,4 +1,4 @@ -#![warn(missing_docs, unreachable_pub)] +#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)] #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( no_crate_inject, diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 8b8b1b2b87ca..dd7f86bfc8ef 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -135,6 +135,17 @@ impl BlockProvider for ShareableDatabase { Ok(None) } + + fn ommers(&self, id: BlockId) -> Result>> { + if let Some(number) = self.block_number_for_id(id)? { + let tx = self.db.tx()?; + // TODO: this can be optimized to return empty Vec post-merge + let ommers = tx.get::(number)?.map(|o| o.ommers); + return Ok(ommers) + } + + Ok(None) + } } impl TransactionsProvider for ShareableDatabase { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index d609de6b35df..9a82741e172a 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -198,6 +198,10 @@ impl BlockProvider for MockEthProvider { } } } + + fn ommers(&self, _id: BlockId) -> Result>> { + Ok(None) + } } impl AccountProvider for MockEthProvider { @@ -244,3 +248,23 @@ impl StateProviderFactory for MockEthProvider { todo!() } } + +impl StateProviderFactory for Arc { + type HistorySP<'a> = &'a MockEthProvider where Self: 'a; + type LatestSP<'a> = &'a MockEthProvider where Self: 'a; + + fn latest(&self) -> Result> { + Ok(self) + } + + fn history_by_block_number( + &self, + _block: reth_primitives::BlockNumber, + ) -> Result> { + todo!() + } + + fn history_by_block_hash(&self, _block: BlockHash) -> Result> { + todo!() + } +} diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 67900d2020fd..a71d4882abae 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -35,14 +35,18 @@ impl BlockProvider for NoopProvider { fn block(&self, _id: BlockId) -> Result> { Ok(None) } + + fn ommers(&self, _id: BlockId) -> Result>> { + Ok(None) + } } impl TransactionsProvider for NoopProvider { - fn transaction_by_hash(&self, _hash: TxHash) -> Result> { + fn transaction_by_id(&self, _id: TxNumber) -> Result> { Ok(None) } - fn transaction_by_id(&self, _id: TxNumber) -> Result> { + fn transaction_by_hash(&self, _hash: TxHash) -> Result> { Ok(None) } diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index 7d289e2e9af2..2b04b601e0b2 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -1,15 +1,22 @@ use crate::{BlockIdProvider, HeaderProvider, TransactionsProvider}; use reth_interfaces::Result; -use reth_primitives::{Block, BlockId, BlockNumberOrTag, H256}; +use reth_primitives::{Block, BlockId, BlockNumberOrTag, Header, H256}; /// Api trait for fetching `Block` related data. #[auto_impl::auto_impl(&, Arc)] pub trait BlockProvider: BlockIdProvider + HeaderProvider + TransactionsProvider + Send + Sync { - /// Returns the block. Returns `None` if block is not found. + /// Returns the block. + /// + /// Returns `None` if block is not found. fn block(&self, id: BlockId) -> Result>; + /// Returns the ommers/uncle headers of the given block. + /// + /// Returns `None` if block is not found. + fn ommers(&self, id: BlockId) -> Result>>; + /// Returns the block. Returns `None` if block is not found. fn block_by_hash(&self, hash: H256) -> Result> { self.block(hash.into()) diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 3b8fefc4dce7..34a09589b3bd 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -17,8 +17,9 @@ normal = [ [dependencies] -# eth +# reth reth-primitives = { path = "../primitives" } +reth-rlp = { path = "../rlp" } # async/futures async-trait = "0.1" @@ -37,6 +38,7 @@ tracing = "0.1" serde = { version = "1.0", features = ["derive", "rc"], optional = true } fnv = "1.0.7" bitflags = "1.3" +auto_impl = "1.0" # ruint # Using the uint! requires the crate to be imported diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 001a9bc0349f..3772b41861b0 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -202,6 +202,7 @@ where TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => { let sender_id = self.get_sender_id(transaction.sender()); let transaction_id = TransactionId::new(sender_id, transaction.nonce()); + let encoded_length = transaction.encoded_length(); let tx = ValidPoolTransaction { cost: transaction.cost(), @@ -210,6 +211,7 @@ where propagate: false, timestamp: Instant::now(), origin, + encoded_length, }; let added = self.pool.write().add_transaction(tx, balance, state_nonce)?; diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index ba88d15ebdb4..da03552b807c 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -13,7 +13,7 @@ use rand::{ }; use reth_primitives::{ Address, FromRecoveredTransaction, IntoRecoveredTransaction, Transaction, TransactionKind, - TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy, H256, U128, U256, + TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy, TxType, H256, U128, U256, }; use std::{ops::Range, sync::Arc, time::Instant}; @@ -345,6 +345,17 @@ impl PoolTransaction for MockTransaction { fn size(&self) -> usize { 0 } + + fn tx_type(&self) -> u8 { + match self { + MockTransaction::Legacy { .. } => TxType::Legacy.into(), + MockTransaction::Eip1559 { .. } => TxType::EIP1559.into(), + } + } + + fn encoded_length(&self) -> usize { + 0 + } } impl FromRecoveredTransaction for MockTransaction { @@ -427,6 +438,7 @@ impl MockTransactionFactory { transaction: MockTransaction, ) -> MockValidTx { let transaction_id = self.tx_id(&transaction); + let encoded_length = transaction.encoded_length(); MockValidTx { propagate: false, transaction_id, @@ -434,6 +446,7 @@ impl MockTransactionFactory { transaction, timestamp: Instant::now(), origin, + encoded_length, } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index d9b9461b0b66..94a4d358094d 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -3,6 +3,7 @@ use reth_primitives::{ Address, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, Transaction, TransactionKind, TransactionSignedEcRecovered, TxHash, H256, U256, }; +use reth_rlp::Encodable; use std::{collections::HashMap, fmt, sync::Arc}; use tokio::sync::mpsc::Receiver; @@ -18,6 +19,7 @@ use serde::{Deserialize, Serialize}; /// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented /// for a wrapped `Arc` type, see also [`Pool`](crate::Pool). #[async_trait::async_trait] +#[auto_impl::auto_impl(Arc)] pub trait TransactionPool: Send + Sync + Clone { /// The transaction type of the pool type Transaction: PoolTransaction; @@ -287,6 +289,12 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + FromRecoveredTransaction { /// Returns a measurement of the heap usage of this type and all its internals. fn size(&self) -> usize; + + /// Returns the transaction type + fn tx_type(&self) -> u8; + + /// Returns the length of the rlp encoded object + fn encoded_length(&self) -> usize; } /// The default [PoolTransaction] for the [Pool](crate::Pool). @@ -372,6 +380,16 @@ impl PoolTransaction for PooledTransaction { fn size(&self) -> usize { self.transaction.transaction.input().len() } + + /// Returns the transaction type + fn tx_type(&self) -> u8 { + self.transaction.tx_type().into() + } + + /// Returns the length of the rlp encoded object + fn encoded_length(&self) -> usize { + self.transaction.length() + } } impl FromRecoveredTransaction for PooledTransaction { diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 5fcdf2340a0f..a5d855b3a097 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -94,6 +94,8 @@ pub struct ValidPoolTransaction { pub timestamp: Instant, /// Where this transaction originated from. pub origin: TransactionOrigin, + /// The length of the rlp encoded transaction (cached) + pub encoded_length: usize, } // === impl ValidPoolTransaction === @@ -160,6 +162,7 @@ impl Clone for ValidPoolTransaction { cost: self.cost, timestamp: self.timestamp, origin: self.origin, + encoded_length: self.encoded_length, } } }