diff --git a/core/bin/snapshots_creator/src/tests.rs b/core/bin/snapshots_creator/src/tests.rs index 240857843f59..b8e6234817bb 100644 --- a/core/bin/snapshots_creator/src/tests.rs +++ b/core/bin/snapshots_creator/src/tests.rs @@ -197,7 +197,7 @@ async fn prepare_postgres( create_miniblock(conn, MiniblockNumber(block_number), logs.clone()).await; let factory_deps = gen_factory_deps(rng, 10); - conn.storage_dal() + conn.factory_deps_dal() .insert_factory_deps(MiniblockNumber(block_number), &factory_deps) .await .unwrap(); diff --git a/core/lib/dal/.sqlx/query-1d2cc4b485536af350089cf7950be3b85419fde77038dd3de6c55aa9c55d375c.json b/core/lib/dal/.sqlx/query-1d2cc4b485536af350089cf7950be3b85419fde77038dd3de6c55aa9c55d375c.json deleted file mode 100644 index b8929febf761..000000000000 --- a/core/lib/dal/.sqlx/query-1d2cc4b485536af350089cf7950be3b85419fde77038dd3de6c55aa9c55d375c.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n storage.value AS \"value!\",\n tokens.l1_address AS \"l1_address!\",\n tokens.l2_address AS \"l2_address!\",\n tokens.symbol AS \"symbol!\",\n tokens.name AS \"name!\",\n tokens.decimals AS \"decimals!\",\n tokens.usd_price AS \"usd_price?\"\n FROM\n storage\n INNER JOIN tokens ON storage.address = tokens.l2_address\n OR (\n storage.address = $2\n AND tokens.l2_address = $3\n )\n WHERE\n storage.hashed_key = ANY ($1)\n AND storage.value != $4\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "value!", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "l1_address!", - "type_info": "Bytea" - }, - { - "ordinal": 2, - "name": "l2_address!", - "type_info": "Bytea" - }, - { - "ordinal": 3, - "name": "symbol!", - "type_info": "Varchar" - }, - { - "ordinal": 4, - "name": "name!", - "type_info": "Varchar" - }, - { - "ordinal": 5, - "name": "decimals!", - "type_info": "Int4" - }, - { - "ordinal": 6, - "name": "usd_price?", - "type_info": "Numeric" - } - ], - "parameters": { - "Left": [ - "ByteaArray", - "Bytea", - "Bytea", - "Bytea" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - true - ] - }, - "hash": "1d2cc4b485536af350089cf7950be3b85419fde77038dd3de6c55aa9c55d375c" -} diff --git a/core/lib/dal/.sqlx/query-6d08cadce92f03b0596e068884854a701146f4b2dc92361f42598a5f9a72c36b.json b/core/lib/dal/.sqlx/query-6d08cadce92f03b0596e068884854a701146f4b2dc92361f42598a5f9a72c36b.json new file mode 100644 index 000000000000..0e3846acc8bc --- /dev/null +++ b/core/lib/dal/.sqlx/query-6d08cadce92f03b0596e068884854a701146f4b2dc92361f42598a5f9a72c36b.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n value\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n AND storage_logs.miniblock_number <= $2\n ORDER BY\n storage_logs.miniblock_number DESC,\n storage_logs.operation_number DESC\n LIMIT\n 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "6d08cadce92f03b0596e068884854a701146f4b2dc92361f42598a5f9a72c36b" +} diff --git a/core/lib/dal/.sqlx/query-9334df89c9562d4b35611b8e5ffb17305343df99ebc55f240278b5c4e63f89f5.json b/core/lib/dal/.sqlx/query-9334df89c9562d4b35611b8e5ffb17305343df99ebc55f240278b5c4e63f89f5.json deleted file mode 100644 index 92e74026bf57..000000000000 --- a/core/lib/dal/.sqlx/query-9334df89c9562d4b35611b8e5ffb17305343df99ebc55f240278b5c4e63f89f5.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n value\n FROM\n storage\n WHERE\n hashed_key = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "value", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea" - ] - }, - "nullable": [ - false - ] - }, - "hash": "9334df89c9562d4b35611b8e5ffb17305343df99ebc55f240278b5c4e63f89f5" -} diff --git a/core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json b/core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json deleted file mode 100644 index 69ce70a7f4a2..000000000000 --- a/core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n hashed_key,\n value AS \"value!\"\n FROM\n storage\n WHERE\n hashed_key = ANY ($1)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "hashed_key", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "value!", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46" -} diff --git a/core/lib/dal/.sqlx/query-b33e8da69281efe7750043e409d9871731c41cef01da3d6aaf2c53f7b17c47b2.json b/core/lib/dal/.sqlx/query-b33e8da69281efe7750043e409d9871731c41cef01da3d6aaf2c53f7b17c47b2.json deleted file mode 100644 index 1ece82073712..000000000000 --- a/core/lib/dal/.sqlx/query-b33e8da69281efe7750043e409d9871731c41cef01da3d6aaf2c53f7b17c47b2.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n value\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n AND storage_logs.miniblock_number <= $2\n ORDER BY\n storage_logs.miniblock_number DESC,\n storage_logs.operation_number DESC\n LIMIT\n 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "value", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "b33e8da69281efe7750043e409d9871731c41cef01da3d6aaf2c53f7b17c47b2" -} diff --git a/core/lib/dal/.sqlx/query-c4ea7812861a283448095acbb1164420a25eef488de2b67e91ed39657667bd4a.json b/core/lib/dal/.sqlx/query-c4ea7812861a283448095acbb1164420a25eef488de2b67e91ed39657667bd4a.json deleted file mode 100644 index 6a74606e484f..000000000000 --- a/core/lib/dal/.sqlx/query-c4ea7812861a283448095acbb1164420a25eef488de2b67e91ed39657667bd4a.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n l1_address,\n l2_address\n FROM\n tokens\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "l1_address", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "l2_address", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "c4ea7812861a283448095acbb1164420a25eef488de2b67e91ed39657667bd4a" -} diff --git a/core/lib/dal/src/accounts_dal.rs b/core/lib/dal/src/accounts_dal.rs deleted file mode 100644 index a1323bf9517b..000000000000 --- a/core/lib/dal/src/accounts_dal.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::collections::HashMap; - -use zksync_types::{ - tokens::ETHEREUM_ADDRESS, utils::storage_key_for_standard_token_balance, AccountTreeId, - Address, L2_ETH_TOKEN_ADDRESS, U256, -}; - -use crate::{SqlxError, StorageProcessor}; - -#[derive(Debug)] -pub struct AccountsDal<'a, 'c> { - pub(super) storage: &'a mut StorageProcessor<'c>, -} - -impl AccountsDal<'_, '_> { - pub async fn get_balances_for_address( - &mut self, - address: Address, - ) -> Result, SqlxError> { - let token_l2_addresses: Vec
= self - .storage - .tokens_dal() - .get_well_known_token_addresses() - .await - .into_iter() - .map(|(_, l2_address)| l2_address) - .collect(); - - let hashed_keys: Vec> = token_l2_addresses - .into_iter() - .map(|mut l2_token_address| { - if l2_token_address == ETHEREUM_ADDRESS { - l2_token_address = L2_ETH_TOKEN_ADDRESS; - } - storage_key_for_standard_token_balance( - AccountTreeId::new(l2_token_address), - &address, - ) - .hashed_key() - .0 - .to_vec() - }) - .collect(); - let rows = sqlx::query!( - r#" - SELECT - storage.value AS "value!", - tokens.l1_address AS "l1_address!", - tokens.l2_address AS "l2_address!", - tokens.symbol AS "symbol!", - tokens.name AS "name!", - tokens.decimals AS "decimals!", - tokens.usd_price AS "usd_price?" - FROM - storage - INNER JOIN tokens ON storage.address = tokens.l2_address - OR ( - storage.address = $2 - AND tokens.l2_address = $3 - ) - WHERE - storage.hashed_key = ANY ($1) - AND storage.value != $4 - "#, - &hashed_keys, - L2_ETH_TOKEN_ADDRESS.as_bytes(), - ETHEREUM_ADDRESS.as_bytes(), - vec![0u8; 32] - ) - .fetch_all(self.storage.conn()) - .await?; - - let result: HashMap = rows - .into_iter() - .map(|row| { - let balance = U256::from_big_endian(&row.value); - (Address::from_slice(&row.l2_address), balance) - }) - .collect(); - - Ok(result) - } -} diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs new file mode 100644 index 000000000000..a23eb706f6b5 --- /dev/null +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -0,0 +1,171 @@ +use std::collections::{HashMap, HashSet}; + +use zksync_contracts::{BaseSystemContracts, SystemContractCode}; +use zksync_types::{MiniblockNumber, H256, U256}; +use zksync_utils::{bytes_to_be_words, bytes_to_chunks}; + +use crate::StorageProcessor; + +/// DAL methods related to factory dependencies. +#[derive(Debug)] +pub struct FactoryDepsDal<'a, 'c> { + pub(crate) storage: &'a mut StorageProcessor<'c>, +} + +impl FactoryDepsDal<'_, '_> { + /// Inserts factory dependencies for a miniblock. Factory deps are specified as a map of + /// `(bytecode_hash, bytecode)` entries. + pub async fn insert_factory_deps( + &mut self, + block_number: MiniblockNumber, + factory_deps: &HashMap>, + ) -> sqlx::Result<()> { + let (bytecode_hashes, bytecodes): (Vec<_>, Vec<_>) = factory_deps + .iter() + .map(|dep| (dep.0.as_bytes(), dep.1.as_slice())) + .unzip(); + + // Copy from stdin can't be used here because of `ON CONFLICT`. + sqlx::query!( + r#" + INSERT INTO + factory_deps (bytecode_hash, bytecode, miniblock_number, created_at, updated_at) + SELECT + u.bytecode_hash, + u.bytecode, + $3, + NOW(), + NOW() + FROM + UNNEST($1::bytea[], $2::bytea[]) AS u (bytecode_hash, bytecode) + ON CONFLICT (bytecode_hash) DO NOTHING + "#, + &bytecode_hashes as &[&[u8]], + &bytecodes as &[&[u8]], + block_number.0 as i64, + ) + .execute(self.storage.conn()) + .await?; + + Ok(()) + } + + /// Returns bytecode for a factory dependency with the specified bytecode `hash`. + pub async fn get_factory_dep(&mut self, hash: H256) -> Option> { + sqlx::query!( + r#" + SELECT + bytecode + FROM + factory_deps + WHERE + bytecode_hash = $1 + "#, + hash.as_bytes(), + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| row.bytecode) + } + + pub async fn get_base_system_contracts( + &mut self, + bootloader_hash: H256, + default_aa_hash: H256, + ) -> BaseSystemContracts { + let bootloader_bytecode = self + .get_factory_dep(bootloader_hash) + .await + .expect("Bootloader code should be present in the database"); + let bootloader_code = SystemContractCode { + code: bytes_to_be_words(bootloader_bytecode), + hash: bootloader_hash, + }; + + let default_aa_bytecode = self + .get_factory_dep(default_aa_hash) + .await + .expect("Default account code should be present in the database"); + + let default_aa_code = SystemContractCode { + code: bytes_to_be_words(default_aa_bytecode), + hash: default_aa_hash, + }; + BaseSystemContracts { + bootloader: bootloader_code, + default_aa: default_aa_code, + } + } + + /// Returns bytecodes for factory deps with the specified `hashes`. + pub async fn get_factory_deps( + &mut self, + hashes: &HashSet, + ) -> HashMap> { + let hashes_as_bytes: Vec<_> = hashes.iter().map(H256::as_bytes).collect(); + + sqlx::query!( + r#" + SELECT + bytecode, + bytecode_hash + FROM + factory_deps + WHERE + bytecode_hash = ANY ($1) + "#, + &hashes_as_bytes as &[&[u8]], + ) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| { + ( + U256::from_big_endian(&row.bytecode_hash), + bytes_to_chunks(&row.bytecode), + ) + }) + .collect() + } + + /// Returns bytecode hashes for factory deps from miniblocks with number strictly greater + /// than `block_number`. + pub async fn get_factory_deps_for_revert( + &mut self, + block_number: MiniblockNumber, + ) -> sqlx::Result> { + Ok(sqlx::query!( + r#" + SELECT + bytecode_hash + FROM + factory_deps + WHERE + miniblock_number > $1 + "#, + block_number.0 as i64 + ) + .fetch_all(self.storage.conn()) + .await? + .into_iter() + .map(|row| H256::from_slice(&row.bytecode_hash)) + .collect()) + } + + /// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`. + pub async fn rollback_factory_deps(&mut self, block_number: MiniblockNumber) { + sqlx::query!( + r#" + DELETE FROM factory_deps + WHERE + miniblock_number > $1 + "#, + block_number.0 as i64 + ) + .execute(self.storage.conn()) + .await + .unwrap(); + } +} diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index 3a5691a1c93e..0ca98a84be2c 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -5,11 +5,11 @@ pub use sqlx::{types::BigDecimal, Error as SqlxError}; pub use crate::connection::ConnectionPool; use crate::{ - accounts_dal::AccountsDal, basic_witness_input_producer_dal::BasicWitnessInputProducerDal, - blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal, connection::holder::ConnectionHolder, + basic_witness_input_producer_dal::BasicWitnessInputProducerDal, blocks_dal::BlocksDal, + blocks_web3_dal::BlocksWeb3Dal, connection::holder::ConnectionHolder, consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal, eth_sender_dal::EthSenderDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, - fri_gpu_prover_queue_dal::FriGpuProverQueueDal, + factory_deps_dal::FactoryDepsDal, fri_gpu_prover_queue_dal::FriGpuProverQueueDal, fri_proof_compressor_dal::FriProofCompressorDal, fri_protocol_versions_dal::FriProtocolVersionsDal, fri_prover_dal::FriProverDal, fri_scheduler_dependency_tracker_dal::FriSchedulerDependencyTrackerDal, @@ -17,7 +17,7 @@ use crate::{ protocol_versions_dal::ProtocolVersionsDal, protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal, - snapshots_dal::SnapshotsDal, storage_dal::StorageDal, storage_logs_dal::StorageLogsDal, + snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal, storage_logs_dedup_dal::StorageLogsDedupDal, storage_web3_dal::StorageWeb3Dal, sync_dal::SyncDal, system_dal::SystemDal, tokens_dal::TokensDal, tokens_web3_dal::TokensWeb3Dal, transactions_dal::TransactionsDal, @@ -26,7 +26,6 @@ use crate::{ #[macro_use] mod macro_utils; -pub mod accounts_dal; pub mod basic_witness_input_producer_dal; pub mod blocks_dal; pub mod blocks_web3_dal; @@ -36,6 +35,7 @@ pub mod contract_verification_dal; pub mod eth_sender_dal; pub mod events_dal; pub mod events_web3_dal; +pub mod factory_deps_dal; pub mod fri_gpu_prover_queue_dal; pub mod fri_proof_compressor_dal; pub mod fri_protocol_versions_dal; @@ -52,7 +52,7 @@ pub mod protocol_versions_web3_dal; pub mod snapshot_recovery_dal; pub mod snapshots_creator_dal; pub mod snapshots_dal; -pub mod storage_dal; +mod storage_dal; pub mod storage_logs_dal; pub mod storage_logs_dedup_dal; pub mod storage_web3_dal; @@ -129,10 +129,6 @@ impl<'a> StorageProcessor<'a> { TransactionsWeb3Dal { storage: self } } - pub fn accounts_dal(&mut self) -> AccountsDal<'_, 'a> { - AccountsDal { storage: self } - } - pub fn basic_witness_input_producer_dal(&mut self) -> BasicWitnessInputProducerDal<'_, 'a> { BasicWitnessInputProducerDal { storage: self } } @@ -161,8 +157,8 @@ impl<'a> StorageProcessor<'a> { EventsWeb3Dal { storage: self } } - pub fn storage_dal(&mut self) -> StorageDal<'_, 'a> { - StorageDal { storage: self } + pub fn factory_deps_dal(&mut self) -> FactoryDepsDal<'_, 'a> { + FactoryDepsDal { storage: self } } pub fn storage_web3_dal(&mut self) -> StorageWeb3Dal<'_, 'a> { @@ -173,6 +169,12 @@ impl<'a> StorageProcessor<'a> { StorageLogsDal { storage: self } } + #[deprecated(note = "Soft-removed in favor of `storage_logs`; don't use")] + #[allow(deprecated)] + pub fn storage_dal(&mut self) -> storage_dal::StorageDal<'_, 'a> { + storage_dal::StorageDal { storage: self } + } + pub fn storage_logs_dedup_dal(&mut self) -> StorageLogsDedupDal<'_, 'a> { StorageLogsDedupDal { storage: self } } diff --git a/core/lib/dal/src/protocol_versions_dal.rs b/core/lib/dal/src/protocol_versions_dal.rs index 8aad040221c4..63f09dde5286 100644 --- a/core/lib/dal/src/protocol_versions_dal.rs +++ b/core/lib/dal/src/protocol_versions_dal.rs @@ -124,7 +124,7 @@ impl ProtocolVersionsDal<'_, '_> { .unwrap(); let contracts = self .storage - .storage_dal() + .factory_deps_dal() .get_base_system_contracts( H256::from_slice(&row.bootloader_code_hash), H256::from_slice(&row.default_account_code_hash), @@ -155,7 +155,7 @@ impl ProtocolVersionsDal<'_, '_> { if let Some(row) = row { Some( self.storage - .storage_dal() + .factory_deps_dal() .get_base_system_contracts( H256::from_slice(&row.bootloader_code_hash), H256::from_slice(&row.default_account_code_hash), diff --git a/core/lib/dal/src/storage_dal.rs b/core/lib/dal/src/storage_dal.rs index 1155cae4a3be..10a32e7c7975 100644 --- a/core/lib/dal/src/storage_dal.rs +++ b/core/lib/dal/src/storage_dal.rs @@ -1,161 +1,18 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use itertools::Itertools; -use zksync_contracts::{BaseSystemContracts, SystemContractCode}; -use zksync_types::{MiniblockNumber, StorageKey, StorageLog, StorageValue, H256, U256}; -use zksync_utils::{bytes_to_be_words, bytes_to_chunks}; +use zksync_types::{StorageKey, StorageLog, StorageValue, H256}; -use crate::{instrument::InstrumentExt, StorageProcessor}; +use crate::StorageProcessor; #[derive(Debug)] pub struct StorageDal<'a, 'c> { pub(crate) storage: &'a mut StorageProcessor<'c>, } +#[deprecated(note = "Soft-removed in favor of `storage_logs`; don't use")] impl StorageDal<'_, '_> { - /// Inserts factory dependencies for a miniblock. Factory deps are specified as a map of - /// `(bytecode_hash, bytecode)` entries. - pub async fn insert_factory_deps( - &mut self, - block_number: MiniblockNumber, - factory_deps: &HashMap>, - ) -> sqlx::Result<()> { - let (bytecode_hashes, bytecodes): (Vec<_>, Vec<_>) = factory_deps - .iter() - .map(|dep| (dep.0.as_bytes(), dep.1.as_slice())) - .unzip(); - - // Copy from stdin can't be used here because of `ON CONFLICT`. - sqlx::query!( - r#" - INSERT INTO - factory_deps (bytecode_hash, bytecode, miniblock_number, created_at, updated_at) - SELECT - u.bytecode_hash, - u.bytecode, - $3, - NOW(), - NOW() - FROM - UNNEST($1::bytea[], $2::bytea[]) AS u (bytecode_hash, bytecode) - ON CONFLICT (bytecode_hash) DO NOTHING - "#, - &bytecode_hashes as &[&[u8]], - &bytecodes as &[&[u8]], - block_number.0 as i64, - ) - .execute(self.storage.conn()) - .await?; - - Ok(()) - } - - /// Returns bytecode for a factory dependency with the specified bytecode `hash`. - pub async fn get_factory_dep(&mut self, hash: H256) -> Option> { - sqlx::query!( - r#" - SELECT - bytecode - FROM - factory_deps - WHERE - bytecode_hash = $1 - "#, - hash.as_bytes(), - ) - .fetch_optional(self.storage.conn()) - .await - .unwrap() - .map(|row| row.bytecode) - } - - pub async fn get_base_system_contracts( - &mut self, - bootloader_hash: H256, - default_aa_hash: H256, - ) -> BaseSystemContracts { - let bootloader_bytecode = self - .get_factory_dep(bootloader_hash) - .await - .expect("Bootloader code should be present in the database"); - let bootloader_code = SystemContractCode { - code: bytes_to_be_words(bootloader_bytecode), - hash: bootloader_hash, - }; - - let default_aa_bytecode = self - .get_factory_dep(default_aa_hash) - .await - .expect("Default account code should be present in the database"); - - let default_aa_code = SystemContractCode { - code: bytes_to_be_words(default_aa_bytecode), - hash: default_aa_hash, - }; - BaseSystemContracts { - bootloader: bootloader_code, - default_aa: default_aa_code, - } - } - - /// Returns bytecodes for factory deps with the specified `hashes`. - pub async fn get_factory_deps( - &mut self, - hashes: &HashSet, - ) -> HashMap> { - let hashes_as_bytes: Vec<_> = hashes.iter().map(H256::as_bytes).collect(); - - sqlx::query!( - r#" - SELECT - bytecode, - bytecode_hash - FROM - factory_deps - WHERE - bytecode_hash = ANY ($1) - "#, - &hashes_as_bytes as &[&[u8]], - ) - .fetch_all(self.storage.conn()) - .await - .unwrap() - .into_iter() - .map(|row| { - ( - U256::from_big_endian(&row.bytecode_hash), - bytes_to_chunks(&row.bytecode), - ) - }) - .collect() - } - - /// Returns bytecode hashes for factory deps from miniblocks with number strictly greater - /// than `block_number`. - pub async fn get_factory_deps_for_revert( - &mut self, - block_number: MiniblockNumber, - ) -> sqlx::Result> { - Ok(sqlx::query!( - r#" - SELECT - bytecode_hash - FROM - factory_deps - WHERE - miniblock_number > $1 - "#, - block_number.0 as i64 - ) - .fetch_all(self.storage.conn()) - .await? - .into_iter() - .map(|row| H256::from_slice(&row.bytecode_hash)) - .collect()) - } - /// Applies the specified storage logs for a miniblock. Returns the map of unique storage updates. - // We likely don't need `storage` table at all, as we have `storage_logs` table pub async fn apply_storage_logs( &mut self, updates: &[(H256, Vec)], @@ -221,42 +78,18 @@ impl StorageDal<'_, '_> { unique_updates } - /// Gets the current storage value at the specified `key`. - pub async fn get_by_key(&mut self, key: &StorageKey) -> sqlx::Result> { - let hashed_key = key.hashed_key(); - let row = sqlx::query!( - r#" - SELECT - value - FROM - storage - WHERE - hashed_key = $1 - "#, - hashed_key.as_bytes() - ) - .instrument("get_by_key") - .report_latency() - .with_arg("key", &hashed_key) - .fetch_optional(self.storage.conn()) - .await?; - - Ok(row.map(|row| H256::from_slice(&row.value))) - } - - /// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`. - pub async fn rollback_factory_deps(&mut self, block_number: MiniblockNumber) { - sqlx::query!( - r#" - DELETE FROM factory_deps - WHERE - miniblock_number > $1 - "#, - block_number.0 as i64 - ) - .execute(self.storage.conn()) - .await - .unwrap(); + #[cfg(test)] + pub(crate) async fn get_by_key(&mut self, key: &StorageKey) -> sqlx::Result> { + use sqlx::Row as _; + + let row = sqlx::query("SELECT value FROM storage WHERE hashed_key = $1::bytea") + .bind(key.hashed_key().as_bytes()) + .fetch_optional(self.storage.conn()) + .await?; + Ok(row.map(|row| { + let raw_value: Vec = row.get("value"); + H256::from_slice(&raw_value) + })) } } @@ -267,6 +100,7 @@ mod tests { use super::*; use crate::ConnectionPool; + #[allow(deprecated)] #[tokio::test] async fn applying_storage_logs() { let pool = ConnectionPool::test_pool().await; diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index fa09a15bdf34..e5ec8b570c94 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -136,6 +136,7 @@ impl StorageLogsDal<'_, '_> { } /// Rolls back storage to the specified point in time. + #[deprecated(note = "`storage` table is soft-removed")] pub async fn rollback_storage( &mut self, last_miniblock_to_keep: MiniblockNumber, @@ -250,7 +251,10 @@ impl StorageLogsDal<'_, '_> { } /// Removes all storage logs with a miniblock number strictly greater than the specified `block_number`. - pub async fn rollback_storage_logs(&mut self, block_number: MiniblockNumber) { + pub async fn rollback_storage_logs( + &mut self, + block_number: MiniblockNumber, + ) -> sqlx::Result<()> { sqlx::query!( r#" DELETE FROM storage_logs @@ -260,8 +264,8 @@ impl StorageLogsDal<'_, '_> { block_number.0 as i64 ) .execute(self.storage.conn()) - .await - .unwrap(); + .await?; + Ok(()) } pub async fn is_contract_deployed_at_address(&mut self, address: Address) -> bool { @@ -808,6 +812,7 @@ mod tests { conn.storage_logs_dal() .insert_storage_logs(MiniblockNumber(number), &logs) .await; + #[allow(deprecated)] conn.storage_dal().apply_storage_logs(&logs).await; conn.blocks_dal() .mark_miniblocks_as_executed_in_l1_batch(L1BatchNumber(number)) @@ -845,6 +850,7 @@ mod tests { conn.storage_logs_dal() .append_storage_logs(MiniblockNumber(1), &more_logs) .await; + #[allow(deprecated)] conn.storage_dal().apply_storage_logs(&more_logs).await; let touched_slots = conn @@ -872,12 +878,27 @@ mod tests { let logs = vec![log, other_log, new_key_log]; insert_miniblock(conn, 2, logs).await; - let value = conn.storage_dal().get_by_key(&key).await.unwrap(); - assert_eq!(value, Some(H256::repeat_byte(0xff))); - let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); - assert_eq!(value, Some(H256::zero())); - let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); - assert_eq!(value, Some(H256::repeat_byte(0xfe))); + let value = conn.storage_web3_dal().get_value(&key).await.unwrap(); + assert_eq!(value, H256::repeat_byte(0xff)); + let value = conn + .storage_web3_dal() + .get_value(&second_key) + .await + .unwrap(); + assert_eq!(value, H256::zero()); + let value = conn.storage_web3_dal().get_value(&new_key).await.unwrap(); + assert_eq!(value, H256::repeat_byte(0xfe)); + + // Check the outdated `storage` table as well. + #[allow(deprecated)] + { + let value = conn.storage_dal().get_by_key(&key).await.unwrap(); + assert_eq!(value, Some(H256::repeat_byte(0xff))); + let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); + assert_eq!(value, Some(H256::zero())); + let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); + assert_eq!(value, Some(H256::repeat_byte(0xfe))); + } let prev_keys = vec![key.hashed_key(), new_key.hashed_key(), H256::zero()]; let prev_values = conn @@ -890,17 +911,35 @@ mod tests { assert_eq!(prev_values[&prev_keys[1]], None); assert_eq!(prev_values[&prev_keys[2]], None); + #[allow(deprecated)] + { + conn.storage_logs_dal() + .rollback_storage(MiniblockNumber(1)) + .await + .unwrap(); + let value = conn.storage_dal().get_by_key(&key).await.unwrap(); + assert_eq!(value, Some(H256::repeat_byte(3))); + let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); + assert_eq!(value, Some(H256::repeat_byte(2))); + let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); + assert_eq!(value, None); + } + conn.storage_logs_dal() - .rollback_storage(MiniblockNumber(1)) + .rollback_storage_logs(MiniblockNumber(1)) .await .unwrap(); - let value = conn.storage_dal().get_by_key(&key).await.unwrap(); - assert_eq!(value, Some(H256::repeat_byte(3))); - let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); - assert_eq!(value, Some(H256::repeat_byte(2))); - let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); - assert_eq!(value, None); + let value = conn.storage_web3_dal().get_value(&key).await.unwrap(); + assert_eq!(value, H256::repeat_byte(3)); + let value = conn + .storage_web3_dal() + .get_value(&second_key) + .await + .unwrap(); + assert_eq!(value, H256::repeat_byte(2)); + let value = conn.storage_web3_dal().get_value(&new_key).await.unwrap(); + assert_eq!(value, H256::zero()); } #[tokio::test] diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 312c46acba23..dab9b622871d 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -1,4 +1,4 @@ -use std::ops; +use std::{collections::HashMap, ops}; use zksync_types::{ get_code_key, get_nonce_key, @@ -23,7 +23,7 @@ impl StorageWeb3Dal<'_, '_> { &mut self, address: Address, block_number: MiniblockNumber, - ) -> Result { + ) -> sqlx::Result { let nonce_key = get_nonce_key(&address); let nonce_value = self .get_historical_value_unchecked(&nonce_key, block_number) @@ -37,7 +37,7 @@ impl StorageWeb3Dal<'_, '_> { token_id: AccountTreeId, account_id: AccountTreeId, block_number: MiniblockNumber, - ) -> Result { + ) -> sqlx::Result { let key = storage_key_for_standard_token_balance(token_id, account_id.address()); let balance = self .get_historical_value_unchecked(&key, block_number) @@ -45,48 +45,66 @@ impl StorageWeb3Dal<'_, '_> { Ok(h256_to_u256(balance)) } + /// Gets the current value for the specified `key`. + pub async fn get_value(&mut self, key: &StorageKey) -> sqlx::Result { + self.get_historical_value_unchecked(key, MiniblockNumber(u32::MAX)) + .await + } + + /// Gets the current values for the specified `hashed_keys`. The returned map has requested hashed keys as keys + /// and current storage values as values. + pub async fn get_values(&mut self, hashed_keys: &[H256]) -> sqlx::Result> { + let storage_map = self + .storage + .storage_logs_dal() + .get_storage_values(hashed_keys, MiniblockNumber(u32::MAX)) + .await?; + Ok(storage_map + .into_iter() + .map(|(key, value)| (key, value.unwrap_or_default())) + .collect()) + } + /// This method does not check if a block with this number exists in the database. /// It will return the current value if the block is in the future. pub async fn get_historical_value_unchecked( &mut self, key: &StorageKey, block_number: MiniblockNumber, - ) -> Result { - { - // We need to proper distinguish if the value is zero or None - // for the VM to correctly determine initial writes. - // So, we accept that the value is None if it's zero and it wasn't initially written at the moment. - let hashed_key = key.hashed_key(); + ) -> sqlx::Result { + // We need to proper distinguish if the value is zero or None + // for the VM to correctly determine initial writes. + // So, we accept that the value is None if it's zero and it wasn't initially written at the moment. + let hashed_key = key.hashed_key(); - sqlx::query!( - r#" - SELECT - value - FROM - storage_logs - WHERE - storage_logs.hashed_key = $1 - AND storage_logs.miniblock_number <= $2 - ORDER BY - storage_logs.miniblock_number DESC, - storage_logs.operation_number DESC - LIMIT - 1 - "#, - hashed_key.as_bytes(), - block_number.0 as i64 - ) - .instrument("get_historical_value_unchecked") - .report_latency() - .with_arg("key", &hashed_key) - .fetch_optional(self.storage.conn()) - .await - .map(|option_row| { - option_row - .map(|row| H256::from_slice(&row.value)) - .unwrap_or_else(H256::zero) - }) - } + sqlx::query!( + r#" + SELECT + value + FROM + storage_logs + WHERE + storage_logs.hashed_key = $1 + AND storage_logs.miniblock_number <= $2 + ORDER BY + storage_logs.miniblock_number DESC, + storage_logs.operation_number DESC + LIMIT + 1 + "#, + hashed_key.as_bytes(), + block_number.0 as i64 + ) + .instrument("get_historical_value_unchecked") + .report_latency() + .with_arg("key", &hashed_key) + .fetch_optional(self.storage.conn()) + .await + .map(|option_row| { + option_row + .map(|row| H256::from_slice(&row.value)) + .unwrap_or_else(H256::zero) + }) } /// Provides information about the L1 batch that the specified miniblock is a part of. diff --git a/core/lib/dal/src/tests/mod.rs b/core/lib/dal/src/tests/mod.rs index 8094f37216c5..da9c66d8a71f 100644 --- a/core/lib/dal/src/tests/mod.rs +++ b/core/lib/dal/src/tests/mod.rs @@ -206,7 +206,7 @@ async fn remove_stuck_txs() { // Get all txs transactions_dal.reset_mempool().await.unwrap(); - let (txs, _) = transactions_dal + let txs = transactions_dal .sync_mempool(&[], &[], 0, 0, 1000) .await .unwrap(); @@ -229,7 +229,7 @@ async fn remove_stuck_txs() { // Get all txs transactions_dal.reset_mempool().await.unwrap(); - let (txs, _) = transactions_dal + let txs = transactions_dal .sync_mempool(&[], &[], 0, 0, 1000) .await .unwrap(); @@ -242,7 +242,7 @@ async fn remove_stuck_txs() { .unwrap(); assert_eq!(removed_txs, 1); transactions_dal.reset_mempool().await.unwrap(); - let (txs, _) = transactions_dal + let txs = transactions_dal .sync_mempool(&[], &[], 0, 0, 1000) .await .unwrap(); diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 1910ecbe9437..a912656c0836 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -85,33 +85,6 @@ impl TokensDal<'_, '_> { } } - pub(crate) async fn get_well_known_token_addresses(&mut self) -> Vec<(Address, Address)> { - { - let records = sqlx::query!( - r#" - SELECT - l1_address, - l2_address - FROM - tokens - "# - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - let addresses: Vec<(Address, Address)> = records - .into_iter() - .map(|record| { - ( - Address::from_slice(&record.l1_address), - Address::from_slice(&record.l2_address), - ) - }) - .collect(); - addresses - } - } - pub async fn get_all_l2_token_addresses(&mut self) -> sqlx::Result> { let rows = sqlx::query!( r#" diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index b084a1ba01a1..dc76fca67321 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, time::Duration}; +use std::{fmt, time::Duration}; use anyhow::Context; use bigdecimal::BigDecimal; @@ -7,16 +7,15 @@ use sqlx::{error, types::chrono::NaiveDateTime}; use zksync_types::{ block::MiniblockExecutionData, fee::TransactionExecutionMetrics, - get_nonce_key, l1::L1Tx, l2::L2Tx, protocol_version::ProtocolUpgradeTx, tx::{tx_execution_info::TxExecutionStatus, TransactionExecutionResult}, vm_trace::{Call, VmExecutionTrace}, - Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, MiniblockNumber, Nonce, - PriorityOpId, Transaction, H256, PROTOCOL_UPGRADE_TX_TYPE, U256, + Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, MiniblockNumber, PriorityOpId, + Transaction, H256, PROTOCOL_UPGRADE_TX_TYPE, U256, }; -use zksync_utils::{h256_to_u32, u256_to_big_decimal}; +use zksync_utils::u256_to_big_decimal; use crate::{ instrument::InstrumentExt, @@ -886,7 +885,7 @@ impl TransactionsDal<'_, '_> { gas_per_pubdata: u32, fee_per_gas: u64, limit: usize, - ) -> sqlx::Result<(Vec, HashMap)> { + ) -> sqlx::Result> { let stashed_addresses: Vec<_> = stashed_accounts.iter().map(Address::as_bytes).collect(); sqlx::query!( r#" @@ -969,43 +968,8 @@ impl TransactionsDal<'_, '_> { .fetch_all(self.storage.conn()) .await?; - let nonce_keys: HashMap<_, _> = transactions - .iter() - .map(|tx| { - let address = Address::from_slice(&tx.initiator_address); - let nonce_key = get_nonce_key(&address).hashed_key(); - (nonce_key, address) - }) - .collect(); - - let storage_keys: Vec<_> = nonce_keys.keys().map(H256::as_bytes).collect(); - let nonce_rows = sqlx::query!( - r#" - SELECT - hashed_key, - value AS "value!" - FROM - storage - WHERE - hashed_key = ANY ($1) - "#, - &storage_keys as &[&[u8]], - ) - .fetch_all(self.storage.conn()) - .await?; - - let nonces = nonce_rows - .into_iter() - .map(|row| { - let nonce_key = H256::from_slice(&row.hashed_key); - let nonce = Nonce(h256_to_u32(H256::from_slice(&row.value))); - (nonce_keys[&nonce_key], nonce) - }) - .collect(); - Ok(( - transactions.into_iter().map(|tx| tx.into()).collect(), - nonces, - )) + let transactions = transactions.into_iter().map(|tx| tx.into()).collect(); + Ok(transactions) } pub async fn reset_mempool(&mut self) -> sqlx::Result<()> { diff --git a/core/lib/mempool/src/lib.rs b/core/lib/mempool/src/lib.rs index 1c4fdd1217a2..449f1d8ae5fb 100644 --- a/core/lib/mempool/src/lib.rs +++ b/core/lib/mempool/src/lib.rs @@ -4,6 +4,6 @@ mod tests; mod types; pub use crate::{ - mempool_store::{MempoolInfo, MempoolStore}, + mempool_store::{MempoolInfo, MempoolStats, MempoolStore}, types::L2TxFilter, }; diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs index 48fc2a813c95..31b202a7c1ee 100644 --- a/core/lib/snapshots_applier/src/lib.rs +++ b/core/lib/snapshots_applier/src/lib.rs @@ -238,7 +238,7 @@ impl<'a> SnapshotsApplier<'a> { .map(|dep| (hash_bytecode(&dep.bytecode.0), dep.bytecode.0)) .collect(); storage - .storage_dal() + .factory_deps_dal() .insert_factory_deps( self.applied_snapshot_status.miniblock_number, &all_deps_hashmap, diff --git a/core/lib/state/src/postgres/tests.rs b/core/lib/state/src/postgres/tests.rs index 75adcbba8c63..5ad4364b4769 100644 --- a/core/lib/state/src/postgres/tests.rs +++ b/core/lib/state/src/postgres/tests.rs @@ -226,7 +226,7 @@ fn test_factory_deps_cache(pool: &ConnectionPool, rt_handle: Handle) { .block_on( storage .connection - .storage_dal() + .factory_deps_dal() .insert_factory_deps(MiniblockNumber(0), &contracts), ) .unwrap(); diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index 956307566af3..e2b5bb66c4d0 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -513,7 +513,7 @@ impl RocksdbStorage { tracing::info!("Getting factory deps that need to be removed..."); let stage_start = Instant::now(); let factory_deps = connection - .storage_dal() + .factory_deps_dal() .get_factory_deps_for_revert(last_miniblock_to_keep) .await .with_context(|| { diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index 38ca942e6795..c95325cdea69 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -163,7 +163,7 @@ async fn insert_factory_deps( let factory_deps = indices .map(|i| (H256::repeat_byte(i), vec![i; 64])) .collect(); - conn.storage_dal() + conn.factory_deps_dal() .insert_factory_deps(miniblock_number, &factory_deps) .await .unwrap(); @@ -369,9 +369,7 @@ async fn recovering_factory_deps_from_snapshot() { all_factory_deps.insert(bytecode_hash, bytecode.clone()); let number = MiniblockNumber(number); - // FIXME (PLA-589): don't store miniblocks once the corresponding foreign keys are removed - create_miniblock(&mut conn, number, vec![]).await; - conn.storage_dal() + conn.factory_deps_dal() .insert_factory_deps(number, &HashMap::from([(bytecode_hash, bytecode)])) .await .unwrap(); diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index 9c4fca8285ee..4c05faec50ee 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -24,7 +24,8 @@ pub(crate) async fn prepare_postgres(conn: &mut StorageProcessor<'_>) { conn.storage_logs_dal() .rollback_storage_logs(MiniblockNumber(0)) - .await; + .await + .unwrap(); conn.blocks_dal() .delete_miniblocks(MiniblockNumber(0)) .await @@ -138,19 +139,17 @@ pub(crate) async fn prepare_postgres_for_snapshot_recovery( .await .unwrap(); - // FIXME (PLA-589): don't store miniblock / L1 batch once the corresponding foreign keys are removed let snapshot_storage_logs = gen_storage_logs(100..200); - create_miniblock( - conn, - snapshot_recovery.miniblock_number, - snapshot_storage_logs.clone(), - ) - .await; - create_l1_batch( - conn, - snapshot_recovery.l1_batch_number, - &snapshot_storage_logs, - ) - .await; + conn.storage_logs_dal() + .insert_storage_logs( + snapshot_recovery.miniblock_number, + &[(H256::zero(), snapshot_storage_logs.clone())], + ) + .await; + let mut written_keys: Vec<_> = snapshot_storage_logs.iter().map(|log| log.key).collect(); + written_keys.sort_unstable(); + conn.storage_logs_dedup_dal() + .insert_initial_writes(snapshot_recovery.l1_batch_number, &written_keys) + .await; (snapshot_recovery, snapshot_storage_logs) } diff --git a/core/lib/vm_utils/src/storage.rs b/core/lib/vm_utils/src/storage.rs index 53298e44562b..a05c84b911d4 100644 --- a/core/lib/vm_utils/src/storage.rs +++ b/core/lib/vm_utils/src/storage.rs @@ -50,7 +50,7 @@ pub async fn load_l1_batch_params( .hash; let base_system_contracts = storage - .storage_dal() + .factory_deps_dal() .get_base_system_contracts( pending_miniblock_header .base_system_contracts_hashes diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index 7d5f1ab483fe..f93e67aa8caf 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -570,10 +570,9 @@ impl TxSender { let balance = self .acquire_replica_connection() .await? - .storage_dal() - .get_by_key(ð_balance_key) - .await? - .unwrap_or_default(); + .storage_web3_dal() + .get_value(ð_balance_key) + .await?; Ok(h256_to_u256(balance)) } @@ -710,16 +709,15 @@ impl TxSender { let account_code_hash = self .acquire_replica_connection() .await? - .storage_dal() - .get_by_key(&hashed_key) + .storage_web3_dal() + .get_value(&hashed_key) .await .with_context(|| { format!( "failed getting code hash for account {:?}", tx.initiator_account() ) - })? - .unwrap_or_default(); + })?; if !tx.is_l1() && account_code_hash == H256::zero() diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index 269250c295df..c4b3042503a7 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -16,10 +16,11 @@ use zksync_types::{ l2_to_l1_log::L2ToL1Log, tokens::ETHEREUM_ADDRESS, transaction_request::CallRequest, + utils::storage_key_for_standard_token_balance, AccountTreeId, L1BatchNumber, MiniblockNumber, StorageKey, Transaction, L1_MESSENGER_ADDRESS, L2_ETH_TOKEN_ADDRESS, REQUIRED_L1_TO_L2_GAS_PER_PUBDATA_BYTE, U256, U64, }; -use zksync_utils::{address_to_h256, ratio_to_big_decimal_normalized}; +use zksync_utils::{address_to_h256, h256_to_u256, ratio_to_big_decimal_normalized}; use zksync_web3_decl::{ error::Web3Error, types::{Address, Token, H256}, @@ -215,20 +216,38 @@ impl ZksNamespace { let method_latency = API_METRICS.start_call(METHOD_NAME); let mut storage = self.access_storage(METHOD_NAME).await?; - let balances = storage - .accounts_dal() - .get_balances_for_address(address) + let tokens = storage + .tokens_dal() + .get_all_l2_token_addresses() + .await + .map_err(|err| internal_error(METHOD_NAME, err))?; + let hashed_balance_keys = tokens.iter().map(|&token_address| { + let token_account = AccountTreeId::new(if token_address == ETHEREUM_ADDRESS { + L2_ETH_TOKEN_ADDRESS + } else { + token_address + }); + let hashed_key = + storage_key_for_standard_token_balance(token_account, &address).hashed_key(); + (hashed_key, (hashed_key, token_address)) + }); + let (hashed_balance_keys, hashed_key_to_token_address): (Vec<_>, HashMap<_, _>) = + hashed_balance_keys.unzip(); + + let balance_values = storage + .storage_web3_dal() + .get_values(&hashed_balance_keys) .await .map_err(|err| internal_error(METHOD_NAME, err))?; - let balances = balances + let balances = balance_values .into_iter() - .map(|(address, balance)| { - if address == L2_ETH_TOKEN_ADDRESS { - (ETHEREUM_ADDRESS, balance) - } else { - (address, balance) + .filter_map(|(hashed_key, balance)| { + let balance = h256_to_u256(balance); + if balance.is_zero() { + return None; } + Some((hashed_key_to_token_address[&hashed_key], balance)) }) .collect(); method_latency.observe(); @@ -533,7 +552,7 @@ impl ZksNamespace { let method_latency = API_METRICS.start_call(METHOD_NAME); let mut storage = self.access_storage(METHOD_NAME).await?; - let bytecode = storage.storage_dal().get_factory_dep(hash).await; + let bytecode = storage.factory_deps_dal().get_factory_dep(hash).await; method_latency.observe(); Ok(bytecode) diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index 6b5f8a2fa1b7..70732d69cfe5 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, time::Instant}; use assert_matches::assert_matches; use async_trait::async_trait; use jsonrpsee::core::ClientError; +use multivm::zk_evm_latest::ethereum_types::U256; use tokio::sync::watch; use zksync_config::configs::{ api::Web3JsonRpcConfig, @@ -19,13 +20,15 @@ use zksync_types::{ get_nonce_key, l2::L2Tx, storage::get_code_key, + tokens::{TokenInfo, TokenMetadata}, tx::{ tx_execution_info::TxExecutionStatus, ExecutionMetrics, IncludedTxLocation, TransactionExecutionResult, }, - utils::storage_key_for_eth_balance, + utils::{storage_key_for_eth_balance, storage_key_for_standard_token_balance}, AccountTreeId, Address, L1BatchNumber, Nonce, StorageKey, StorageLog, VmEvent, H256, U64, }; +use zksync_utils::u256_to_h256; use zksync_web3_decl::{ jsonrpsee::{http_client::HttpClient, types::error::ErrorCode}, namespaces::{EthNamespaceClient, ZksNamespaceClient}, @@ -223,7 +226,7 @@ impl StorageInitialization { Self::Recovery { logs, factory_deps } => { prepare_recovery_snapshot(storage, Self::SNAPSHOT_RECOVERY_BLOCK, logs).await; storage - .storage_dal() + .factory_deps_dal() .insert_factory_deps( MiniblockNumber(Self::SNAPSHOT_RECOVERY_BLOCK), factory_deps, @@ -870,3 +873,73 @@ impl HttpTest for TransactionReceiptsTest { async fn transaction_receipts() { test_http_server(TransactionReceiptsTest).await; } + +#[derive(Debug)] +struct AllAccountBalancesTest; + +impl AllAccountBalancesTest { + const ADDRESS: Address = Address::repeat_byte(0x11); +} + +#[async_trait] +impl HttpTest for AllAccountBalancesTest { + async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { + let balances = client.get_all_account_balances(Self::ADDRESS).await?; + assert_eq!(balances, HashMap::new()); + + let mut storage = pool.access_storage().await?; + store_miniblock(&mut storage, MiniblockNumber(1), &[]).await?; + + let eth_balance_key = storage_key_for_eth_balance(&Self::ADDRESS); + let eth_balance = U256::one() << 64; + let eth_balance_log = StorageLog::new_write_log(eth_balance_key, u256_to_h256(eth_balance)); + storage + .storage_logs_dal() + .insert_storage_logs(MiniblockNumber(1), &[(H256::zero(), vec![eth_balance_log])]) + .await; + // Create a custom token, but don't set balance for it yet. + let custom_token = TokenInfo { + l1_address: Address::repeat_byte(0xfe), + l2_address: Address::repeat_byte(0xfe), + metadata: TokenMetadata::default(Address::repeat_byte(0xfe)), + }; + storage + .tokens_dal() + .add_tokens(vec![custom_token.clone()]) + .await; + + let balances = client.get_all_account_balances(Self::ADDRESS).await?; + assert_eq!(balances, HashMap::from([(Address::zero(), eth_balance)])); + + store_miniblock(&mut storage, MiniblockNumber(2), &[]).await?; + let token_balance_key = storage_key_for_standard_token_balance( + AccountTreeId::new(custom_token.l2_address), + &Self::ADDRESS, + ); + let token_balance = 123.into(); + let token_balance_log = + StorageLog::new_write_log(token_balance_key, u256_to_h256(token_balance)); + storage + .storage_logs_dal() + .insert_storage_logs( + MiniblockNumber(2), + &[(H256::zero(), vec![token_balance_log])], + ) + .await; + + let balances = client.get_all_account_balances(Self::ADDRESS).await?; + assert_eq!( + balances, + HashMap::from([ + (Address::zero(), eth_balance), + (custom_token.l2_address, token_balance), + ]) + ); + Ok(()) + } +} + +#[tokio::test] +async fn getting_all_account_balances() { + test_http_server(AllAccountBalancesTest).await; +} diff --git a/core/lib/zksync_core/src/api_server/web3/tests/vm.rs b/core/lib/zksync_core/src/api_server/web3/tests/vm.rs index ba5ca2ead005..b685e9fc0164 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/vm.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/vm.rs @@ -208,8 +208,11 @@ impl HttpTest for SendRawTransactionTest { // Manually set sufficient balance for the transaction account. let mut storage = pool.access_storage().await?; storage - .storage_dal() - .apply_storage_logs(&[(H256::zero(), vec![Self::balance_storage_log()])]) + .storage_logs_dal() + .append_storage_logs( + MiniblockNumber(0), + &[(H256::zero(), vec![Self::balance_storage_log()])], + ) .await; } diff --git a/core/lib/zksync_core/src/block_reverter/mod.rs b/core/lib/zksync_core/src/block_reverter/mod.rs index 9b3c23dfc844..06ed7d5c2397 100644 --- a/core/lib/zksync_core/src/block_reverter/mod.rs +++ b/core/lib/zksync_core/src/block_reverter/mod.rs @@ -258,20 +258,24 @@ impl BlockReverter { .await; tracing::info!("rolling back factory deps...."); transaction - .storage_dal() + .factory_deps_dal() .rollback_factory_deps(last_miniblock_to_keep) .await; + tracing::info!("rolling back storage..."); + #[allow(deprecated)] transaction .storage_logs_dal() .rollback_storage(last_miniblock_to_keep) .await .expect("failed rolling back storage"); + tracing::info!("rolling back storage logs..."); transaction .storage_logs_dal() .rollback_storage_logs(last_miniblock_to_keep) - .await; + .await + .unwrap(); tracing::info!("rolling back l1 batches..."); transaction .blocks_dal() diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index 88a23d17ba37..4dd9202297be 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -177,7 +177,7 @@ pub(super) struct StateKeeper { batch_sealed: bool, fee_per_gas: u64, - gas_per_pubdata: u32, + gas_per_pubdata: u64, pub(super) actions_sender: ActionQueueSender, pub(super) pool: ConnectionPool, diff --git a/core/lib/zksync_core/src/eth_watch/tests.rs b/core/lib/zksync_core/src/eth_watch/tests.rs index 76b170ba365e..aaa24746edd3 100644 --- a/core/lib/zksync_core/src/eth_watch/tests.rs +++ b/core/lib/zksync_core/src/eth_watch/tests.rs @@ -525,12 +525,11 @@ async fn test_overlapping_batches() { async fn get_all_db_txs(storage: &mut StorageProcessor<'_>) -> Vec { storage.transactions_dal().reset_mempool().await.unwrap(); - let (txs, _) = storage + storage .transactions_dal() .sync_mempool(&[], &[], 0, 0, 1000) .await - .unwrap(); - txs + .unwrap() } fn tx_into_log(tx: L1Tx) -> Log { diff --git a/core/lib/zksync_core/src/genesis.rs b/core/lib/zksync_core/src/genesis.rs index 9d9021c7c151..8e1725d11a7f 100644 --- a/core/lib/zksync_core/src/genesis.rs +++ b/core/lib/zksync_core/src/genesis.rs @@ -165,7 +165,7 @@ async fn insert_base_system_contracts_to_factory_deps( .collect(); storage - .storage_dal() + .factory_deps_dal() .insert_factory_deps(MiniblockNumber(0), &factory_deps) .await .unwrap(); @@ -264,6 +264,7 @@ async fn insert_system_contracts( .insert_initial_writes(L1BatchNumber(0), &written_storage_keys) .await; + #[allow(deprecated)] transaction .storage_dal() .apply_storage_logs(&storage_logs) @@ -274,7 +275,7 @@ async fn insert_system_contracts( .map(|c| (hash_bytecode(&c.bytecode), c.bytecode.clone())) .collect(); transaction - .storage_dal() + .factory_deps_dal() .insert_factory_deps(MiniblockNumber(0), &factory_deps) .await .unwrap(); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index cd1cb7b7f0ce..3f9a1f690d42 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -772,12 +772,8 @@ async fn add_state_keeper_to_task_futures, - ) -> usize { - let mut count = 0; - for (key, (_, value)) in unique_updates { - if *key.account().address() == ACCOUNT_CODE_STORAGE_ADDRESS { - let bytecode_hash = *value; - // TODO(SMA-1554): Support contracts deletion. - // For now, we expected that if the `bytecode_hash` is zero, the contract was not deployed - // in the first place, so we don't do anything - if bytecode_hash != H256::zero() { - count += 1; - } - } - } - count - } - fn extract_events(&self, is_fictive: bool) -> Vec<(IncludedTxLocation, Vec<&VmEvent>)> { self.group_by_tx_location(&self.miniblock.events, is_fictive, |event| event.location.1) } diff --git a/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs index 6624e808e668..fe79bfc584b6 100644 --- a/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs @@ -185,7 +185,7 @@ impl Tester { fee_per_gas: u64, gas_per_pubdata: u32, ) { - let tx = create_transaction(fee_per_gas, gas_per_pubdata); + let tx = create_transaction(fee_per_gas, gas_per_pubdata.into()); guard.insert(vec![tx], Default::default()); } } diff --git a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs index 7c57be2450de..d4f3e4961945 100644 --- a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs +++ b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs @@ -1,12 +1,16 @@ -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::Context as _; use multivm::utils::derive_base_fee_and_gas_per_pubdata; +#[cfg(test)] +use tokio::sync::mpsc; use tokio::sync::watch; use zksync_config::configs::chain::MempoolConfig; -use zksync_dal::ConnectionPool; +use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_mempool::L2TxFilter; -use zksync_types::VmVersion; +#[cfg(test)] +use zksync_types::H256; +use zksync_types::{get_nonce_key, Address, Nonce, Transaction, VmVersion}; use super::{metrics::KEEPER_METRICS, types::MempoolGuard}; use crate::{api_server::execution_sandbox::BlockArgs, fee_model::BatchFeeModelInputProvider}; @@ -34,6 +38,9 @@ pub struct MempoolFetcher { batch_fee_input_provider: Arc, sync_interval: Duration, sync_batch_size: usize, + stuck_tx_timeout: Option, + #[cfg(test)] + transaction_hashes_sender: mpsc::UnboundedSender>, } impl MempoolFetcher { @@ -47,32 +54,32 @@ impl MempoolFetcher { batch_fee_input_provider, sync_interval: config.sync_interval(), sync_batch_size: config.sync_batch_size, + stuck_tx_timeout: config.remove_stuck_txs.then(|| config.stuck_tx_timeout()), + #[cfg(test)] + transaction_hashes_sender: mpsc::unbounded_channel().0, } } pub async fn run( mut self, pool: ConnectionPool, - remove_stuck_txs: bool, - stuck_tx_timeout: Duration, stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { - { - let mut storage = pool.access_storage_tagged("state_keeper").await?; - if remove_stuck_txs { - let removed_txs = storage - .transactions_dal() - .remove_stuck_txs(stuck_tx_timeout) - .await - .context("failed removing stuck transactions")?; - tracing::info!("Number of stuck txs was removed: {removed_txs}"); - } - storage + let mut storage = pool.access_storage_tagged("state_keeper").await?; + if let Some(stuck_tx_timeout) = self.stuck_tx_timeout { + let removed_txs = storage .transactions_dal() - .reset_mempool() + .remove_stuck_txs(stuck_tx_timeout) .await - .context("failed resetting mempool")?; + .context("failed removing stuck transactions")?; + tracing::info!("Number of stuck txs was removed: {removed_txs}"); } + storage + .transactions_dal() + .reset_mempool() + .await + .context("failed resetting mempool")?; + drop(storage); loop { if *stop_receiver.borrow() { @@ -98,7 +105,7 @@ impl MempoolFetcher { ) .await; - let (transactions, nonces) = storage + let transactions = storage .transactions_dal() .sync_mempool( &mempool_info.stashed_accounts, @@ -109,11 +116,18 @@ impl MempoolFetcher { ) .await .context("failed syncing mempool")?; + let nonces = get_transaction_nonces(&mut storage, &transactions).await?; drop(storage); + #[cfg(test)] + { + let transaction_hashes = transactions.iter().map(Transaction::hash).collect(); + self.transaction_hashes_sender.send(transaction_hashes).ok(); + } let all_transactions_loaded = transactions.len() < self.sync_batch_size; self.mempool.insert(transactions, nonces); latency.observe(); + if all_transactions_loaded { tokio::time::sleep(self.sync_interval).await; } @@ -121,3 +135,218 @@ impl MempoolFetcher { Ok(()) } } + +/// Loads nonces for all distinct `transactions` initiators from the storage. +async fn get_transaction_nonces( + storage: &mut StorageProcessor<'_>, + transactions: &[Transaction], +) -> anyhow::Result> { + let (nonce_keys, address_by_nonce_key): (Vec<_>, HashMap<_, _>) = transactions + .iter() + .map(|tx| { + let address = tx.initiator_account(); + let nonce_key = get_nonce_key(&address).hashed_key(); + (nonce_key, (nonce_key, address)) + }) + .unzip(); + + let nonce_values = storage + .storage_web3_dal() + .get_values(&nonce_keys) + .await + .context("failed getting nonces from storage")?; + + Ok(nonce_values + .into_iter() + .map(|(nonce_key, nonce_value)| { + let nonce = Nonce(zksync_utils::h256_to_u32(nonce_value)); + (address_by_nonce_key[&nonce_key], nonce) + }) + .collect()) +} + +#[cfg(test)] +mod tests { + use zksync_types::{ + fee::TransactionExecutionMetrics, L2ChainId, MiniblockNumber, PriorityOpId, + ProtocolVersionId, StorageLog, H256, + }; + use zksync_utils::u256_to_h256; + + use super::*; + use crate::{ + genesis::{ensure_genesis_state, GenesisParams}, + utils::testonly::{create_l2_transaction, MockBatchFeeParamsProvider}, + }; + + const TEST_MEMPOOL_CONFIG: MempoolConfig = MempoolConfig { + sync_interval_ms: 10, + sync_batch_size: 100, + capacity: 100, + stuck_tx_timeout: 0, + remove_stuck_txs: false, + delay_interval: 10, + }; + + #[tokio::test] + async fn getting_transaction_nonces() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + + let transaction = create_l2_transaction(10, 100); + let transaction_initiator = transaction.initiator_account(); + let nonce_key = get_nonce_key(&transaction_initiator); + let nonce_log = StorageLog::new_write_log(nonce_key, u256_to_h256(42.into())); + storage + .storage_logs_dal() + .insert_storage_logs(MiniblockNumber(0), &[(H256::zero(), vec![nonce_log])]) + .await; + + let other_transaction = create_l2_transaction(10, 100); + let other_transaction_initiator = other_transaction.initiator_account(); + assert_ne!(other_transaction_initiator, transaction_initiator); + + let nonces = get_transaction_nonces( + &mut storage, + &[transaction.into(), other_transaction.into()], + ) + .await + .unwrap(); + assert_eq!( + nonces, + HashMap::from([ + (transaction_initiator, Nonce(42)), + (other_transaction_initiator, Nonce(0)), + ]) + ); + } + + #[tokio::test] + async fn syncing_mempool_basics() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + + let mempool = MempoolGuard::new(PriorityOpId(0), 100); + let fee_params_provider = Arc::new(MockBatchFeeParamsProvider::default()); + let fee_input = fee_params_provider.get_batch_fee_input().await; + let (base_fee, gas_per_pubdata) = + derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into()); + + let mut fetcher = + MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG); + let (tx_hashes_sender, mut tx_hashes_receiver) = mpsc::unbounded_channel(); + fetcher.transaction_hashes_sender = tx_hashes_sender; + let (stop_sender, stop_receiver) = watch::channel(false); + let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver)); + + // Add a new transaction to the storage. + let transaction = create_l2_transaction(base_fee, gas_per_pubdata); + let transaction_hash = transaction.hash(); + storage + .transactions_dal() + .insert_transaction_l2(transaction, TransactionExecutionMetrics::default()) + .await; + + // Check that the transaction is eventually synced. + let tx_hashes = wait_for_new_transactions(&mut tx_hashes_receiver).await; + assert_eq!(tx_hashes, [transaction_hash]); + assert_eq!(mempool.stats().l2_transaction_count, 1); + + stop_sender.send_replace(true); + fetcher_task.await.unwrap().expect("fetcher errored"); + } + + async fn wait_for_new_transactions( + tx_hashes_receiver: &mut mpsc::UnboundedReceiver>, + ) -> Vec { + loop { + let tx_hashes = tx_hashes_receiver.recv().await.unwrap(); + if tx_hashes.is_empty() { + continue; + } + break tx_hashes; + } + } + + #[tokio::test] + async fn ignoring_transaction_with_insufficient_fee() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + + let mempool = MempoolGuard::new(PriorityOpId(0), 100); + let fee_params_provider = Arc::new(MockBatchFeeParamsProvider::default()); + let fee_input = fee_params_provider.get_batch_fee_input().await; + let (base_fee, gas_per_pubdata) = + derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into()); + + let fetcher = + MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG); + let (stop_sender, stop_receiver) = watch::channel(false); + let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver)); + + // Add a transaction with insufficient fee to the storage. + let transaction = create_l2_transaction(base_fee / 2, gas_per_pubdata / 2); + storage + .transactions_dal() + .insert_transaction_l2(transaction, TransactionExecutionMetrics::default()) + .await; + + tokio::time::sleep(TEST_MEMPOOL_CONFIG.sync_interval() * 5).await; + assert_eq!(mempool.stats().l2_transaction_count, 0); + + stop_sender.send_replace(true); + fetcher_task.await.unwrap().expect("fetcher errored"); + } + + #[tokio::test] + async fn ignoring_transaction_with_old_nonce() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + + let mempool = MempoolGuard::new(PriorityOpId(0), 100); + let fee_params_provider = Arc::new(MockBatchFeeParamsProvider::default()); + let fee_input = fee_params_provider.get_batch_fee_input().await; + let (base_fee, gas_per_pubdata) = + derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into()); + + let mut fetcher = + MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG); + let (tx_hashes_sender, mut tx_hashes_receiver) = mpsc::unbounded_channel(); + fetcher.transaction_hashes_sender = tx_hashes_sender; + let (stop_sender, stop_receiver) = watch::channel(false); + let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver)); + + // Add a new transaction to the storage. + let transaction = create_l2_transaction(base_fee * 2, gas_per_pubdata * 2); + assert_eq!(transaction.nonce(), Nonce(0)); + let transaction_hash = transaction.hash(); + let nonce_key = get_nonce_key(&transaction.initiator_account()); + let nonce_log = StorageLog::new_write_log(nonce_key, u256_to_h256(42.into())); + storage + .storage_logs_dal() + .append_storage_logs(MiniblockNumber(0), &[(H256::zero(), vec![nonce_log])]) + .await; + storage + .transactions_dal() + .insert_transaction_l2(transaction, TransactionExecutionMetrics::default()) + .await; + + // Check that the transaction is eventually synced. + let tx_hashes = wait_for_new_transactions(&mut tx_hashes_receiver).await; + assert_eq!(tx_hashes, [transaction_hash]); + // Transaction must not be added to the pool because of its outdated nonce. + assert_eq!(mempool.stats().l2_transaction_count, 0); + + stop_sender.send_replace(true); + fetcher_task.await.unwrap().expect("fetcher errored"); + } +} diff --git a/core/lib/zksync_core/src/state_keeper/metrics.rs b/core/lib/zksync_core/src/state_keeper/metrics.rs index aa65c16f7de1..181447e9761b 100644 --- a/core/lib/zksync_core/src/state_keeper/metrics.rs +++ b/core/lib/zksync_core/src/state_keeper/metrics.rs @@ -244,7 +244,6 @@ pub(super) enum MiniblockSealStage { InsertStorageLogs, ApplyStorageLogs, InsertFactoryDeps, - ExtractContractsDeployed, ExtractAddedTokens, InsertTokens, ExtractEvents, diff --git a/core/lib/zksync_core/src/state_keeper/tests/mod.rs b/core/lib/zksync_core/src/state_keeper/tests/mod.rs index bfcfc524ffa0..70511b45b737 100644 --- a/core/lib/zksync_core/src/state_keeper/tests/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/tests/mod.rs @@ -122,7 +122,7 @@ pub(super) fn create_updates_manager() -> UpdatesManager { ) } -pub(super) fn create_transaction(fee_per_gas: u64, gas_per_pubdata: u32) -> Transaction { +pub(super) fn create_transaction(fee_per_gas: u64, gas_per_pubdata: u64) -> Transaction { create_l2_transaction(fee_per_gas, gas_per_pubdata).into() } diff --git a/core/lib/zksync_core/src/state_keeper/types.rs b/core/lib/zksync_core/src/state_keeper/types.rs index 2dbce8dd2076..731f22df9272 100644 --- a/core/lib/zksync_core/src/state_keeper/types.rs +++ b/core/lib/zksync_core/src/state_keeper/types.rs @@ -56,6 +56,14 @@ impl MempoolGuard { .get_mempool_info() } + #[cfg(test)] + pub fn stats(&self) -> zksync_mempool::MempoolStats { + self.0 + .lock() + .expect("failed to acquire mempool lock") + .stats() + } + pub fn register_metrics(&self) { StateKeeperGauges::register(Arc::downgrade(&self.0)); } diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index c800e83703f8..701e16188adb 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -190,7 +190,7 @@ impl ExternalIO { .access_storage_tagged("sync_layer") .await .unwrap() - .storage_dal() + .factory_deps_dal() .get_factory_dep(hash) .await; @@ -210,7 +210,7 @@ impl ExternalIO { .access_storage_tagged("sync_layer") .await .unwrap() - .storage_dal() + .factory_deps_dal() .insert_factory_deps( self.current_miniblock_number, &HashMap::from_iter([(contract.hash, be_words_to_bytes(&contract.code))]), diff --git a/core/lib/zksync_core/src/utils/testonly.rs b/core/lib/zksync_core/src/utils/testonly.rs index 3684e2af9091..625ee9c693b3 100644 --- a/core/lib/zksync_core/src/utils/testonly.rs +++ b/core/lib/zksync_core/src/utils/testonly.rs @@ -9,7 +9,7 @@ use zksync_types::{ block::{L1BatchHeader, MiniblockHeader}, commitment::{L1BatchMetaParameters, L1BatchMetadata}, fee::Fee, - fee_model::BatchFeeInput, + fee_model::{BatchFeeInput, FeeParams}, l2::L2Tx, snapshots::SnapshotRecoveryStatus, transaction_request::PaymasterParams, @@ -17,7 +17,7 @@ use zksync_types::{ StorageLog, H256, U256, }; -use crate::l1_gas_price::L1GasPriceProvider; +use crate::{fee_model::BatchFeeModelInputProvider, l1_gas_price::L1GasPriceProvider}; /// Creates a miniblock header with the specified number and deterministic contents. pub(crate) fn create_miniblock(number: u32) -> MiniblockHeader { @@ -73,7 +73,7 @@ pub(crate) fn create_l1_batch_metadata(number: u32) -> L1BatchMetadata { } /// Creates an L2 transaction with randomized parameters. -pub(crate) fn create_l2_transaction(fee_per_gas: u64, gas_per_pubdata: u32) -> L2Tx { +pub(crate) fn create_l2_transaction(fee_per_gas: u64, gas_per_pubdata: u64) -> L2Tx { let fee = Fee { gas_limit: 1000_u64.into(), max_fee_per_gas: fee_per_gas.into(), @@ -141,10 +141,6 @@ pub(crate) async fn prepare_recovery_snapshot( .storage_logs_dal() .insert_storage_logs(miniblock.number, &[(H256::zero(), snapshot_logs.to_vec())]) .await; - storage - .storage_dal() - .apply_storage_logs(&[(H256::zero(), snapshot_logs.to_vec())]) - .await; let snapshot_recovery = SnapshotRecoveryStatus { l1_batch_number: l1_batch.number, @@ -200,3 +196,19 @@ impl L1GasPriceProvider for MockL1GasPriceProvider { self.0 * u64::from(zksync_system_constants::L1_GAS_PER_PUBDATA_BYTE) } } + +/// Mock [`BatchFeeModelInputProvider`] implementation that returns a constant value. +#[derive(Debug)] +pub(crate) struct MockBatchFeeParamsProvider(pub FeeParams); + +impl Default for MockBatchFeeParamsProvider { + fn default() -> Self { + Self(FeeParams::sensible_v1_default()) + } +} + +impl BatchFeeModelInputProvider for MockBatchFeeParamsProvider { + fn get_fee_model_params(&self) -> FeeParams { + self.0 + } +} diff --git a/prover/witness_generator/src/basic_circuits.rs b/prover/witness_generator/src/basic_circuits.rs index 6d7d15b78613..85f16c284491 100644 --- a/prover/witness_generator/src/basic_circuits.rs +++ b/prover/witness_generator/src/basic_circuits.rs @@ -509,13 +509,13 @@ async fn generate_witness( .unwrap(); let bootloader_code_bytes = connection - .storage_dal() + .factory_deps_dal() .get_factory_dep(header.base_system_contracts_hashes.bootloader) .await .expect("Bootloader bytecode should exist"); let bootloader_code = bytes_to_chunks(&bootloader_code_bytes); let account_bytecode_bytes = connection - .storage_dal() + .factory_deps_dal() .get_factory_dep(header.base_system_contracts_hashes.default_aa) .await .expect("Default aa bytecode should exist"); @@ -539,7 +539,10 @@ async fn generate_witness( .unwrap() .unwrap(); - let mut used_bytecodes = connection.storage_dal().get_factory_deps(&hashes).await; + let mut used_bytecodes = connection + .factory_deps_dal() + .get_factory_deps(&hashes) + .await; if input.used_bytecodes_hashes.contains(&account_code_hash) { used_bytecodes.insert(account_code_hash, account_bytecode); }