diff --git a/Cargo.lock b/Cargo.lock index c62b0f700e2a..92170732c66e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8989,6 +8989,7 @@ dependencies = [ "assert_matches", "async-trait", "chrono", + "once_cell", "serde", "test-casing", "thiserror", diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 1355c05d6d0c..f29fffe7dd8e 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -638,7 +638,8 @@ async fn init_tasks( "Running tree data fetcher (allows a node to operate w/o a Merkle tree or w/o waiting the tree to catch up). \ This is an experimental feature; do not use unless you know what you're doing" ); - let fetcher = TreeDataFetcher::new(main_node_client.clone(), connection_pool.clone()); + let fetcher = TreeDataFetcher::new(main_node_client.clone(), connection_pool.clone()) + .with_l1_data(eth_client.clone(), config.remote.diamond_proxy_addr)?; app_health.insert_component(fetcher.health_check())?; task_handles.push(tokio::spawn(fetcher.run(stop_receiver.clone()))); } diff --git a/core/lib/basic_types/src/web3/mod.rs b/core/lib/basic_types/src/web3/mod.rs index bbf511cdd0f0..4ee7d2944b08 100644 --- a/core/lib/basic_types/src/web3/mod.rs +++ b/core/lib/basic_types/src/web3/mod.rs @@ -138,30 +138,36 @@ impl<'a> Visitor<'a> for BytesVisitor { // `Log`: from `web3::types::log` /// Filter -#[derive(Default, Debug, PartialEq, Clone, Serialize)] +#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] pub struct Filter { /// From Block #[serde(rename = "fromBlock", skip_serializing_if = "Option::is_none")] - from_block: Option, + pub from_block: Option, /// To Block #[serde(rename = "toBlock", skip_serializing_if = "Option::is_none")] - to_block: Option, + pub to_block: Option, /// Block Hash #[serde(rename = "blockHash", skip_serializing_if = "Option::is_none")] - block_hash: Option, + pub block_hash: Option, /// Address #[serde(skip_serializing_if = "Option::is_none")] - address: Option>, + pub address: Option>, /// Topics #[serde(skip_serializing_if = "Option::is_none")] - topics: Option>>>, + pub topics: Option>>>, /// Limit #[serde(skip_serializing_if = "Option::is_none")] - limit: Option, + pub limit: Option, } #[derive(Default, Debug, PartialEq, Clone)] -struct ValueOrArray(Vec); +pub struct ValueOrArray(Vec); + +impl ValueOrArray { + pub fn flatten(self) -> Vec { + self.0 + } +} impl Serialize for ValueOrArray where @@ -179,6 +185,25 @@ where } } +impl<'de, T> Deserialize<'de> for ValueOrArray +where + T: Deserialize<'de>, +{ + fn deserialize>(deserializer: D) -> Result { + #[derive(Deserialize)] + #[serde(untagged)] + enum Repr { + Single(T), + Sequence(Vec), + } + + Ok(match Repr::::deserialize(deserializer)? { + Repr::Single(element) => Self(vec![element]), + Repr::Sequence(elements) => Self(elements), + }) + } +} + // Filter Builder #[derive(Default, Clone)] pub struct FilterBuilder { @@ -271,7 +296,7 @@ fn topic_to_option(topic: ethabi::Topic) -> Option> { } /// A log produced by a transaction. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] pub struct Log { /// H160 pub address: H160, diff --git a/core/lib/eth_client/src/clients/http/query.rs b/core/lib/eth_client/src/clients/http/query.rs index 6cb7a44eaf3b..d7db9860926f 100644 --- a/core/lib/eth_client/src/clients/http/query.rs +++ b/core/lib/eth_client/src/clients/http/query.rs @@ -3,11 +3,11 @@ use std::fmt; use async_trait::async_trait; use jsonrpsee::core::ClientError; use zksync_types::{web3, Address, L1ChainId, H256, U256, U64}; -use zksync_web3_decl::error::{ClientRpcContext, EnrichedClientError}; +use zksync_web3_decl::error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}; use super::{decl::L1EthNamespaceClient, Method, COUNTERS, LATENCIES}; use crate::{ - types::{Error, ExecutedTxStatus, FailureInfo}, + types::{ExecutedTxStatus, FailureInfo}, EthInterface, RawTransactionBytes, }; @@ -16,15 +16,14 @@ impl EthInterface for T where T: L1EthNamespaceClient + fmt::Debug + Send + Sync, { - async fn fetch_chain_id(&self) -> Result { + async fn fetch_chain_id(&self) -> EnrichedClientResult { COUNTERS.call[&(Method::ChainId, self.component())].inc(); let latency = LATENCIES.direct[&Method::ChainId].start(); let raw_chain_id = self.chain_id().rpc_context("chain_id").await?; latency.observe(); let chain_id = u64::try_from(raw_chain_id).map_err(|err| { let err = ClientError::Custom(format!("invalid chainId: {err}")); - let err = EnrichedClientError::new(err, "chain_id").with_arg("chain_id", &raw_chain_id); - Error::EthereumGateway(err) + EnrichedClientError::new(err, "chain_id").with_arg("chain_id", &raw_chain_id) })?; Ok(L1ChainId(chain_id)) } @@ -33,7 +32,7 @@ where &self, account: Address, block: web3::BlockNumber, - ) -> Result { + ) -> EnrichedClientResult { COUNTERS.call[&(Method::NonceAtForAccount, self.component())].inc(); let latency = LATENCIES.direct[&Method::NonceAtForAccount].start(); let nonce = self @@ -46,7 +45,7 @@ where Ok(nonce) } - async fn block_number(&self) -> Result { + async fn block_number(&self) -> EnrichedClientResult { COUNTERS.call[&(Method::BlockNumber, self.component())].inc(); let latency = LATENCIES.direct[&Method::BlockNumber].start(); let block_number = self @@ -57,7 +56,7 @@ where Ok(block_number) } - async fn get_gas_price(&self) -> Result { + async fn get_gas_price(&self) -> EnrichedClientResult { COUNTERS.call[&(Method::GetGasPrice, self.component())].inc(); let latency = LATENCIES.direct[&Method::GetGasPrice].start(); let network_gas_price = self.gas_price().rpc_context("gas_price").await?; @@ -65,7 +64,7 @@ where Ok(network_gas_price) } - async fn send_raw_tx(&self, tx: RawTransactionBytes) -> Result { + async fn send_raw_tx(&self, tx: RawTransactionBytes) -> EnrichedClientResult { let latency = LATENCIES.direct[&Method::SendRawTx].start(); let tx = self .send_raw_transaction(web3::Bytes(tx.0)) @@ -79,7 +78,7 @@ where &self, upto_block: usize, block_count: usize, - ) -> Result, Error> { + ) -> EnrichedClientResult> { const MAX_REQUEST_CHUNK: usize = 1024; COUNTERS.call[&(Method::BaseFeeHistory, self.component())].inc(); @@ -111,7 +110,7 @@ where Ok(history.into_iter().map(|fee| fee.as_u64()).collect()) } - async fn get_pending_block_base_fee_per_gas(&self) -> Result { + async fn get_pending_block_base_fee_per_gas(&self) -> EnrichedClientResult { COUNTERS.call[&(Method::PendingBlockBaseFee, self.component())].inc(); let latency = LATENCIES.direct[&Method::PendingBlockBaseFee].start(); @@ -140,7 +139,7 @@ where Ok(block.base_fee_per_gas.unwrap()) } - async fn get_tx_status(&self, hash: H256) -> Result, Error> { + async fn get_tx_status(&self, hash: H256) -> EnrichedClientResult> { COUNTERS.call[&(Method::GetTxStatus, self.component())].inc(); let latency = LATENCIES.direct[&Method::GetTxStatus].start(); @@ -162,7 +161,7 @@ where Ok(res) } - async fn failure_reason(&self, tx_hash: H256) -> Result, Error> { + async fn failure_reason(&self, tx_hash: H256) -> EnrichedClientResult> { let latency = LATENCIES.direct[&Method::FailureReason].start(); let transaction = self .get_transaction_by_hash(tx_hash) @@ -218,7 +217,7 @@ where gas_limit, })) } else { - Err(err.into()) + Err(err) } } Ok(_) => Ok(None), @@ -231,7 +230,7 @@ where } } - async fn get_tx(&self, hash: H256) -> Result, Error> { + async fn get_tx(&self, hash: H256) -> EnrichedClientResult> { COUNTERS.call[&(Method::GetTx, self.component())].inc(); let tx = self .get_transaction_by_hash(hash) @@ -245,7 +244,7 @@ where &self, request: web3::CallRequest, block: Option, - ) -> Result { + ) -> EnrichedClientResult { let latency = LATENCIES.direct[&Method::CallContractFunction].start(); let block = block.unwrap_or_else(|| web3::BlockNumber::Latest.into()); let output_bytes = self @@ -258,7 +257,10 @@ where Ok(output_bytes) } - async fn tx_receipt(&self, tx_hash: H256) -> Result, Error> { + async fn tx_receipt( + &self, + tx_hash: H256, + ) -> EnrichedClientResult> { COUNTERS.call[&(Method::TxReceipt, self.component())].inc(); let latency = LATENCIES.direct[&Method::TxReceipt].start(); let receipt = self @@ -270,7 +272,7 @@ where Ok(receipt) } - async fn eth_balance(&self, address: Address) -> Result { + async fn eth_balance(&self, address: Address) -> EnrichedClientResult { COUNTERS.call[&(Method::EthBalance, self.component())].inc(); let latency = LATENCIES.direct[&Method::EthBalance].start(); let balance = self @@ -282,19 +284,22 @@ where Ok(balance) } - async fn logs(&self, filter: web3::Filter) -> Result, Error> { + async fn logs(&self, filter: &web3::Filter) -> EnrichedClientResult> { COUNTERS.call[&(Method::Logs, self.component())].inc(); let latency = LATENCIES.direct[&Method::Logs].start(); let logs = self .get_logs(filter.clone()) .rpc_context("get_logs") - .with_arg("filter", &filter) + .with_arg("filter", filter) .await?; latency.observe(); Ok(logs) } - async fn block(&self, block_id: web3::BlockId) -> Result>, Error> { + async fn block( + &self, + block_id: web3::BlockId, + ) -> EnrichedClientResult>> { COUNTERS.call[&(Method::Block, self.component())].inc(); let latency = LATENCIES.direct[&Method::Block].start(); let block = match block_id { diff --git a/core/lib/eth_client/src/clients/http/signing.rs b/core/lib/eth_client/src/clients/http/signing.rs index f58130c51c74..bf7cdc5e06b3 100644 --- a/core/lib/eth_client/src/clients/http/signing.rs +++ b/core/lib/eth_client/src/clients/http/signing.rs @@ -10,7 +10,7 @@ use zksync_web3_decl::client::{DynClient, L1}; use super::{Method, LATENCIES}; use crate::{ - types::{encode_blob_tx_with_sidecar, Error, SignedCallResult}, + types::{encode_blob_tx_with_sidecar, ContractCallError, SignedCallResult, SigningError}, BoundEthInterface, CallFunctionArgs, EthInterface, Options, RawTransactionBytes, }; @@ -115,7 +115,7 @@ impl BoundEthInterface for SigningClient { data: Vec, contract_addr: H160, options: Options, - ) -> Result { + ) -> Result { let latency = LATENCIES.direct[&Method::SignPreparedTx].start(); // Fetch current max priority fee per gas let max_priority_fee_per_gas = match options.max_priority_fee_per_gas { @@ -125,10 +125,10 @@ impl BoundEthInterface for SigningClient { if options.transaction_type == Some(EIP_4844_TX_TYPE.into()) { if options.max_fee_per_blob_gas.is_none() { - return Err(Error::Eip4844MissingMaxFeePerBlobGas); + return Err(SigningError::Eip4844MissingMaxFeePerBlobGas); } if options.blob_versioned_hashes.is_none() { - return Err(Error::Eip4844MissingBlobVersionedHashes); + return Err(SigningError::Eip4844MissingBlobVersionedHashes); } } @@ -141,7 +141,7 @@ impl BoundEthInterface for SigningClient { }; if max_fee_per_gas < max_priority_fee_per_gas { - return Err(Error::WrongFeeProvided( + return Err(SigningError::WrongFeeProvided( max_fee_per_gas, max_priority_fee_per_gas, )); @@ -198,7 +198,7 @@ impl BoundEthInterface for SigningClient { token_address: Address, address: Address, erc20_abi: ðabi::Contract, - ) -> Result { + ) -> Result { let latency = LATENCIES.direct[&Method::Allowance].start(); let allowance: U256 = CallFunctionArgs::new("allowance", (self.inner.sender_account, address)) diff --git a/core/lib/eth_client/src/clients/mock.rs b/core/lib/eth_client/src/clients/mock.rs index a5dcd00dc8d1..f4f9467641f7 100644 --- a/core/lib/eth_client/src/clients/mock.rs +++ b/core/lib/eth_client/src/clients/mock.rs @@ -13,7 +13,7 @@ use zksync_types::{ use zksync_web3_decl::client::{DynClient, MockClient, L1}; use crate::{ - types::{Error, SignedCallResult}, + types::{ContractCallError, SignedCallResult, SigningError}, BoundEthInterface, Options, RawTransactionBytes, }; @@ -475,7 +475,7 @@ impl MockEthereum { mut raw_tx: Vec, contract_addr: Address, options: Options, - ) -> Result { + ) -> Result { let max_fee_per_gas = options.max_fee_per_gas.unwrap_or(self.max_fee_per_gas); let max_priority_fee_per_gas = options .max_priority_fee_per_gas @@ -570,7 +570,7 @@ impl BoundEthInterface for MockEthereum { data: Vec, contract_addr: H160, options: Options, - ) -> Result { + ) -> Result { self.sign_prepared_tx(data, contract_addr, options) } @@ -579,7 +579,7 @@ impl BoundEthInterface for MockEthereum { _token_address: Address, _contract_address: Address, _erc20_abi: ðabi::Contract, - ) -> Result { + ) -> Result { unimplemented!("Not needed right now") } } diff --git a/core/lib/eth_client/src/lib.rs b/core/lib/eth_client/src/lib.rs index 20a843e9504f..206eee01227c 100644 --- a/core/lib/eth_client/src/lib.rs +++ b/core/lib/eth_client/src/lib.rs @@ -11,11 +11,14 @@ use zksync_types::{ Address, L1ChainId, H160, H256, U256, U64, }; use zksync_web3_decl::client::{DynClient, L1}; -pub use zksync_web3_decl::{error::EnrichedClientError, jsonrpsee::core::ClientError}; +pub use zksync_web3_decl::{ + error::{EnrichedClientError, EnrichedClientResult}, + jsonrpsee::core::ClientError, +}; pub use crate::types::{ - encode_blob_tx_with_sidecar, CallFunctionArgs, ContractCall, ContractError, Error, - ExecutedTxStatus, FailureInfo, RawTransactionBytes, SignedCallResult, + encode_blob_tx_with_sidecar, CallFunctionArgs, ContractCall, ContractCallError, + ExecutedTxStatus, FailureInfo, RawTransactionBytes, SignedCallResult, SigningError, }; pub mod clients; @@ -76,14 +79,14 @@ impl Options { pub trait EthInterface: Sync + Send { /// Fetches the L1 chain ID (in contrast to [`BoundEthInterface::chain_id()`] which returns /// the *expected* L1 chain ID). - async fn fetch_chain_id(&self) -> Result; + async fn fetch_chain_id(&self) -> EnrichedClientResult; /// Returns the nonce of the provided account at the specified block. async fn nonce_at_for_account( &self, account: Address, block: BlockNumber, - ) -> Result; + ) -> EnrichedClientResult; /// Collects the base fee history for the specified block range. /// @@ -93,25 +96,25 @@ pub trait EthInterface: Sync + Send { &self, from_block: usize, block_count: usize, - ) -> Result, Error>; + ) -> EnrichedClientResult>; /// Returns the `base_fee_per_gas` value for the currently pending L1 block. - async fn get_pending_block_base_fee_per_gas(&self) -> Result; + async fn get_pending_block_base_fee_per_gas(&self) -> EnrichedClientResult; /// Returns the current gas price. - async fn get_gas_price(&self) -> Result; + async fn get_gas_price(&self) -> EnrichedClientResult; /// Returns the current block number. - async fn block_number(&self) -> Result; + async fn block_number(&self) -> EnrichedClientResult; /// Sends a transaction to the Ethereum network. - async fn send_raw_tx(&self, tx: RawTransactionBytes) -> Result; + async fn send_raw_tx(&self, tx: RawTransactionBytes) -> EnrichedClientResult; /// Fetches the transaction status for a specified transaction hash. /// /// Returns `Ok(None)` if the transaction is either not found or not executed yet. /// Returns `Err` only if the request fails (e.g. due to network issues). - async fn get_tx_status(&self, hash: H256) -> Result, Error>; + async fn get_tx_status(&self, hash: H256) -> EnrichedClientResult>; /// For a reverted transaction, attempts to recover information on the revert reason. /// @@ -119,16 +122,16 @@ pub trait EthInterface: Sync + Send { /// Returns `Ok(None)` if the transaction isn't found, wasn't executed yet, or if it was /// executed successfully. /// Returns `Err` only if the request fails (e.g. due to network issues). - async fn failure_reason(&self, tx_hash: H256) -> Result, Error>; + async fn failure_reason(&self, tx_hash: H256) -> EnrichedClientResult>; /// Returns the transaction for the specified hash. - async fn get_tx(&self, hash: H256) -> Result, Error>; + async fn get_tx(&self, hash: H256) -> EnrichedClientResult>; /// Returns the receipt for the specified transaction hash. - async fn tx_receipt(&self, tx_hash: H256) -> Result, Error>; + async fn tx_receipt(&self, tx_hash: H256) -> EnrichedClientResult>; /// Returns the ETH balance of the specified token for the specified address. - async fn eth_balance(&self, address: Address) -> Result; + async fn eth_balance(&self, address: Address) -> EnrichedClientResult; /// Invokes a function on a contract specified by `contract_address` / `contract_abi` using /// `eth_call`. @@ -136,13 +139,13 @@ pub trait EthInterface: Sync + Send { &self, request: web3::CallRequest, block: Option, - ) -> Result; + ) -> EnrichedClientResult; /// Returns the logs for the specified filter. - async fn logs(&self, filter: Filter) -> Result, Error>; + async fn logs(&self, filter: &Filter) -> EnrichedClientResult>; /// Returns the block header for the specified block number or hash. - async fn block(&self, block_id: BlockId) -> Result>, Error>; + async fn block(&self, block_id: BlockId) -> EnrichedClientResult>>; } /// An extension of `EthInterface` trait, which is used to perform queries that are bound to @@ -190,7 +193,7 @@ pub trait BoundEthInterface: AsRef> + 'static + Sync + Send + fmt: token_address: Address, address: Address, erc20_abi: ðabi::Contract, - ) -> Result; + ) -> Result; /// Signs the transaction and sends it to the Ethereum network. /// Expected to use credentials associated with `Self::sender_account()`. @@ -199,7 +202,7 @@ pub trait BoundEthInterface: AsRef> + 'static + Sync + Send + fmt: data: Vec, contract_addr: H160, options: Options, - ) -> Result; + ) -> Result; } impl Clone for Box { @@ -210,19 +213,19 @@ impl Clone for Box { impl dyn BoundEthInterface { /// Returns the nonce of the `Self::sender_account()` at the specified block. - pub async fn nonce_at(&self, block: BlockNumber) -> Result { + pub async fn nonce_at(&self, block: BlockNumber) -> EnrichedClientResult { self.as_ref() .nonce_at_for_account(self.sender_account(), block) .await } /// Returns the current nonce of the `Self::sender_account()`. - pub async fn current_nonce(&self) -> Result { + pub async fn current_nonce(&self) -> EnrichedClientResult { self.nonce_at(BlockNumber::Latest).await } /// Returns the pending nonce of the `Self::sender_account()`. - pub async fn pending_nonce(&self) -> Result { + pub async fn pending_nonce(&self) -> EnrichedClientResult { self.nonce_at(BlockNumber::Pending).await } @@ -232,13 +235,13 @@ impl dyn BoundEthInterface { &self, data: Vec, options: Options, - ) -> Result { + ) -> Result { self.sign_prepared_tx_for_addr(data, self.contract_addr(), options) .await } /// Returns the ETH balance of `Self::sender_account()`. - pub async fn sender_eth_balance(&self) -> Result { + pub async fn sender_eth_balance(&self) -> EnrichedClientResult { self.as_ref().eth_balance(self.sender_account()).await } diff --git a/core/lib/eth_client/src/types.rs b/core/lib/eth_client/src/types.rs index b82ec8232560..03da27e43851 100644 --- a/core/lib/eth_client/src/types.rs +++ b/core/lib/eth_client/src/types.rs @@ -79,18 +79,21 @@ impl ContractCall<'_> { &self.inner.params } - pub async fn call(&self, client: &DynClient) -> Result { + pub async fn call( + &self, + client: &DynClient, + ) -> Result { let func = self .contract_abi .function(&self.inner.name) - .map_err(ContractError::Function)?; - let encoded_input = - func.encode_input(&self.inner.params) - .map_err(|source| ContractError::EncodeInput { - signature: func.signature(), - input: self.inner.params.clone(), - source, - })?; + .map_err(ContractCallError::Function)?; + let encoded_input = func.encode_input(&self.inner.params).map_err(|source| { + ContractCallError::EncodeInput { + signature: func.signature(), + input: self.inner.params.clone(), + source, + } + })?; let request = web3::CallRequest { from: self.inner.from, @@ -110,25 +113,28 @@ impl ContractCall<'_> { .call_contract_function(request, self.inner.block) .await?; let output_tokens = func.decode_output(&encoded_output.0).map_err(|source| { - ContractError::DecodeOutput { + ContractCallError::DecodeOutput { signature: func.signature(), output: encoded_output, source, } })?; - Ok(Res::from_tokens(output_tokens.clone()).map_err(|source| { - ContractError::DetokenizeOutput { + Res::from_tokens(output_tokens.clone()).map_err(|source| { + ContractCallError::DetokenizeOutput { signature: func.signature(), output: output_tokens, source, } - })?) + }) } } /// Contract-related subset of Ethereum client errors. #[derive(Debug, thiserror::Error)] -pub enum ContractError { +pub enum ContractCallError { + /// Problem on the Ethereum client side (e.g. bad RPC call, network issues). + #[error("Request to ethereum gateway failed: {0}")] + EthereumGateway(#[from] EnrichedClientError), /// Failed resolving a function specified for the contract call in the contract ABI. #[error("failed resolving contract function: {0}")] Function(#[source] ethabi::Error), @@ -158,15 +164,12 @@ pub enum ContractError { }, } -/// Common error type exposed by the crate, +/// Common error type exposed by the crate. #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum SigningError { /// Problem on the Ethereum client side (e.g. bad RPC call, network issues). #[error("Request to ethereum gateway failed: {0}")] EthereumGateway(#[from] EnrichedClientError), - /// Problem with a contract call. - #[error("Call to contract failed: {0}")] - Contract(#[from] ContractError), /// Problem with transaction signer. #[error("Transaction signing failed: {0}")] Signer(#[from] zksync_eth_signer::SignerError), diff --git a/core/lib/multivm/src/versions/vm_1_3_2/transaction_data.rs b/core/lib/multivm/src/versions/vm_1_3_2/transaction_data.rs index 9be2597cf1d3..1c5911c15416 100644 --- a/core/lib/multivm/src/versions/vm_1_3_2/transaction_data.rs +++ b/core/lib/multivm/src/versions/vm_1_3_2/transaction_data.rs @@ -424,9 +424,9 @@ pub fn get_amortized_overhead( // properly maintained, since the pubdata is not published. If decided to use the pubdata // overhead, it needs to be updated. ``` // 3. ceil(O3 * overhead_for_block_gas) >= overhead_gas - // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) / MAX_PUBDATA_PER_BLOCK - // >= (gas_limit / (gas_per_pubdata_byte_limit * MAX_PUBDATA_PER_BLOCK). - // ``` + // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) + // / MAX_PUBDATA_PER_BLOCK >= (gas_limit / (gas_per_pubdata_byte_limit * + // MAX_PUBDATA_PER_BLOCK). ``` // Throwing off the `ceil`, while may provide marginally lower // overhead to the operator, provides substantially easier formula to work with. // diff --git a/core/lib/multivm/src/versions/vm_m6/transaction_data.rs b/core/lib/multivm/src/versions/vm_m6/transaction_data.rs index d3c44dd3daca..e3734130efe4 100644 --- a/core/lib/multivm/src/versions/vm_m6/transaction_data.rs +++ b/core/lib/multivm/src/versions/vm_m6/transaction_data.rs @@ -424,9 +424,9 @@ pub fn get_amortized_overhead( // properly maintained, since the pubdata is not published. If decided to use the pubdata // overhead, it needs to be updated. ``` // 3. ceil(O3 * overhead_for_block_gas) >= overhead_gas - // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) / MAX_PUBDATA_PER_BLOCK - // >= (gas_limit / (gas_per_pubdata_byte_limit * MAX_PUBDATA_PER_BLOCK). - // ``` + // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) + // / MAX_PUBDATA_PER_BLOCK >= (gas_limit / (gas_per_pubdata_byte_limit * + // MAX_PUBDATA_PER_BLOCK). ``` // Throwing off the `ceil`, while may provide marginally lower // overhead to the operator, provides substantially easier formula to work with. // @@ -438,16 +438,17 @@ pub fn get_amortized_overhead( // OB * (TL - OE) > (OE - 1) * EP * MP // OB * TL + EP * MP > OE * EP * MP + OE * OB // (OB * TL + EP * MP) / (EP * MP + OB) > OE - // OE = floor((OB * TL + EP * MP) / (EP * MP + OB)) with possible -1 if the division is without remainder - // let overhead_for_pubdata = { + // OE = floor((OB * TL + EP * MP) / (EP * MP + OB)) with possible -1 if the division is without + // remainder let overhead_for_pubdata = { // let numerator: U256 = overhead_for_block_gas * total_gas_limit // + gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK); // let denominator = - // gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK) + overhead_for_block_gas; + // gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK) + + // overhead_for_block_gas; // // // Corner case: if `total_gas_limit` = `gas_per_pubdata_byte_limit` = 0 - // // then the numerator will be 0 and subtracting 1 will cause a panic, so we just return a zero. - // if numerator.is_zero() { + // // then the numerator will be 0 and subtracting 1 will cause a panic, so we just return a + // zero. if numerator.is_zero() { // 0.into() // } else { // (numerator - 1) / denominator @@ -461,8 +462,8 @@ pub fn get_amortized_overhead( // OB * (TL - OE) / MAX_TX_ERGS_LIMIT > (OE/K) - 1 // OB * (TL - OE) > (OE/K) * MAX_TX_ERGS_LIMIT - MAX_TX_ERGS_LIMIT // OB * TL + MAX_TX_ERGS_LIMIT > OE * ( MAX_TX_ERGS_LIMIT/K + OB) - // OE = floor(OB * TL + MAX_TX_ERGS_LIMIT / (MAX_TX_ERGS_LIMIT/K + OB)), with possible -1 if the division is without remainder - // ``` + // OE = floor(OB * TL + MAX_TX_ERGS_LIMIT / (MAX_TX_ERGS_LIMIT/K + OB)), with possible -1 if the + // division is without remainder ``` let overhead_for_gas = { let numerator = overhead_for_block_gas * total_gas_limit + U256::from(MAX_TX_ERGS_LIMIT); let denominator: U256 = U256::from( diff --git a/core/lib/multivm/src/versions/vm_refunds_enhancement/utils/overhead.rs b/core/lib/multivm/src/versions/vm_refunds_enhancement/utils/overhead.rs index 52359e46ca25..eb1177d8bb52 100644 --- a/core/lib/multivm/src/versions/vm_refunds_enhancement/utils/overhead.rs +++ b/core/lib/multivm/src/versions/vm_refunds_enhancement/utils/overhead.rs @@ -191,9 +191,9 @@ pub(crate) fn get_amortized_overhead( // properly maintained, since the pubdata is not published. If decided to use the pubdata // overhead, it needs to be updated. ``` // 3. ceil(O3 * overhead_for_block_gas) >= overhead_gas` - // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) / MAX_PUBDATA_PER_BLOCK` - // >= (gas_limit / (gas_per_pubdata_byte_limit * MAX_PUBDATA_PER_BLOCK). - // ``` + // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) + // / MAX_PUBDATA_PER_BLOCK` >= (gas_limit / (gas_per_pubdata_byte_limit * + // MAX_PUBDATA_PER_BLOCK). ``` // Throwing off the `ceil`, while may provide marginally lower // overhead to the operator, provides substantially easier formula to work with. // @@ -205,16 +205,17 @@ pub(crate) fn get_amortized_overhead( // OB * (TL - OE) > (OE - 1) * EP * MP // OB * TL + EP * MP > OE * EP * MP + OE * OB // (OB * TL + EP * MP) / (EP * MP + OB) > OE - // OE = floor((OB * TL + EP * MP) / (EP * MP + OB)) with possible -1 if the division is without remainder - // let overhead_for_pubdata = { + // OE = floor((OB * TL + EP * MP) / (EP * MP + OB)) with possible -1 if the division is without + // remainder let overhead_for_pubdata = { // let numerator: U256 = overhead_for_block_gas * total_gas_limit // + gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK); // let denominator = - // gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK) + overhead_for_block_gas; + // gas_per_pubdata_byte_limit * U256::from(MAX_PUBDATA_PER_BLOCK) + + // overhead_for_block_gas; // // // Corner case: if `total_gas_limit` = `gas_per_pubdata_byte_limit` = 0 - // // then the numerator will be 0 and subtracting 1 will cause a panic, so we just return a zero. - // if numerator.is_zero() { + // // then the numerator will be 0 and subtracting 1 will cause a panic, so we just return a + // zero. if numerator.is_zero() { // 0.into() // } else { // (numerator - 1) / denominator @@ -228,8 +229,8 @@ pub(crate) fn get_amortized_overhead( // OB * (TL - OE) / MAX_TX_ERGS_LIMIT > (OE/K) - 1 // OB * (TL - OE) > (OE/K) * MAX_TX_ERGS_LIMIT - MAX_TX_ERGS_LIMIT // OB * TL + MAX_TX_ERGS_LIMIT > OE * ( MAX_TX_ERGS_LIMIT/K + OB) - // OE = floor(OB * TL + MAX_TX_ERGS_LIMIT / (MAX_TX_ERGS_LIMIT/K + OB)), with possible -1 if the division is without remainder - // ``` + // OE = floor(OB * TL + MAX_TX_ERGS_LIMIT / (MAX_TX_ERGS_LIMIT/K + OB)), with possible -1 if the + // division is without remainder ``` let overhead_for_gas = { let numerator = overhead_for_block_gas * total_gas_limit + U256::from(MAX_TX_ERGS_LIMIT); let denominator: U256 = U256::from( diff --git a/core/lib/multivm/src/versions/vm_virtual_blocks/utils/overhead.rs b/core/lib/multivm/src/versions/vm_virtual_blocks/utils/overhead.rs index eed935983455..876d60b84931 100644 --- a/core/lib/multivm/src/versions/vm_virtual_blocks/utils/overhead.rs +++ b/core/lib/multivm/src/versions/vm_virtual_blocks/utils/overhead.rs @@ -191,9 +191,9 @@ pub(crate) fn get_amortized_overhead( // properly maintained, since the pubdata is not published. If decided to use the pubdata // overhead, it needs to be updated. ``` // 3. ceil(O3 * overhead_for_block_gas) >= overhead_gas - // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) / MAX_PUBDATA_PER_BLOCK - // >= (gas_limit / (gas_per_pubdata_byte_limit * MAX_PUBDATA_PER_BLOCK). - // ``` + // O3 = max_pubdata_in_tx / MAX_PUBDATA_PER_BLOCK = ceil(gas_limit / gas_per_pubdata_byte_limit) + // / MAX_PUBDATA_PER_BLOCK >= (gas_limit / (gas_per_pubdata_byte_limit * + // MAX_PUBDATA_PER_BLOCK). ``` // Throwing off the `ceil`, while may provide marginally lower // overhead to the operator, provides substantially easier formula to work with. // diff --git a/core/node/commitment_generator/src/validation_task.rs b/core/node/commitment_generator/src/validation_task.rs index 2ee2f36537a7..fe2c29495f97 100644 --- a/core/node/commitment_generator/src/validation_task.rs +++ b/core/node/commitment_generator/src/validation_task.rs @@ -3,7 +3,7 @@ use std::time::Duration; use tokio::sync::watch; use zksync_eth_client::{ clients::{DynClient, L1}, - CallFunctionArgs, ClientError, Error as EthClientError, + CallFunctionArgs, ClientError, ContractCallError, }; use zksync_types::{commitment::L1BatchCommitmentMode, Address}; @@ -67,7 +67,7 @@ impl L1BatchCommitmentModeValidationTask { // This case is accepted for backwards compatibility with older contracts, but emits // a warning in case the wrong contract address was passed by the // caller. - Err(EthClientError::EthereumGateway(err)) + Err(ContractCallError::EthereumGateway(err)) if matches!(err.as_ref(), ClientError::Call(_)) => { tracing::warn!( @@ -76,7 +76,7 @@ impl L1BatchCommitmentModeValidationTask { return Ok(()); } - Err(EthClientError::EthereumGateway(err)) if err.is_transient() => { + Err(ContractCallError::EthereumGateway(err)) if err.is_transient() => { tracing::warn!( "Transient error validating commitment mode, will retry after {:?}: {err}", self.retry_interval @@ -95,7 +95,7 @@ impl L1BatchCommitmentModeValidationTask { async fn get_pubdata_pricing_mode( diamond_proxy_address: Address, eth_client: &DynClient, - ) -> Result { + ) -> Result { CallFunctionArgs::new("getPubdataPricingMode", ()) .for_contract( diamond_proxy_address, diff --git a/core/node/consistency_checker/src/lib.rs b/core/node/consistency_checker/src/lib.rs index c3c6ea59e212..c2f753444d93 100644 --- a/core/node/consistency_checker/src/lib.rs +++ b/core/node/consistency_checker/src/lib.rs @@ -7,7 +7,7 @@ use zksync_contracts::PRE_BOOJUM_COMMIT_FUNCTION; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_eth_client::{ clients::{DynClient, L1}, - CallFunctionArgs, Error as L1ClientError, EthInterface, + CallFunctionArgs, ContractCallError, EnrichedClientError, EthInterface, }; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_l1_contract_interface::{ @@ -29,7 +29,9 @@ mod tests; #[derive(Debug, thiserror::Error)] enum CheckError { #[error("Web3 error communicating with L1")] - Web3(#[from] L1ClientError), + Web3(#[from] EnrichedClientError), + #[error("error calling L1 contract")] + ContractCall(#[from] ContractCallError), /// Error that is caused by the main node providing incorrect information etc. #[error("failed validating commit transaction")] Validation(anyhow::Error), @@ -43,7 +45,7 @@ impl CheckError { fn is_transient(&self) -> bool { matches!( self, - Self::Web3(L1ClientError::EthereumGateway(err)) if err.is_transient() + Self::Web3(err) if err.is_transient() ) } } diff --git a/core/node/eth_sender/src/error.rs b/core/node/eth_sender/src/error.rs index 206bbf2d583a..61d92bcbe132 100644 --- a/core/node/eth_sender/src/error.rs +++ b/core/node/eth_sender/src/error.rs @@ -1,9 +1,12 @@ +use zksync_eth_client::{ContractCallError, EnrichedClientError}; use zksync_types::web3::contract; #[derive(Debug, thiserror::Error)] -pub enum ETHSenderError { - #[error("Ethereum gateway Error {0}")] - EthereumGateWayError(#[from] zksync_eth_client::Error), - #[error("Token parsing Error: {0}")] - ParseError(#[from] contract::Error), +pub enum EthSenderError { + #[error("Ethereum gateway error: {0}")] + EthereumGateway(#[from] EnrichedClientError), + #[error("Contract call error: {0}")] + ContractCall(#[from] ContractCallError), + #[error("Token parsing error: {0}")] + Parse(#[from] contract::Error), } diff --git a/core/node/eth_sender/src/eth_tx_aggregator.rs b/core/node/eth_sender/src/eth_tx_aggregator.rs index 926490cbe013..0d29d0ed1f54 100644 --- a/core/node/eth_sender/src/eth_tx_aggregator.rs +++ b/core/node/eth_sender/src/eth_tx_aggregator.rs @@ -29,7 +29,7 @@ use crate::{ metrics::{PubdataKind, METRICS}, utils::agg_l1_batch_base_cost, zksync_functions::ZkSyncFunctions, - Aggregator, ETHSenderError, + Aggregator, EthSenderError, }; /// Data queried from L1 using multicall contract. @@ -134,7 +134,7 @@ impl EthTxAggregator { Ok(()) } - pub(super) async fn get_multicall_data(&mut self) -> Result { + pub(super) async fn get_multicall_data(&mut self) -> Result { let calldata = self.generate_calldata_for_multicall(); let args = CallFunctionArgs::new(&self.functions.aggregate3.name, calldata).for_contract( self.l1_multicall3_address, @@ -222,14 +222,11 @@ impl EthTxAggregator { pub(super) fn parse_multicall_data( &self, token: Token, - ) -> Result { + ) -> Result { let parse_error = |tokens: &[Token]| { - Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( - "Failed to parse multicall token: {:?}", - tokens - )), - )) + Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!("Failed to parse multicall token: {:?}", tokens), + ))) }; if let Token::Array(call_results) = token { @@ -243,24 +240,24 @@ impl EthTxAggregator { Multicall3Result::from_token(call_results_iterator.next().unwrap())?.return_data; if multicall3_bootloader.len() != 32 { - return Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( + return Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!( "multicall3 bootloader hash data is not of the len of 32: {:?}", multicall3_bootloader - )), - )); + ), + ))); } let bootloader = H256::from_slice(&multicall3_bootloader); let multicall3_default_aa = Multicall3Result::from_token(call_results_iterator.next().unwrap())?.return_data; if multicall3_default_aa.len() != 32 { - return Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( + return Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!( "multicall3 default aa hash data is not of the len of 32: {:?}", multicall3_default_aa - )), - )); + ), + ))); } let default_aa = H256::from_slice(&multicall3_default_aa); let base_system_contracts_hashes = BaseSystemContractsHashes { @@ -271,12 +268,12 @@ impl EthTxAggregator { let multicall3_verifier_params = Multicall3Result::from_token(call_results_iterator.next().unwrap())?.return_data; if multicall3_verifier_params.len() != 96 { - return Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( + return Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!( "multicall3 verifier params data is not of the len of 96: {:?}", multicall3_default_aa - )), - )); + ), + ))); } let recursion_node_level_vk_hash = H256::from_slice(&multicall3_verifier_params[..32]); let recursion_leaf_level_vk_hash = @@ -292,24 +289,24 @@ impl EthTxAggregator { let multicall3_verifier_address = Multicall3Result::from_token(call_results_iterator.next().unwrap())?.return_data; if multicall3_verifier_address.len() != 32 { - return Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( + return Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!( "multicall3 verifier address data is not of the len of 32: {:?}", multicall3_verifier_address - )), - )); + ), + ))); } let verifier_address = Address::from_slice(&multicall3_verifier_address[12..]); let multicall3_protocol_version = Multicall3Result::from_token(call_results_iterator.next().unwrap())?.return_data; if multicall3_protocol_version.len() != 32 { - return Err(ETHSenderError::ParseError( - Web3ContractError::InvalidOutputType(format!( + return Err(EthSenderError::Parse(Web3ContractError::InvalidOutputType( + format!( "multicall3 protocol version data is not of the len of 32: {:?}", multicall3_protocol_version - )), - )); + ), + ))); } let protocol_version = U256::from_big_endian(&multicall3_protocol_version); @@ -336,7 +333,7 @@ impl EthTxAggregator { async fn get_recursion_scheduler_level_vk_hash( &mut self, verifier_address: Address, - ) -> Result { + ) -> Result { let get_vk_hash = &self.functions.verification_key_hash; let vk_hash: H256 = CallFunctionArgs::new(&get_vk_hash.name, ()) .for_contract(verifier_address, &self.functions.verifier_contract) @@ -349,7 +346,7 @@ impl EthTxAggregator { async fn loop_iteration( &mut self, storage: &mut Connection<'_, Core>, - ) -> Result<(), ETHSenderError> { + ) -> Result<(), EthSenderError> { let MulticallData { base_system_contracts_hashes, verifier_params, @@ -549,7 +546,7 @@ impl EthTxAggregator { storage: &mut Connection<'_, Core>, aggregated_op: &AggregatedOperation, contracts_are_pre_shared_bridge: bool, - ) -> Result { + ) -> Result { let mut transaction = storage.start_transaction().await.unwrap(); let op_type = aggregated_op.get_action_type(); // We may be using a custom sender for commit transactions, so use this @@ -598,7 +595,7 @@ impl EthTxAggregator { &self, storage: &mut Connection<'_, Core>, from_addr: Option
, - ) -> Result { + ) -> Result { let db_nonce = storage .eth_sender_dal() .get_next_nonce(from_addr) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index d33dff49f344..c3a39a6058db 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -6,8 +6,8 @@ use zksync_config::configs::eth_sender::SenderConfig; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_eth_client::{ clients::{DynClient, L1}, - encode_blob_tx_with_sidecar, BoundEthInterface, ClientError, EnrichedClientError, Error, - EthInterface, ExecutedTxStatus, Options, RawTransactionBytes, SignedCallResult, + encode_blob_tx_with_sidecar, BoundEthInterface, ClientError, EnrichedClientError, EthInterface, + ExecutedTxStatus, Options, RawTransactionBytes, SignedCallResult, }; use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider; use zksync_shared_metrics::BlockL1Stage; @@ -19,7 +19,7 @@ use zksync_types::{ }; use zksync_utils::time::seconds_since_epoch; -use super::{metrics::METRICS, ETHSenderError}; +use super::{metrics::METRICS, EthSenderError}; #[derive(Debug)] struct EthFee { @@ -85,7 +85,7 @@ impl EthTxManager { async fn get_tx_status( &self, tx_hash: H256, - ) -> Result, ETHSenderError> { + ) -> Result, EthSenderError> { self.query_client() .get_tx_status(tx_hash) .await @@ -125,7 +125,7 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, tx: &EthTx, time_in_mempool: u32, - ) -> Result { + ) -> Result { let base_fee_per_gas = self.gas_adjuster.get_base_fee(0); let priority_fee_per_gas = self.gas_adjuster.get_priority_fee(); let blob_base_fee_per_gas = Some(self.gas_adjuster.get_blob_base_fee()); @@ -199,7 +199,7 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, eth_tx_id: u32, base_fee_per_gas: u64, - ) -> Result { + ) -> Result { let previous_sent_tx = storage .eth_sender_dal() .get_last_sent_eth_tx(eth_tx_id) @@ -227,7 +227,7 @@ impl EthTxManager { .with_arg("base_fee_per_gas", &base_fee_per_gas) .with_arg("previous_base_fee", &previous_base_fee) .with_arg("next_block_minimal_base_fee", &next_block_minimal_base_fee); - return Err(ETHSenderError::from(Error::EthereumGateway(err))); + return Err(err.into()); } // Increase `priority_fee_per_gas` by at least 20% to prevent "replacement transaction @@ -242,7 +242,7 @@ impl EthTxManager { tx: &EthTx, time_in_mempool: u32, current_block: L1BlockNumber, - ) -> Result { + ) -> Result { let EthFee { base_fee_per_gas, priority_fee_per_gas, @@ -310,7 +310,7 @@ impl EthTxManager { tx_history_id: u32, raw_tx: RawTransactionBytes, current_block: L1BlockNumber, - ) -> Result { + ) -> Result { match self.query_client().send_raw_tx(raw_tx).await { Ok(tx_hash) => { storage @@ -334,7 +334,7 @@ impl EthTxManager { async fn get_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result { + ) -> Result { let finalized = self .ethereum_gateway .nonce_at(block_numbers.finalized.0.into()) @@ -354,7 +354,7 @@ impl EthTxManager { async fn get_blobs_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result, ETHSenderError> { + ) -> Result, EthSenderError> { match &self.ethereum_gateway_blobs { None => Ok(None), Some(gateway) => { @@ -374,7 +374,7 @@ impl EthTxManager { } } - async fn get_l1_block_numbers(&self) -> Result { + async fn get_l1_block_numbers(&self) -> Result { let (finalized, safe) = if let Some(confirmations) = self.config.wait_confirmations { let latest_block_number = self.query_client().block_number().await?.as_u64(); @@ -418,7 +418,7 @@ impl EthTxManager { &mut self, storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, - ) -> Result, ETHSenderError> { + ) -> Result, EthSenderError> { METRICS.track_block_numbers(&l1_block_numbers); let operator_nonce = self.get_operator_nonce(l1_block_numbers).await?; let blobs_operator_nonce = self.get_blobs_operator_nonce(l1_block_numbers).await?; @@ -458,7 +458,7 @@ impl EthTxManager { l1_block_numbers: L1BlockNumbers, operator_nonce: OperatorNonce, operator_address: Option
, - ) -> Result, ETHSenderError> { + ) -> Result, EthSenderError> { let inflight_txs = storage.eth_sender_dal().get_inflight_txs().await.unwrap(); METRICS.number_of_inflight_txs.set(inflight_txs.len()); @@ -799,7 +799,7 @@ impl EthTxManager { &mut self, storage: &mut Connection<'_, Core>, previous_block: L1BlockNumber, - ) -> Result { + ) -> Result { let l1_block_numbers = self.get_l1_block_numbers().await?; self.send_new_eth_txs(storage, l1_block_numbers.latest) diff --git a/core/node/eth_sender/src/lib.rs b/core/node/eth_sender/src/lib.rs index c0a4a892e52a..3ae29a520030 100644 --- a/core/node/eth_sender/src/lib.rs +++ b/core/node/eth_sender/src/lib.rs @@ -12,6 +12,6 @@ mod zksync_functions; mod tests; pub use self::{ - aggregator::Aggregator, error::ETHSenderError, eth_tx_aggregator::EthTxAggregator, + aggregator::Aggregator, error::EthSenderError, eth_tx_aggregator::EthTxAggregator, eth_tx_manager::EthTxManager, }; diff --git a/core/node/eth_sender/src/tests.rs b/core/node/eth_sender/src/tests.rs index 78b213e05473..2abc7ea757dc 100644 --- a/core/node/eth_sender/src/tests.rs +++ b/core/node/eth_sender/src/tests.rs @@ -29,7 +29,7 @@ use zksync_types::{ use crate::{ aggregated_operations::AggregatedOperation, eth_tx_manager::L1BlockNumbers, Aggregator, - ETHSenderError, EthTxAggregator, EthTxManager, + EthSenderError, EthTxAggregator, EthTxManager, }; // Alias to conveniently call static methods of `ETHSender`. @@ -1106,7 +1106,7 @@ async fn test_parse_multicall_data(commitment_mode: L1BatchCommitmentMode) { tester .aggregator .parse_multicall_data(wrong_data_instance.clone()), - Err(ETHSenderError::ParseError(Error::InvalidOutputType(_))) + Err(EthSenderError::Parse(Error::InvalidOutputType(_))) ); } } diff --git a/core/node/eth_watch/src/client.rs b/core/node/eth_watch/src/client.rs index cc41c77ac1d3..920e5e91ccd2 100644 --- a/core/node/eth_watch/src/client.rs +++ b/core/node/eth_watch/src/client.rs @@ -1,10 +1,10 @@ use std::fmt; use zksync_contracts::verifier_contract; -pub(super) use zksync_eth_client::Error as EthClientError; use zksync_eth_client::{ clients::{DynClient, L1}, - CallFunctionArgs, ClientError, EnrichedClientError, EthInterface, + CallFunctionArgs, ClientError, ContractCallError, EnrichedClientError, EnrichedClientResult, + EthInterface, }; use zksync_types::{ ethabi::Contract, @@ -21,11 +21,12 @@ pub trait EthClient: 'static + fmt::Debug + Send + Sync { from: BlockNumber, to: BlockNumber, retries_left: usize, - ) -> Result, EthClientError>; + ) -> EnrichedClientResult>; /// Returns finalized L1 block number. - async fn finalized_block_number(&self) -> Result; + async fn finalized_block_number(&self) -> EnrichedClientResult; /// Returns scheduler verification key hash by verifier address. - async fn scheduler_vk_hash(&self, verifier_address: Address) -> Result; + async fn scheduler_vk_hash(&self, verifier_address: Address) + -> Result; /// Sets list of topics to return events for. fn set_topics(&mut self, topics: Vec); } @@ -76,7 +77,7 @@ impl EthHttpQueryClient { from: BlockNumber, to: BlockNumber, topics: Vec, - ) -> Result, EthClientError> { + ) -> EnrichedClientResult> { let filter = FilterBuilder::default() .address( [ @@ -92,13 +93,16 @@ impl EthHttpQueryClient { .to_block(to) .topics(Some(topics), None, None, None) .build(); - self.client.logs(filter).await + self.client.logs(&filter).await } } #[async_trait::async_trait] impl EthClient for EthHttpQueryClient { - async fn scheduler_vk_hash(&self, verifier_address: Address) -> Result { + async fn scheduler_vk_hash( + &self, + verifier_address: Address, + ) -> Result { // New verifier returns the hash of the verification key. CallFunctionArgs::new("verificationKeyHash", ()) .for_contract(verifier_address, &self.verifier_contract_abi) @@ -111,13 +115,13 @@ impl EthClient for EthHttpQueryClient { from: BlockNumber, to: BlockNumber, retries_left: usize, - ) -> Result, EthClientError> { + ) -> EnrichedClientResult> { let mut result = self.get_filter_logs(from, to, self.topics.clone()).await; // This code is compatible with both Infura and Alchemy API providers. // Note: we don't handle rate-limits here - assumption is that we're never going to hit // them. - if let Err(EthClientError::EthereumGateway(err)) = &result { + if let Err(err) = &result { tracing::warn!("Provider returned error message: {err}"); let err_message = err.as_ref().to_string(); let err_code = if let ClientError::Call(err) = err.as_ref() { @@ -184,7 +188,7 @@ impl EthClient for EthHttpQueryClient { result } - async fn finalized_block_number(&self) -> Result { + async fn finalized_block_number(&self) -> EnrichedClientResult { if let Some(confirmations) = self.confirmations_for_eth_event { let latest_block_number = self.client.block_number().await?.as_u64(); Ok(latest_block_number.saturating_sub(confirmations)) diff --git a/core/node/eth_watch/src/event_processors/mod.rs b/core/node/eth_watch/src/event_processors/mod.rs index 52030a6bf9d5..1ebef6f3f097 100644 --- a/core/node/eth_watch/src/event_processors/mod.rs +++ b/core/node/eth_watch/src/event_processors/mod.rs @@ -1,12 +1,13 @@ use std::fmt; use zksync_dal::{Connection, Core}; +use zksync_eth_client::{ContractCallError, EnrichedClientError}; use zksync_types::{web3::Log, H256}; pub(crate) use self::{ governance_upgrades::GovernanceUpgradesEventProcessor, priority_ops::PriorityOpsEventProcessor, }; -use crate::client::{EthClient, EthClientError}; +use crate::client::EthClient; mod governance_upgrades; mod priority_ops; @@ -21,7 +22,9 @@ pub(super) enum EventProcessorError { source: anyhow::Error, }, #[error("Eth client error: {0}")] - Client(#[from] EthClientError), + Client(#[from] EnrichedClientError), + #[error("Contract call error: {0}")] + ContractCall(#[from] ContractCallError), /// Internal errors are considered fatal (i.e., they bubble up and lead to the watcher /// termination). #[error("internal processing error: {0:?}")] diff --git a/core/node/eth_watch/src/tests.rs b/core/node/eth_watch/src/tests.rs index bbdc138070b0..539e9e938c2b 100644 --- a/core/node/eth_watch/src/tests.rs +++ b/core/node/eth_watch/src/tests.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tokio::sync::RwLock; use zksync_contracts::{governance_contract, hyperchain_contract}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_eth_client::{ContractCallError, EnrichedClientResult}; use zksync_types::{ ethabi::{encode, Hash, Token}, l1::{L1Tx, OpProcessingType, PriorityQueueType}, @@ -13,10 +14,7 @@ use zksync_types::{ ProtocolVersionId, Transaction, H256, U256, }; -use crate::{ - client::{EthClient, EthClientError}, - EthWatch, -}; +use crate::{client::EthClient, EthWatch}; #[derive(Debug)] struct FakeEthClientData { @@ -106,7 +104,7 @@ impl EthClient for MockEthClient { from: BlockNumber, to: BlockNumber, _retries_left: usize, - ) -> Result, EthClientError> { + ) -> EnrichedClientResult> { let from = self.block_to_number(from).await; let to = self.block_to_number(to).await; let mut logs = vec![]; @@ -126,11 +124,14 @@ impl EthClient for MockEthClient { fn set_topics(&mut self, _topics: Vec) {} - async fn scheduler_vk_hash(&self, _verifier_address: Address) -> Result { + async fn scheduler_vk_hash( + &self, + _verifier_address: Address, + ) -> Result { Ok(H256::zero()) } - async fn finalized_block_number(&self) -> Result { + async fn finalized_block_number(&self) -> EnrichedClientResult { Ok(self.inner.read().await.last_finalized_block_number) } } diff --git a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs index 5f0dbb43f051..d96fdd660ab6 100644 --- a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs +++ b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs @@ -8,7 +8,7 @@ use std::{ use tokio::sync::watch; use zksync_config::{configs::eth_sender::PubdataSendingMode, GasAdjusterConfig}; -use zksync_eth_client::{Error, EthInterface}; +use zksync_eth_client::EthInterface; use zksync_types::{commitment::L1BatchCommitmentMode, L1_GAS_PER_PUBDATA_BYTE, U256, U64}; use zksync_web3_decl::client::{DynClient, L1}; @@ -42,7 +42,7 @@ impl GasAdjuster { config: GasAdjusterConfig, pubdata_sending_mode: PubdataSendingMode, commitment_mode: L1BatchCommitmentMode, - ) -> Result { + ) -> anyhow::Result { let eth_client = eth_client.for_component("gas_adjuster"); // Subtracting 1 from the "latest" block number to prevent errors in case @@ -82,7 +82,7 @@ impl GasAdjuster { /// Performs an actualization routine for `GasAdjuster`. /// This method is intended to be invoked periodically. - pub async fn keep_updated(&self) -> Result<(), Error> { + pub async fn keep_updated(&self) -> anyhow::Result<()> { // Subtracting 1 from the "latest" block number to prevent errors in case // the info about the latest block is not yet present on the node. // This sometimes happens on Infura. @@ -239,7 +239,7 @@ impl GasAdjuster { async fn get_base_fees_history( eth_client: &DynClient, block_range: RangeInclusive, - ) -> Result<(Vec, Vec), Error> { + ) -> anyhow::Result<(Vec, Vec)> { let mut base_fee_history = Vec::new(); let mut blob_base_fee_history = Vec::new(); for block_number in block_range { diff --git a/core/node/genesis/src/lib.rs b/core/node/genesis/src/lib.rs index b33847c58b7b..73a26627ca6f 100644 --- a/core/node/genesis/src/lib.rs +++ b/core/node/genesis/src/lib.rs @@ -433,7 +433,7 @@ pub async fn save_set_chain_id_tx( .from_block(from.into()) .to_block(BlockNumber::Latest) .build(); - let mut logs = query_client.logs(filter).await?; + let mut logs = query_client.logs(&filter).await?; anyhow::ensure!( logs.len() == 1, "Expected a single set_chain_id event, got these {}: {:?}", diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml index 248478abddf5..9fd0aad73094 100644 --- a/core/node/node_sync/Cargo.toml +++ b/core/node/node_sync/Cargo.toml @@ -38,4 +38,5 @@ thiserror.workspace = true zksync_node_test_utils.workspace = true assert_matches.workspace = true +once_cell.workspace = true test-casing.workspace = true diff --git a/core/node/node_sync/src/tree_data_fetcher/mod.rs b/core/node/node_sync/src/tree_data_fetcher/mod.rs index fed9bbe75265..a4bc9b3cb679 100644 --- a/core/node/node_sync/src/tree_data_fetcher/mod.rs +++ b/core/node/node_sync/src/tree_data_fetcher/mod.rs @@ -1,51 +1,32 @@ //! Fetcher responsible for getting Merkle tree outputs from the main node. -use std::{fmt, time::Duration}; +use std::time::Duration; use anyhow::Context as _; -use async_trait::async_trait; use serde::Serialize; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core, CoreDal, DalError}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; -use zksync_types::{api, block::L1BatchTreeData, L1BatchNumber}; +use zksync_types::{block::L1BatchTreeData, Address, L1BatchNumber}; use zksync_web3_decl::{ - client::{DynClient, L2}, - error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}, - namespaces::ZksNamespaceClient, + client::{DynClient, L1, L2}, + error::EnrichedClientError, }; -use self::metrics::{ProcessingStage, TreeDataFetcherMetrics, METRICS}; +use self::{ + metrics::{ProcessingStage, TreeDataFetcherMetrics, METRICS}, + provider::{L1DataProvider, MissingData, TreeDataProvider}, +}; mod metrics; +mod provider; #[cfg(test)] mod tests; -#[async_trait] -trait MainNodeClient: fmt::Debug + Send + Sync + 'static { - async fn batch_details( - &self, - number: L1BatchNumber, - ) -> EnrichedClientResult>; -} - -#[async_trait] -impl MainNodeClient for Box> { - async fn batch_details( - &self, - number: L1BatchNumber, - ) -> EnrichedClientResult> { - self.get_l1_batch_details(number) - .rpc_context("get_l1_batch_details") - .with_arg("number", &number) - .await - } -} - #[derive(Debug, thiserror::Error)] -enum TreeDataFetcherError { +pub(crate) enum TreeDataFetcherError { #[error("error fetching data from main node")] Rpc(#[from] EnrichedClientError), #[error("internal error")] @@ -67,6 +48,8 @@ impl TreeDataFetcherError { } } +type TreeDataFetcherResult = Result; + #[derive(Debug, Serialize)] #[serde(untagged)] enum TreeDataFetcherHealth { @@ -108,7 +91,7 @@ enum StepOutcome { /// data will be checked against L1 commitment transactions by Consistency checker. #[derive(Debug)] pub struct TreeDataFetcher { - main_node_client: Box, + data_provider: Box, pool: ConnectionPool, metrics: &'static TreeDataFetcherMetrics, health_updater: HealthUpdater, @@ -123,7 +106,7 @@ impl TreeDataFetcher { /// Creates a new fetcher connected to the main node. pub fn new(client: Box>, pool: ConnectionPool) -> Self { Self { - main_node_client: Box::new(client.for_component("tree_data_fetcher")), + data_provider: Box::new(client.for_component("tree_data_fetcher")), pool, metrics: &METRICS, health_updater: ReactiveHealthCheck::new("tree_data_fetcher").1, @@ -133,6 +116,23 @@ impl TreeDataFetcher { } } + /// Attempts to fetch root hashes from L1 (namely, `BlockCommit` events emitted by the diamond + /// proxy) if possible. The main node will still be used as a fallback in case communicating + /// with L1 fails, or for newer batches, which may not be committed on L1. + pub fn with_l1_data( + mut self, + eth_client: Box>, + diamond_proxy_address: Address, + ) -> anyhow::Result { + let l1_provider = L1DataProvider::new( + self.pool.clone(), + eth_client.for_component("tree_data_fetcher"), + diamond_proxy_address, + )?; + self.data_provider = Box::new(l1_provider.with_fallback(self.data_provider)); + Ok(self) + } + /// Returns a health check for this fetcher. pub fn health_check(&self) -> ReactiveHealthCheck { self.health_updater.subscribe() @@ -171,29 +171,30 @@ impl TreeDataFetcher { }) } - async fn step(&self) -> Result { + async fn step(&mut self) -> Result { let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else { return Ok(StepOutcome::NoProgress); }; tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch} from main node"); let stage_latency = self.metrics.stage_latency[&ProcessingStage::Fetch].start(); - let batch_details = self - .main_node_client - .batch_details(l1_batch_to_fetch) - .await? - .with_context(|| { - format!( + let root_hash_result = self.data_provider.batch_details(l1_batch_to_fetch).await?; + stage_latency.observe(); + let root_hash = match root_hash_result { + Ok(hash) => hash, + Err(MissingData::Batch) => { + let err = anyhow::anyhow!( "L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \ which is assumed to store batch info indefinitely" - ) - })?; - stage_latency.observe(); - let Some(root_hash) = batch_details.base.root_hash else { - tracing::debug!( - "L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node" - ); - return Ok(StepOutcome::RemoteHashMissing); + ); + return Err(err.into()); + } + Err(MissingData::RootHash) => { + tracing::debug!( + "L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node" + ); + return Ok(StepOutcome::RemoteHashMissing); + } }; let stage_latency = self.metrics.stage_latency[&ProcessingStage::Persistence].start(); @@ -226,7 +227,7 @@ impl TreeDataFetcher { /// Runs this component until a fatal error occurs or a stop signal is received. Transient /// errors (e.g., no network connection) are handled gracefully by retrying after a delay. - pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + pub async fn run(mut self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { self.metrics.observe_info(&self); self.health_updater .update(Health::from(HealthStatus::Ready)); diff --git a/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs b/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs new file mode 100644 index 000000000000..a744929f5c56 --- /dev/null +++ b/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs @@ -0,0 +1,328 @@ +use std::fmt; + +use anyhow::Context; +use async_trait::async_trait; +use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_eth_client::EthInterface; +use zksync_types::{web3, Address, L1BatchNumber, H256, U256, U64}; +use zksync_web3_decl::{ + client::{DynClient, L1, L2}, + error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}, + jsonrpsee::core::ClientError, + namespaces::ZksNamespaceClient, +}; + +use super::TreeDataFetcherResult; + +#[cfg(test)] +mod tests; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum MissingData { + /// The provider lacks a requested L1 batch. + #[error("no requested L1 batch")] + Batch, + /// The provider lacks a root hash for a requested L1 batch; the batch itself is present on the + /// provider. + #[error("no root hash for L1 batch")] + RootHash, +} + +/// External provider of tree data, such as main node (via JSON-RPC). +#[async_trait] +pub(crate) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static { + /// Fetches a state root hash for the L1 batch with the specified number. + /// + /// It is guaranteed that this method will be called with monotonically increasing `number`s + /// (although not necessarily sequential ones). + async fn batch_details( + &mut self, + number: L1BatchNumber, + ) -> TreeDataFetcherResult>; +} + +#[async_trait] +impl TreeDataProvider for Box> { + async fn batch_details( + &mut self, + number: L1BatchNumber, + ) -> TreeDataFetcherResult> { + let Some(batch_details) = self + .get_l1_batch_details(number) + .rpc_context("get_l1_batch_details") + .with_arg("number", &number) + .await? + else { + return Ok(Err(MissingData::Batch)); + }; + Ok(batch_details.base.root_hash.ok_or(MissingData::RootHash)) + } +} + +#[derive(Debug, Clone, Copy)] +struct PastL1BatchInfo { + number: L1BatchNumber, + l1_commit_block_number: U64, + l1_commit_block_timestamp: U256, +} + +/// Provider of tree data loading it from L1 `BlockCommit` events emitted by the diamond proxy +/// contract. Should be used together with an L2 provider because L1 data can be missing for latest +/// batches, and the provider implementation uses assumptions that can break in some corner cases. +/// +/// # Implementation details +/// +/// To limit the range of L1 blocks for `eth_getLogs` calls, the provider assumes that an L1 block +/// with a `BlockCommit` event for a certain L1 batch is relatively close to L1 batch sealing. Thus, +/// the provider finds an approximate L1 block number for the event using binary search, or uses an +/// L1 block number of the `BlockCommit` event for the previously queried L1 batch (provided it's +/// not too far behind the seal timestamp of the batch). +#[derive(Debug)] +pub(super) struct L1DataProvider { + pool: ConnectionPool, + eth_client: Box>, + diamond_proxy_address: Address, + block_commit_signature: H256, + past_l1_batch: Option, +} + +impl L1DataProvider { + /// Accuracy when guessing L1 block number by L1 batch timestamp. + const L1_BLOCK_ACCURACY: U64 = U64([1_000]); + /// Range of L1 blocks queried via `eth_getLogs`. Should be at least several times greater than + /// `L1_BLOCK_ACCURACY`, but not large enough to trigger request limiting on the L1 RPC + /// provider. + const L1_BLOCK_RANGE: U64 = U64([20_000]); + + pub fn new( + pool: ConnectionPool, + eth_client: Box>, + diamond_proxy_address: Address, + ) -> anyhow::Result { + let block_commit_signature = zksync_contracts::hyperchain_contract() + .event("BlockCommit") + .context("missing `BlockCommit` event")? + .signature(); + Ok(Self { + pool, + eth_client, + diamond_proxy_address, + block_commit_signature, + past_l1_batch: None, + }) + } + + async fn l1_batch_seal_timestamp(&self, number: L1BatchNumber) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?; + let (_, last_l2_block_number) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(number) + .await? + .with_context(|| format!("L1 batch #{number} does not have L2 blocks"))?; + let block_header = storage + .blocks_dal() + .get_l2_block_header(last_l2_block_number) + .await? + .with_context(|| format!("L2 block #{last_l2_block_number} (last block in L1 batch #{number}) disappeared"))?; + Ok(block_header.timestamp) + } + + /// Guesses the number of an L1 block with a `BlockCommit` event for the specified L1 batch. + /// The guess is based on the L1 batch seal timestamp. + async fn guess_l1_commit_block_number( + eth_client: &DynClient, + l1_batch_seal_timestamp: u64, + ) -> EnrichedClientResult { + let l1_batch_seal_timestamp = U256::from(l1_batch_seal_timestamp); + let (latest_number, latest_timestamp) = + Self::get_block(eth_client, web3::BlockNumber::Latest).await?; + if latest_timestamp < l1_batch_seal_timestamp { + return Ok(latest_number); // No better estimate at this point + } + let (earliest_number, earliest_timestamp) = + Self::get_block(eth_client, web3::BlockNumber::Earliest).await?; + if earliest_timestamp > l1_batch_seal_timestamp { + return Ok(earliest_number); // No better estimate at this point + } + + // At this point, we have `earliest_timestamp <= l1_batch_seal_timestamp <= + // latest_timestamp`. Binary-search the range until we're sort of accurate. + let mut left = earliest_number; + let mut right = latest_number; + while left + Self::L1_BLOCK_ACCURACY < right { + let middle = (left + right) / 2; + let (_, middle_timestamp) = + Self::get_block(eth_client, web3::BlockNumber::Number(middle)).await?; + if middle_timestamp <= l1_batch_seal_timestamp { + left = middle; + } else { + right = middle; + } + } + Ok(left) + } + + /// Gets a block that should be present on L1. + async fn get_block( + eth_client: &DynClient, + number: web3::BlockNumber, + ) -> EnrichedClientResult<(U64, U256)> { + let block = eth_client.block(number.into()).await?.ok_or_else(|| { + let err = "block is missing on L1 RPC provider"; + EnrichedClientError::new(ClientError::Custom(err.into()), "get_block") + .with_arg("number", &number) + })?; + let number = block.number.ok_or_else(|| { + let err = "block is missing a number"; + EnrichedClientError::new(ClientError::Custom(err.into()), "get_block") + .with_arg("number", &number) + })?; + Ok((number, block.timestamp)) + } + + pub fn with_fallback(self, fallback: Box) -> CombinedDataProvider { + CombinedDataProvider { + l1: Some(self), + fallback, + } + } +} + +#[async_trait] +impl TreeDataProvider for L1DataProvider { + async fn batch_details( + &mut self, + number: L1BatchNumber, + ) -> TreeDataFetcherResult> { + let l1_batch_seal_timestamp = self.l1_batch_seal_timestamp(number).await?; + let from_block = self.past_l1_batch.and_then(|info| { + assert!( + info.number < number, + "`batch_details()` must be called with monotonically increasing numbers" + ); + let threshold_timestamp = info.l1_commit_block_timestamp + Self::L1_BLOCK_RANGE.as_u64() / 2; + if U256::from(l1_batch_seal_timestamp) > threshold_timestamp { + tracing::debug!( + number = number.0, + "L1 batch #{number} seal timestamp ({l1_batch_seal_timestamp}) is too far ahead \ + of the previous processed L1 batch ({info:?}); not using L1 batch info" + ); + None + } else { + // This is an exact lower boundary: L1 batches are committed in order + Some(info.l1_commit_block_number) + } + }); + + let from_block = match from_block { + Some(number) => number, + None => { + let approximate_block = Self::guess_l1_commit_block_number( + self.eth_client.as_ref(), + l1_batch_seal_timestamp, + ) + .await?; + tracing::debug!( + number = number.0, + "Guessed L1 block number for L1 batch #{number} commit: {approximate_block}" + ); + // Subtract to account for imprecise L1 and L2 timestamps etc. + approximate_block.saturating_sub(Self::L1_BLOCK_ACCURACY) + } + }; + + let number_topic = H256::from_low_u64_be(number.0.into()); + let filter = web3::FilterBuilder::default() + .address(vec![self.diamond_proxy_address]) + .from_block(web3::BlockNumber::Number(from_block)) + .to_block(web3::BlockNumber::Number(from_block + Self::L1_BLOCK_RANGE)) + .topics( + Some(vec![self.block_commit_signature]), + Some(vec![number_topic]), + None, + None, + ) + .build(); + let mut logs = self.eth_client.logs(&filter).await?; + logs.retain(|log| !log.is_removed() && log.block_number.is_some()); + + match logs.as_slice() { + [] => Ok(Err(MissingData::Batch)), + [log] => { + let root_hash_topic = log.topics.get(2).copied().ok_or_else(|| { + let err = "Bogus `BlockCommit` event, does not have the root hash topic"; + EnrichedClientError::new(ClientError::Custom(err.into()), "batch_details") + .with_arg("filter", &filter) + .with_arg("log", &log) + })?; + // `unwrap()` is safe due to the filtering above + let l1_commit_block_number = log.block_number.unwrap(); + + let l1_commit_block = self.eth_client.block(l1_commit_block_number.into()).await?; + let l1_commit_block = l1_commit_block.ok_or_else(|| { + let err = "Block disappeared from L1 RPC provider"; + EnrichedClientError::new(ClientError::Custom(err.into()), "batch_details") + .with_arg("number", &l1_commit_block_number) + })?; + self.past_l1_batch = Some(PastL1BatchInfo { + number, + l1_commit_block_number, + l1_commit_block_timestamp: l1_commit_block.timestamp, + }); + Ok(Ok(root_hash_topic)) + } + _ => { + tracing::warn!( + "Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}: {logs:?}" + ); + Ok(Err(MissingData::RootHash)) + } + } + } +} + +/// Data provider combining [`L1DataProvider`] with a fallback provider. +#[derive(Debug)] +pub(super) struct CombinedDataProvider { + l1: Option, + fallback: Box, +} + +#[async_trait] +impl TreeDataProvider for CombinedDataProvider { + async fn batch_details( + &mut self, + number: L1BatchNumber, + ) -> TreeDataFetcherResult> { + if let Some(l1) = &mut self.l1 { + match l1.batch_details(number).await { + Err(err) => { + if err.is_transient() { + tracing::info!( + number = number.0, + "Transient error calling L1 data provider: {err}" + ); + } else { + tracing::warn!( + number = number.0, + "Fatal error calling L1 data provider: {err}" + ); + self.l1 = None; + } + } + Ok(Ok(root_hash)) => return Ok(Ok(root_hash)), + Ok(Err(missing_data)) => { + tracing::debug!( + number = number.0, + "L1 data provider misses batch data: {missing_data}" + ); + // No sense of calling the L1 provider in the future; the L2 provider will very + // likely get information about batches significantly + // faster. + self.l1 = None; + } + } + } + self.fallback.batch_details(number).await + } +} diff --git a/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs b/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs new file mode 100644 index 000000000000..8bb5cc63390e --- /dev/null +++ b/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs @@ -0,0 +1,244 @@ +//! Tests for tree data providers. + +use assert_matches::assert_matches; +use once_cell::sync::Lazy; +use test_casing::test_casing; +use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; +use zksync_web3_decl::client::MockClient; + +use super::*; +use crate::tree_data_fetcher::tests::{seal_l1_batch_with_timestamp, MockMainNodeClient}; + +const DIAMOND_PROXY_ADDRESS: Address = Address::repeat_byte(0x22); + +static BLOCK_COMMIT_SIGNATURE: Lazy = Lazy::new(|| { + zksync_contracts::hyperchain_contract() + .event("BlockCommit") + .expect("missing `BlockCommit` event") + .signature() +}); + +struct EthereumParameters { + block_number: U64, + // L1 block numbers in which L1 batches are committed starting from L1 batch #1 + l1_blocks_for_commits: Vec, +} + +impl EthereumParameters { + fn new(block_number: u64) -> Self { + Self { + block_number: block_number.into(), + l1_blocks_for_commits: vec![], + } + } + + fn push_commit(&mut self, l1_block_number: u64) { + assert!(l1_block_number <= self.block_number.as_u64()); + + let l1_block_number = U64::from(l1_block_number); + let last_commit = self.l1_blocks_for_commits.last().copied(); + let is_increasing = last_commit.map_or(true, |last_number| last_number <= l1_block_number); + assert!(is_increasing, "Invalid L1 block number for commit"); + + self.l1_blocks_for_commits.push(l1_block_number); + } + + fn filter_logs(logs: &[web3::Log], filter: web3::Filter) -> Vec { + let Some(web3::BlockNumber::Number(filter_from)) = filter.from_block else { + panic!("Unexpected filter: {filter:?}"); + }; + let Some(web3::BlockNumber::Number(filter_to)) = filter.to_block else { + panic!("Unexpected filter: {filter:?}"); + }; + let filter_block_range = filter_from..=filter_to; + + let filter_addresses = filter.address.unwrap().flatten(); + let filter_topics = filter.topics.unwrap(); + let filter_topics: Vec<_> = filter_topics + .into_iter() + .map(|topic| topic.map(web3::ValueOrArray::flatten)) + .collect(); + + let filtered_logs = logs.iter().filter(|log| { + if !filter_addresses.contains(&log.address) { + return false; + } + if !filter_block_range.contains(&log.block_number.unwrap()) { + return false; + } + filter_topics + .iter() + .zip(&log.topics) + .all(|(filter_topics, actual_topic)| match filter_topics { + Some(topics) => topics.contains(actual_topic), + None => true, + }) + }); + filtered_logs.cloned().collect() + } + + fn client(&self) -> MockClient { + let logs = self + .l1_blocks_for_commits + .iter() + .enumerate() + .map(|(i, &l1_block_number)| { + let l1_batch_number = H256::from_low_u64_be(i as u64 + 1); + let root_hash = H256::repeat_byte(i as u8 + 1); + web3::Log { + address: DIAMOND_PROXY_ADDRESS, + topics: vec![ + *BLOCK_COMMIT_SIGNATURE, + l1_batch_number, + root_hash, + H256::zero(), // commitment hash; not used + ], + block_number: Some(l1_block_number), + ..web3::Log::default() + } + }); + let logs: Vec<_> = logs.collect(); + let block_number = self.block_number; + + MockClient::builder(L1::default()) + .method("eth_blockNumber", move || Ok(block_number)) + .method( + "eth_getBlockByNumber", + move |number: web3::BlockNumber, with_txs: bool| { + assert!(!with_txs); + + let number = match number { + web3::BlockNumber::Number(number) => number, + web3::BlockNumber::Latest => block_number, + web3::BlockNumber::Earliest => U64::zero(), + _ => panic!("Unexpected number: {number:?}"), + }; + if number > block_number { + return Ok(None); + } + Ok(Some(web3::Block:: { + number: Some(number), + timestamp: U256::from(number.as_u64()), // timestamp == number + ..web3::Block::default() + })) + }, + ) + .method("eth_getLogs", move |filter: web3::Filter| { + Ok(Self::filter_logs(&logs, filter)) + }) + .build() + } +} + +#[tokio::test] +async fn guessing_l1_commit_block_number() { + let eth_params = EthereumParameters::new(100_000); + let eth_client = eth_params.client(); + + for timestamp in [0, 100, 1_000, 5_000, 10_000, 100_000] { + let guessed_block_number = + L1DataProvider::guess_l1_commit_block_number(ð_client, timestamp) + .await + .unwrap(); + + assert!( + guessed_block_number.abs_diff(timestamp.into()) <= L1DataProvider::L1_BLOCK_ACCURACY, + "timestamp={timestamp}, guessed={guessed_block_number}" + ); + } +} + +async fn test_using_l1_data_provider(l1_batch_timestamps: &[u64]) { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut eth_params = EthereumParameters::new(1_000_000); + for (number, &ts) in l1_batch_timestamps.iter().enumerate() { + let number = L1BatchNumber(number as u32 + 1); + seal_l1_batch_with_timestamp(&mut storage, number, ts).await; + eth_params.push_commit(ts + 1_000); // have a reasonable small diff between batch generation and commitment + } + drop(storage); + + let mut provider = + L1DataProvider::new(pool, Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS).unwrap(); + for i in 0..l1_batch_timestamps.len() { + let number = L1BatchNumber(i as u32 + 1); + let root_hash = provider + .batch_details(number) + .await + .unwrap() + .expect("no root hash"); + assert_eq!(root_hash, H256::repeat_byte(number.0 as u8)); + + let past_l1_batch = provider.past_l1_batch.unwrap(); + assert_eq!(past_l1_batch.number, number); + let expected_l1_block_number = eth_params.l1_blocks_for_commits[i]; + assert_eq!( + past_l1_batch.l1_commit_block_number, + expected_l1_block_number + ); + assert_eq!( + past_l1_batch.l1_commit_block_timestamp, + expected_l1_block_number.as_u64().into() + ); + } +} + +#[test_casing(4, [500, 1_500, 10_000, 30_000])] +#[tokio::test] +async fn using_l1_data_provider(batch_spacing: u64) { + let l1_batch_timestamps: Vec<_> = (0..10).map(|i| 50_000 + batch_spacing * i).collect(); + test_using_l1_data_provider(&l1_batch_timestamps).await; +} + +#[tokio::test] +async fn combined_data_provider_errors() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut eth_params = EthereumParameters::new(1_000_000); + seal_l1_batch_with_timestamp(&mut storage, L1BatchNumber(1), 50_000).await; + eth_params.push_commit(51_000); + seal_l1_batch_with_timestamp(&mut storage, L1BatchNumber(2), 52_000).await; + drop(storage); + + let mut main_node_client = MockMainNodeClient::default(); + main_node_client.insert_batch(L1BatchNumber(2), H256::repeat_byte(2)); + let mut provider = + L1DataProvider::new(pool, Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS) + .unwrap() + .with_fallback(Box::new(main_node_client)); + + // L1 batch #1 should be obtained from L1 + let root_hash = provider + .batch_details(L1BatchNumber(1)) + .await + .unwrap() + .expect("no root hash"); + assert_eq!(root_hash, H256::repeat_byte(1)); + assert!(provider.l1.is_some()); + + // L1 batch #2 should be obtained from L2 + let root_hash = provider + .batch_details(L1BatchNumber(2)) + .await + .unwrap() + .expect("no root hash"); + assert_eq!(root_hash, H256::repeat_byte(2)); + assert!(provider.l1.is_none()); + + // L1 batch #3 is not present anywhere. + let missing = provider + .batch_details(L1BatchNumber(3)) + .await + .unwrap() + .unwrap_err(); + assert_matches!(missing, MissingData::Batch); +} diff --git a/core/node/node_sync/src/tree_data_fetcher/tests.rs b/core/node/node_sync/src/tree_data_fetcher/tests.rs index d1192e3ea942..cb25842f0517 100644 --- a/core/node/node_sync/src/tree_data_fetcher/tests.rs +++ b/core/node/node_sync/src/tree_data_fetcher/tests.rs @@ -8,64 +8,78 @@ use std::{ }; use assert_matches::assert_matches; +use async_trait::async_trait; use test_casing::test_casing; use zksync_dal::Connection; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; -use zksync_node_test_utils::{create_l1_batch, prepare_recovery_snapshot}; +use zksync_node_test_utils::{create_l1_batch, create_l2_block, prepare_recovery_snapshot}; use zksync_types::{AccountTreeId, Address, L2BlockNumber, StorageKey, StorageLog, H256}; use zksync_web3_decl::jsonrpsee::core::ClientError; use super::{metrics::StepOutcomeLabel, *}; #[derive(Debug, Default)] -struct MockMainNodeClient { +pub(super) struct MockMainNodeClient { transient_error: Arc, - batch_details_responses: HashMap, + batch_details_responses: HashMap, +} + +impl MockMainNodeClient { + pub fn insert_batch(&mut self, number: L1BatchNumber, root_hash: H256) { + self.batch_details_responses.insert(number, root_hash); + } } #[async_trait] -impl MainNodeClient for MockMainNodeClient { +impl TreeDataProvider for MockMainNodeClient { async fn batch_details( - &self, + &mut self, number: L1BatchNumber, - ) -> EnrichedClientResult> { + ) -> TreeDataFetcherResult> { if self.transient_error.fetch_and(false, Ordering::Relaxed) { let err = ClientError::RequestTimeout; - return Err(EnrichedClientError::new(err, "batch_details")); + return Err(EnrichedClientError::new(err, "batch_details").into()); } - Ok(self.batch_details_responses.get(&number).cloned()) + Ok(self + .batch_details_responses + .get(&number) + .copied() + .ok_or(MissingData::Batch)) } } -fn mock_l1_batch_details(number: L1BatchNumber, root_hash: Option) -> api::L1BatchDetails { - api::L1BatchDetails { - number, - base: api::BlockDetailsBase { - timestamp: number.0.into(), - l1_tx_count: 0, - l2_tx_count: 10, - root_hash, - status: api::BlockStatus::Sealed, - commit_tx_hash: None, - committed_at: None, - prove_tx_hash: None, - proven_at: None, - execute_tx_hash: None, - executed_at: None, - l1_gas_price: 123, - l2_fair_gas_price: 456, - base_system_contracts_hashes: Default::default(), - }, - } +async fn seal_l1_batch(storage: &mut Connection<'_, Core>, number: L1BatchNumber) { + seal_l1_batch_with_timestamp(storage, number, number.0.into()).await; } -async fn seal_l1_batch(storage: &mut Connection<'_, Core>, number: L1BatchNumber) { +pub(super) async fn seal_l1_batch_with_timestamp( + storage: &mut Connection<'_, Core>, + number: L1BatchNumber, + timestamp: u64, +) { let mut transaction = storage.start_transaction().await.unwrap(); + // Insert a single L2 block belonging to the batch. + let mut block_header = create_l2_block(number.0); + block_header.timestamp = timestamp; + transaction + .blocks_dal() + .insert_l2_block(&block_header) + .await + .unwrap(); + + let mut batch_header = create_l1_batch(number.0); + batch_header.timestamp = timestamp; + transaction + .blocks_dal() + .insert_mock_l1_batch(&batch_header) + .await + .unwrap(); transaction .blocks_dal() - .insert_mock_l1_batch(&create_l1_batch(number.0)) + .mark_l2_blocks_as_executed_in_l1_batch(batch_header.number) .await .unwrap(); + // One initial write per L1 batch let initial_writes = [StorageKey::new( AccountTreeId::new(Address::repeat_byte(1)), @@ -87,11 +101,11 @@ struct FetcherHarness { } impl FetcherHarness { - fn new(client: impl MainNodeClient, pool: ConnectionPool) -> Self { + fn new(client: impl TreeDataProvider, pool: ConnectionPool) -> Self { let (updates_sender, updates_receiver) = mpsc::unbounded_channel(); let metrics = &*Box::leak(Box::::default()); let fetcher = TreeDataFetcher { - main_node_client: Box::new(client), + data_provider: Box::new(client), pool: pool.clone(), metrics, health_updater: ReactiveHealthCheck::new("tree_data_fetcher").1, @@ -117,12 +131,13 @@ async fn tree_data_fetcher_steps() { let mut client = MockMainNodeClient::default(); for number in 1..=5 { let number = L1BatchNumber(number); - let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); - client.batch_details_responses.insert(number, details); + client + .batch_details_responses + .insert(number, H256::from_low_u64_be(number.0.into())); seal_l1_batch(&mut storage, number).await; } - let fetcher = FetcherHarness::new(client, pool.clone()).fetcher; + let mut fetcher = FetcherHarness::new(client, pool.clone()).fetcher; for number in 1..=5 { let step_outcome = fetcher.step().await.unwrap(); assert_matches!( @@ -181,12 +196,13 @@ async fn tree_data_fetcher_steps_after_snapshot_recovery() { let mut client = MockMainNodeClient::default(); for i in 1..=5 { let number = snapshot.l1_batch_number + i; - let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); - client.batch_details_responses.insert(number, details); + client + .batch_details_responses + .insert(number, H256::from_low_u64_be(number.0.into())); seal_l1_batch(&mut storage, number).await; } - let fetcher = FetcherHarness::new(client, pool.clone()).fetcher; + let mut fetcher = FetcherHarness::new(client, pool.clone()).fetcher; for i in 1..=5 { let step_outcome = fetcher.step().await.unwrap(); assert_matches!( @@ -212,8 +228,9 @@ async fn tree_data_fetcher_recovers_from_transient_errors() { let mut client = MockMainNodeClient::default(); for number in 1..=5 { let number = L1BatchNumber(number); - let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); - client.batch_details_responses.insert(number, details); + client + .batch_details_responses + .insert(number, H256::from_low_u64_be(number.0.into())); } let transient_error = client.transient_error.clone(); @@ -278,21 +295,20 @@ impl SlowMainNode { } #[async_trait] -impl MainNodeClient for SlowMainNode { +impl TreeDataProvider for SlowMainNode { async fn batch_details( - &self, + &mut self, number: L1BatchNumber, - ) -> EnrichedClientResult> { + ) -> TreeDataFetcherResult> { if number != L1BatchNumber(1) { - return Ok(None); + return Ok(Err(MissingData::Batch)); } let request_count = self.request_count.fetch_add(1, Ordering::Relaxed); - let root_hash = if request_count >= self.compute_root_hash_after { - Some(H256::repeat_byte(1)) + Ok(if request_count >= self.compute_root_hash_after { + Ok(H256::repeat_byte(1)) } else { - None - }; - Ok(Some(mock_l1_batch_details(number, root_hash))) + Err(MissingData::RootHash) + }) } } diff --git a/core/tests/loadnext/src/sdk/ethereum/mod.rs b/core/tests/loadnext/src/sdk/ethereum/mod.rs index 403d18a3c721..2b25c5b61623 100644 --- a/core/tests/loadnext/src/sdk/ethereum/mod.rs +++ b/core/tests/loadnext/src/sdk/ethereum/mod.rs @@ -4,7 +4,8 @@ use std::time::{Duration, Instant}; use serde_json::{Map, Value}; use zksync_eth_client::{ - clients::SigningClient, BoundEthInterface, CallFunctionArgs, Error, EthInterface, Options, + clients::SigningClient, BoundEthInterface, CallFunctionArgs, ContractCallError, EthInterface, + Options, }; use zksync_eth_signer::EthereumSigner; use zksync_types::{ @@ -158,7 +159,9 @@ impl EthereumProvider { .call(self.query_client()) .await .map_err(|err| match err { - Error::EthereumGateway(err) => ClientError::NetworkError(err.to_string()), + ContractCallError::EthereumGateway(err) => { + ClientError::NetworkError(err.to_string()) + } _ => ClientError::MalformedResponse(err.to_string()), }) } @@ -194,7 +197,9 @@ impl EthereumProvider { .call(self.query_client()) .await .map_err(|err| match err { - Error::EthereumGateway(err) => ClientError::NetworkError(err.to_string()), + ContractCallError::EthereumGateway(err) => { + ClientError::NetworkError(err.to_string()) + } _ => ClientError::MalformedResponse(err.to_string()), }) } @@ -361,7 +366,7 @@ impl EthereumProvider { gas_limit: U256, gas_per_pubdata_byte: u32, gas_price: Option, - ) -> Result { + ) -> Result { let gas_price = if let Some(gas_price) = gas_price { gas_price } else {