From 38e1f722f689f3558ee8bd1d259134239970928a Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Wed, 18 Dec 2024 20:55:31 +0900
Subject: [PATCH 01/17] change logic to create L2 block only when a
 transaction is received

---
 core/node/state_keeper/src/keeper.rs | 41 +++++++++++++++++-----------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs
index c892fd8534ec..f9e50466354a 100644
--- a/core/node/state_keeper/src/keeper.rs
+++ b/core/node/state_keeper/src/keeper.rs
@@ -72,6 +72,7 @@ pub struct ZkSyncStateKeeper {
     sealer: Arc<dyn ConditionalSealer>,
     storage_factory: Arc<dyn ReadStorageFactory>,
     health_updater: HealthUpdater,
+    should_create_l2_block: bool,
 }
 
 impl ZkSyncStateKeeper {
@@ -89,6 +90,7 @@ impl ZkSyncStateKeeper {
             sealer,
             storage_factory,
             health_updater: ReactiveHealthCheck::new("state_keeper").1,
+            should_create_l2_block: false,
         }
     }
 
@@ -187,7 +189,10 @@ impl ZkSyncStateKeeper {
 
             // Finish current batch.
             if !updates_manager.l2_block.executed_transactions.is_empty() {
-                self.seal_l2_block(&updates_manager).await?;
+                if !self.should_create_l2_block {
+                    // The L2 block has already been sealed.
+                    self.seal_l2_block(&updates_manager).await?;
+                }
                 // We've sealed the L2 block that we had, but we still need to set up the timestamp
                 // for the fictive L2 block.
                 let new_l2_block_params = self
@@ -199,6 +204,7 @@ impl ZkSyncStateKeeper {
                     &mut *batch_executor,
                 )
                 .await?;
+                self.should_create_l2_block = false;
             }
 
             let (finished_batch, _) = batch_executor.finish_batch().await?;
@@ -585,14 +591,30 @@ impl ZkSyncStateKeeper {
                 return Ok(());
             }
 
-            if self.io.should_seal_l2_block(updates_manager) {
+            if !self.should_create_l2_block && self.io.should_seal_l2_block(updates_manager) {
                 tracing::debug!(
                     "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules",
                     updates_manager.l2_block.number,
                     updates_manager.l1_batch.number
                 );
                 self.seal_l2_block(updates_manager).await?;
+                self.should_create_l2_block = true;
+            }
+            let waiting_latency = KEEPER_METRICS.waiting_for_tx.start();
+            let Some(tx) = self
+                .io
+                .wait_for_next_tx(POLL_WAIT_DURATION, updates_manager.l2_block.timestamp)
+                .instrument(info_span!("wait_for_next_tx"))
+                .await
+                .context("error waiting for next transaction")?
+            else {
+                waiting_latency.observe();
+                continue;
+            };
+            waiting_latency.observe();
+            let tx_hash = tx.hash();
+            if self.should_create_l2_block {
                 let new_l2_block_params = self
                     .wait_for_new_l2_block_params(updates_manager, stop_receiver)
                     .await
                     .map_err(|e| e.context("wait_for_new_l2_block_params"))?;
                 tracing::debug!(
                     "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}",
                     updates_manager.l2_block.number + 1,
                     updates_manager.l1_batch.number,
                     display_timestamp(new_l2_block_params.timestamp)
                 );
                 Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor)
                     .await?;
+                self.should_create_l2_block = false;
             }
-            let waiting_latency = KEEPER_METRICS.waiting_for_tx.start();
-            let Some(tx) = self
-                .io
-                .wait_for_next_tx(POLL_WAIT_DURATION, updates_manager.l2_block.timestamp)
-                .instrument(info_span!("wait_for_next_tx"))
-                .await
-                .context("error waiting for next transaction")?
-            else {
-                waiting_latency.observe();
-                tracing::trace!("No new transactions. Waiting!");
-                continue;
-            };
-            waiting_latency.observe();
-            let tx_hash = tx.hash();
             let (seal_resolution, exec_result) = self
                 .process_one_tx(batch_executor, updates_manager, tx.clone())
                 .await?;
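In short, PATCH 01 inverts the loop order: seal first, then block until a transaction exists, and only then open the next L2 block, so empty L2 blocks are no longer produced while the node idles. A condensed sketch of the resulting control flow (illustrative only; names are simplified, and the metrics, tracing spans, and error contexts of the real process_l1_batch loop are omitted):

    // Sketch of the deferred-block-creation loop this patch introduces.
    loop {
        // Seal by the usual rules, but only record that a new block is owed.
        if !should_create_l2_block && io.should_seal_l2_block(&updates) {
            seal_l2_block(&updates).await?;
            should_create_l2_block = true;
        }
        // Block until a transaction actually arrives.
        let Some(tx) = io.wait_for_next_tx(POLL_WAIT_DURATION).await? else {
            continue; // nothing yet; poll again
        };
        // Open the next L2 block lazily, now that there is something to put in it.
        if should_create_l2_block {
            let params = wait_for_new_l2_block_params(&updates).await?;
            start_next_l2_block(params, &mut updates).await?;
            should_create_l2_block = false;
        }
        process_one_tx(tx).await?;
    }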
Waiting!"); - continue; - }; - waiting_latency.observe(); - let tx_hash = tx.hash(); let (seal_resolution, exec_result) = self .process_one_tx(batch_executor, updates_manager, tx.clone()) .await?; From 5af005b071b43cb096d3007d47977aaadad80b94 Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Mon, 13 Jan 2025 20:32:34 +0900 Subject: [PATCH 02/17] make flag local --- core/node/state_keeper/src/keeper.rs | 34 ++++++++++++++++------------ 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index f9e50466354a..f961ba57fa3b 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -72,7 +72,6 @@ pub struct ZkSyncStateKeeper { sealer: Arc, storage_factory: Arc, health_updater: HealthUpdater, - should_create_l2_block: bool, } impl ZkSyncStateKeeper { @@ -90,7 +89,6 @@ impl ZkSyncStateKeeper { sealer, storage_factory, health_updater: ReactiveHealthCheck::new("state_keeper").1, - should_create_l2_block: false, } } @@ -187,14 +185,9 @@ impl ZkSyncStateKeeper { ) .await?; - // Finish current batch. + // Finish current batch with an empty block. if !updates_manager.l2_block.executed_transactions.is_empty() { - if !self.should_create_l2_block { - // l2 block has been already sealed - self.seal_l2_block(&updates_manager).await?; - } - // We've sealed the L2 block that we had, but we still need to set up the timestamp - // for the fictive L2 block. + // We need to set up the timestamp for the fictive L2 block. let new_l2_block_params = self .wait_for_new_l2_block_params(&updates_manager, &stop_receiver) .await?; @@ -204,7 +197,6 @@ impl ZkSyncStateKeeper { &mut *batch_executor, ) .await?; - self.should_create_l2_block = false; } let (finished_batch, _) = batch_executor.finish_batch().await?; @@ -572,6 +564,7 @@ impl ZkSyncStateKeeper { protocol_upgrade_tx: Option, stop_receiver: &watch::Receiver, ) -> Result<(), Error> { + let mut should_create_l2_block = false; if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) .await?; @@ -588,17 +581,24 @@ impl ZkSyncStateKeeper { "L1 batch #{} should be sealed unconditionally as per sealing rules", updates_manager.l1_batch.number ); + + // check if there is an open "non sealed" block, if yes seal it and return + if !updates_manager.l2_block.executed_transactions.is_empty() + && !should_create_l2_block + { + self.seal_l2_block(updates_manager).await?; + } return Ok(()); } - if !self.should_create_l2_block && self.io.should_seal_l2_block(updates_manager) { + if !should_create_l2_block && self.io.should_seal_l2_block(updates_manager) { tracing::debug!( "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules", updates_manager.l2_block.number, updates_manager.l1_batch.number ); self.seal_l2_block(updates_manager).await?; - self.should_create_l2_block = true; + should_create_l2_block = true; } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); let Some(tx) = self @@ -614,7 +614,7 @@ impl ZkSyncStateKeeper { waiting_latency.observe(); let tx_hash = tx.hash(); - if self.should_create_l2_block { + if should_create_l2_block { let new_l2_block_params = self .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await @@ -627,7 +627,7 @@ impl ZkSyncStateKeeper { ); Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor) .await?; - self.should_create_l2_block = false; + should_create_l2_block = false; } let (seal_resolution, 
From 06c1ca4aacfa656576d3d08fbc88c81bc9575ec3 Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Mon, 13 Jan 2025 23:38:50 +0900
Subject: [PATCH 03/17] rename local variable

---
 core/node/state_keeper/src/keeper.rs | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs
index f961ba57fa3b..28ff90a48d90 100644
--- a/core/node/state_keeper/src/keeper.rs
+++ b/core/node/state_keeper/src/keeper.rs
@@ -564,7 +564,7 @@ impl ZkSyncStateKeeper {
         protocol_upgrade_tx: Option<ProtocolUpgradeTx>,
         stop_receiver: &watch::Receiver<bool>,
     ) -> Result<(), Error> {
-        let mut should_create_l2_block = false;
+        let mut is_sealed = false;
         if let Some(protocol_upgrade_tx) = protocol_upgrade_tx {
             self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx)
                 .await?;
@@ -583,22 +583,20 @@ impl ZkSyncStateKeeper {
             );
 
             // check if there is an open "non sealed" block, if yes seal it and return
-            if !updates_manager.l2_block.executed_transactions.is_empty()
-                && !should_create_l2_block
-            {
+            if !updates_manager.l2_block.executed_transactions.is_empty() && !is_sealed {
                 self.seal_l2_block(updates_manager).await?;
             }
             return Ok(());
         }
 
-        if !should_create_l2_block && self.io.should_seal_l2_block(updates_manager) {
+        if !is_sealed && self.io.should_seal_l2_block(updates_manager) {
             tracing::debug!(
                 "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules",
                 updates_manager.l2_block.number,
                 updates_manager.l1_batch.number
             );
             self.seal_l2_block(updates_manager).await?;
-            should_create_l2_block = true;
+            is_sealed = true;
         }
         let waiting_latency = KEEPER_METRICS.waiting_for_tx.start();
         let Some(tx) = self
@@ -614,7 +612,8 @@ impl ZkSyncStateKeeper {
         waiting_latency.observe();
         let tx_hash = tx.hash();
 
-        if should_create_l2_block {
+        // if the current block is sealed, we need to start a new block
+        if is_sealed {
             let new_l2_block_params = self
                 .wait_for_new_l2_block_params(updates_manager, stop_receiver)
                 .await
@@ -627,7 +626,7 @@ impl ZkSyncStateKeeper {
             );
             Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor)
                 .await?;
-            should_create_l2_block = false;
+            is_sealed = false;
         }
 
         let (seal_resolution, exec_result) = self
             .process_one_tx(batch_executor, updates_manager, tx.clone())
             .await?;
@@ -684,9 +683,7 @@ impl ZkSyncStateKeeper {
             updates_manager.l1_batch.number
         );
         // check if there is an open "non sealed" block, if yes seal it and return
-        if !updates_manager.l2_block.executed_transactions.is_empty()
-            && !should_create_l2_block
-        {
+        if !updates_manager.l2_block.executed_transactions.is_empty() && !is_sealed {
             self.seal_l2_block(updates_manager).await?;
         }
         full_latency.observe();
         return Ok(());
     }

From 8e9752dceabde97b78b45575e20cd40f1dc537d1 Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Thu, 16 Jan 2025 21:15:11 +0900
Subject: [PATCH 04/17] fix based on comments

---
 core/node/node_sync/src/external_io.rs | 57 ++++++++++++++++++--------
 core/node/state_keeper/src/keeper.rs   | 50 +++++++++++-----------
 2 files changed, 65 insertions(+), 42 deletions(-)

diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs
index eb79965fa28b..c7c993d6acdf 100644
---
a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; use anyhow::Context as _; use async_trait::async_trait; @@ -40,6 +43,7 @@ pub struct ExternalIO { actions: ActionQueue, main_node_client: Box, chain_id: L2ChainId, + pending_l2_block_actions: VecDeque<(L2BlockNumber, L2BlockParams)>, } impl ExternalIO { @@ -56,6 +60,7 @@ impl ExternalIO { actions, main_node_client, chain_id, + pending_l2_block_actions: VecDeque::new(), }) } @@ -337,23 +342,33 @@ impl StateKeeperIO for ExternalIO { cursor: &IoCursor, max_wait: Duration, ) -> anyhow::Result> { - // Wait for the next L2 block to appear in the queue. - let Some(action) = self.actions.recv_action(max_wait).await else { - return Ok(None); - }; - match action { - SyncAction::L2Block { params, number } => { - anyhow::ensure!( - number == cursor.next_l2_block, - "L2 block number mismatch: expected {}, got {number}", - cursor.next_l2_block - ); - return Ok(Some(params)); - } - other => { - anyhow::bail!( + // Check if there is a pending l2 block action while waiting for the next tx, if yes process it + if let Some((number, params)) = self.pending_l2_block_actions.pop_front() { + anyhow::ensure!( + number == cursor.next_l2_block, + "L2 block number mismatch: expected {}, got {number}", + cursor.next_l2_block + ); + return Ok(Some(params)); + } else { + // Alternatively, wait for the next L2 block to appear in the queue. + let Some(action) = self.actions.recv_action(max_wait).await else { + return Ok(None); + }; + match action { + SyncAction::L2Block { params, number } => { + anyhow::ensure!( + number == cursor.next_l2_block, + "L2 block number mismatch: expected {}, got {number}", + cursor.next_l2_block + ); + return Ok(Some(params)); + } + other => { + anyhow::bail!( "Unexpected action in the queue while waiting for the next L2 block: {other:?}" ); + } } } } @@ -370,11 +385,21 @@ impl StateKeeperIO for ExternalIO { let Some(action) = self.actions.peek_action_async(max_wait).await else { return Ok(None); }; + match action { SyncAction::Tx(tx) => { self.actions.pop_action().unwrap(); return Ok(Some(Transaction::from(*tx))); } + SyncAction::L2Block { params, number } => { + // Store any L2block action for future wait_for_new_l2_block_params call + // https://github.com/matter-labs/zksync-era/pull/3398 + self.actions.pop_action().unwrap(); + self.pending_l2_block_actions.push_back((number, params)); + + // Move to the next action + self.wait_for_next_tx(max_wait, _l2_block_timestamp).await + } SyncAction::SealL2Block | SyncAction::SealBatch => { // No more transactions in the current L2 block; the state keeper should seal it. 
return Ok(None); diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 28ff90a48d90..c1f3f1cd0faa 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -166,13 +166,13 @@ impl ZkSyncStateKeeper { &stop_receiver, ) .await?; - self.restore_state( - &mut *batch_executor, - &mut updates_manager, - pending_l2_blocks, - &stop_receiver, - ) - .await?; + let mut state_restored = self + .restore_state( + &mut *batch_executor, + &mut updates_manager, + pending_l2_blocks, + ) + .await?; let mut l1_batch_seal_delta: Option = None; while !is_canceled(&stop_receiver) { @@ -181,6 +181,7 @@ impl ZkSyncStateKeeper { &mut *batch_executor, &mut updates_manager, protocol_upgrade_tx, + state_restored, &stop_receiver, ) .await?; @@ -234,6 +235,7 @@ impl ZkSyncStateKeeper { } else { None }; + state_restored = false; } Err(Error::Canceled) } @@ -461,10 +463,9 @@ impl ZkSyncStateKeeper { batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, l2_blocks_to_reexecute: Vec, - stop_receiver: &watch::Receiver, - ) -> Result<(), Error> { + ) -> Result { if l2_blocks_to_reexecute.is_empty() { - return Ok(()); + return Ok(false); } for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { @@ -542,15 +543,7 @@ impl ZkSyncStateKeeper { tracing::debug!( "All the transactions from the pending state were re-executed successfully" ); - - // We've processed all the L2 blocks, and right now we're initializing the next *actual* L2 block. - let new_l2_block_params = self - .wait_for_new_l2_block_params(updates_manager, stop_receiver) - .await - .map_err(|e| e.context("wait_for_new_l2_block_params"))?; - Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor).await?; - - Ok(()) + Ok(true) } #[tracing::instrument( @@ -562,9 +555,10 @@ impl ZkSyncStateKeeper { batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, protocol_upgrade_tx: Option, + state_restored: bool, stop_receiver: &watch::Receiver, ) -> Result<(), Error> { - let mut is_sealed = false; + let mut is_last_block_sealed = state_restored; if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) .await?; @@ -583,20 +577,22 @@ impl ZkSyncStateKeeper { ); // check if there is an open "non sealed" block, if yes seal it and return - if !updates_manager.l2_block.executed_transactions.is_empty() && !is_sealed { + if !updates_manager.l2_block.executed_transactions.is_empty() + && !is_last_block_sealed + { self.seal_l2_block(updates_manager).await?; } return Ok(()); } - if !is_sealed && self.io.should_seal_l2_block(updates_manager) { + if !is_last_block_sealed && self.io.should_seal_l2_block(updates_manager) { tracing::debug!( "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules", updates_manager.l2_block.number, updates_manager.l1_batch.number ); self.seal_l2_block(updates_manager).await?; - is_sealed = true; + is_last_block_sealed = true; } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); let Some(tx) = self @@ -613,7 +609,7 @@ impl ZkSyncStateKeeper { let tx_hash = tx.hash(); // if the current block is sealed, we need to start a new block - if is_sealed { + if is_last_block_sealed { let new_l2_block_params = self .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await @@ -626,7 +622,7 @@ impl ZkSyncStateKeeper { ); Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor) 
.await?; - is_sealed = false; + is_last_block_sealed = false; } let (seal_resolution, exec_result) = self @@ -683,7 +679,9 @@ impl ZkSyncStateKeeper { updates_manager.l1_batch.number ); // check if there is an open "non sealed" block, if yes seal it and return - if !updates_manager.l2_block.executed_transactions.is_empty() && !is_sealed { + if !updates_manager.l2_block.executed_transactions.is_empty() + && !is_last_block_sealed + { self.seal_l2_block(updates_manager).await?; } full_latency.observe(); From 47f249eafbd5a48fd48de57acc4cfb1932b8bf3b Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Wed, 22 Jan 2025 12:57:12 +0900 Subject: [PATCH 05/17] fix based on comments --- core/node/node_sync/src/external_io.rs | 64 +++------ core/node/state_keeper/src/io/mempool.rs | 11 ++ core/node/state_keeper/src/io/mod.rs | 3 + core/node/state_keeper/src/io/persistence.rs | 12 +- core/node/state_keeper/src/keeper.rs | 133 +++++++++++++----- .../src/testonly/test_batch_executor.rs | 7 + core/node/state_keeper/src/updates/mod.rs | 22 ++- 7 files changed, 164 insertions(+), 88 deletions(-) diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index c7c993d6acdf..71049c189e46 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, VecDeque}, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; use anyhow::Context as _; use async_trait::async_trait; @@ -43,7 +40,7 @@ pub struct ExternalIO { actions: ActionQueue, main_node_client: Box, chain_id: L2ChainId, - pending_l2_block_actions: VecDeque<(L2BlockNumber, L2BlockParams)>, + pub next_l2_block_param: L2BlockParams, } impl ExternalIO { @@ -60,7 +57,7 @@ impl ExternalIO { actions, main_node_client, chain_id, - pending_l2_block_actions: VecDeque::new(), + next_l2_block_param: L2BlockParams::default(), }) } @@ -342,37 +339,32 @@ impl StateKeeperIO for ExternalIO { cursor: &IoCursor, max_wait: Duration, ) -> anyhow::Result> { - // Check if there is a pending l2 block action while waiting for the next tx, if yes process it - if let Some((number, params)) = self.pending_l2_block_actions.pop_front() { - anyhow::ensure!( - number == cursor.next_l2_block, - "L2 block number mismatch: expected {}, got {number}", - cursor.next_l2_block - ); - return Ok(Some(params)); - } else { - // Alternatively, wait for the next L2 block to appear in the queue. - let Some(action) = self.actions.recv_action(max_wait).await else { - return Ok(None); - }; - match action { - SyncAction::L2Block { params, number } => { - anyhow::ensure!( - number == cursor.next_l2_block, - "L2 block number mismatch: expected {}, got {number}", - cursor.next_l2_block - ); - return Ok(Some(params)); - } - other => { - anyhow::bail!( + // Wait for the next L2 block to appear in the queue. 
+ let Some(action) = self.actions.recv_action(max_wait).await else { + return Ok(None); + }; + match action { + SyncAction::L2Block { params, number } => { + anyhow::ensure!( + number == cursor.next_l2_block, + "L2 block number mismatch: expected {}, got {number}", + cursor.next_l2_block + ); + self.next_l2_block_param = params; + return Ok(Some(params)); + } + other => { + anyhow::bail!( "Unexpected action in the queue while waiting for the next L2 block: {other:?}" ); - } } } } + async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { + Ok(Some(self.next_l2_block_param)) + } + async fn wait_for_next_tx( &mut self, max_wait: Duration, @@ -385,21 +377,11 @@ impl StateKeeperIO for ExternalIO { let Some(action) = self.actions.peek_action_async(max_wait).await else { return Ok(None); }; - match action { SyncAction::Tx(tx) => { self.actions.pop_action().unwrap(); return Ok(Some(Transaction::from(*tx))); } - SyncAction::L2Block { params, number } => { - // Store any L2block action for future wait_for_new_l2_block_params call - // https://github.com/matter-labs/zksync-era/pull/3398 - self.actions.pop_action().unwrap(); - self.pending_l2_block_actions.push_back((number, params)); - - // Move to the next action - self.wait_for_next_tx(max_wait, _l2_block_timestamp).await - } SyncAction::SealL2Block | SyncAction::SealBatch => { // No more transactions in the current L2 block; the state keeper should seal it. return Ok(None); diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index f553fcb57a08..b8953f4bc1e9 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -290,6 +290,17 @@ impl StateKeeperIO for MempoolIO { })) } + async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { + let current_timestamp_millis = millis_since_epoch(); + let current_timestamp = (current_timestamp_millis / 1_000) as u64; + + Ok(Some(L2BlockParams { + timestamp: current_timestamp, + // This value is effectively ignored by the protocol. + virtual_blocks: 1, + })) + } + async fn wait_for_next_tx( &mut self, max_wait: Duration, diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index fbc481fb678d..ca347cbd6986 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -135,6 +135,9 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria { max_wait: Duration, ) -> anyhow::Result>; + /// Get the updated parameters for the next L2 block. + async fn get_updated_l2_block_params(&mut self) -> anyhow::Result>; + /// Blocks for up to `max_wait` until the next transaction is available for execution. /// Returns `None` if no transaction became available until the timeout. 
async fn wait_for_next_tx( diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 8db7fe4120ed..97f55f03b720 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,10 +512,11 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.push_l2_block(L2BlockParams { + updates.update_next_l2_block_parameters(L2BlockParams { timestamp: 1, virtual_blocks: 1, }); + updates.push_l2_block(); let mut batch_result = FinishedL1Batch::mock(); batch_result.final_execution_state.deduplicated_storage_logs = @@ -623,10 +624,11 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.push_l2_block(L2BlockParams { + updates_manager.update_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); { let submit_future = persistence.submit_l2_block(seal_command); @@ -651,10 +653,11 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. persistence.wait_for_all_commands().await; - updates_manager.push_l2_block(L2BlockParams { + updates_manager.update_next_l2_block_parameters(L2BlockParams { timestamp: 3, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); persistence.submit_l2_block(seal_command).await; let command = sealer.commands_receiver.recv().await.unwrap(); @@ -675,10 +678,11 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.push_l2_block(L2BlockParams { + updates_manager.update_next_l2_block_parameters(L2BlockParams { timestamp: i, virtual_blocks: 1, }); + updates_manager.push_l2_block(); persistence.submit_l2_block(seal_command).await; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index c1f3f1cd0faa..a5fdbf1165b2 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -171,6 +171,7 @@ impl ZkSyncStateKeeper { &mut *batch_executor, &mut updates_manager, pending_l2_blocks, + &stop_receiver, ) .await?; @@ -188,16 +189,12 @@ impl ZkSyncStateKeeper { // Finish current batch with an empty block. if !updates_manager.l2_block.executed_transactions.is_empty() { - // We need to set up the timestamp for the fictive L2 block. - let new_l2_block_params = self - .wait_for_new_l2_block_params(&updates_manager, &stop_receiver) + self.seal_l2_block(&updates_manager).await?; + // We've sealed the L2 block that we had, but we still need to set up the timestamp + // for the fictive L2 block. 
+ self.wait_for_new_l2_block_params(&mut updates_manager, &stop_receiver) .await?; - Self::start_next_l2_block( - new_l2_block_params, - &mut updates_manager, - &mut *batch_executor, - ) - .await?; + Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } let (finished_batch, _) = batch_executor.finish_batch().await?; @@ -386,9 +383,9 @@ impl ZkSyncStateKeeper { )] async fn wait_for_new_l2_block_params( &mut self, - updates: &UpdatesManager, + updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, - ) -> Result { + ) -> Result<(), Error> { let latency = KEEPER_METRICS.wait_for_l2_block_params.start(); let cursor = updates.io_cursor(); while !is_canceled(stop_receiver) { @@ -400,14 +397,54 @@ impl ZkSyncStateKeeper { { self.health_updater .update(StateKeeperHealthDetails::from(&cursor).into()); - + updates.update_next_l2_block_parameters(params); latency.observe(); - return Ok(params); + return Ok(()); + } + } + Err(Error::Canceled) + } + + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates.l1_batch.number, + l2_block = %updates.l2_block.number, + ) + )] + async fn wait_for_updated_l2_block_params( + &mut self, + updates: &mut UpdatesManager, + stop_receiver: &watch::Receiver, + ) -> Result<(), Error> { + while !is_canceled(stop_receiver) { + if let Some(params) = self + .io + .get_updated_l2_block_params() + .await + .context("error getting the updated L2 block params")? + { + updates.update_next_l2_block_parameters(params); + return Ok(()); } } Err(Error::Canceled) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates_manager.l1_batch.number, + l2_block = %updates_manager.l2_block.number, + ) + )] + async fn set_next_l2_block_parameters( + updates_manager: &mut UpdatesManager, + l2_block_param: L2BlockParams, + ) { + updates_manager.update_next_l2_block_parameters(l2_block_param); + } + #[tracing::instrument( skip_all, fields( @@ -416,11 +453,10 @@ impl ZkSyncStateKeeper { ) )] async fn start_next_l2_block( - params: L2BlockParams, updates_manager: &mut UpdatesManager, batch_executor: &mut dyn BatchExecutor, ) -> anyhow::Result<()> { - updates_manager.push_l2_block(params); + updates_manager.push_l2_block(); let block_env = updates_manager.l2_block.get_env(); batch_executor .start_next_l2_block(block_env) @@ -463,6 +499,7 @@ impl ZkSyncStateKeeper { batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, l2_blocks_to_reexecute: Vec, + stop_receiver: &watch::Receiver, ) -> Result { if l2_blocks_to_reexecute.is_empty() { return Ok(false); @@ -471,15 +508,15 @@ impl ZkSyncStateKeeper { for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { // Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized. if index > 0 { - Self::start_next_l2_block( + Self::set_next_l2_block_parameters( + updates_manager, L2BlockParams { timestamp: l2_block.timestamp, virtual_blocks: l2_block.virtual_blocks, }, - updates_manager, - batch_executor, ) - .await?; + .await; + Self::start_next_l2_block(updates_manager, batch_executor).await?; } let l2_block_number = l2_block.number; @@ -543,6 +580,11 @@ impl ZkSyncStateKeeper { tracing::debug!( "All the transactions from the pending state were re-executed successfully" ); + + // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. 
+ self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + .await + .map_err(|e| e.context("wait_for_new_l2_block_params"))?; Ok(true) } @@ -576,16 +618,23 @@ impl ZkSyncStateKeeper { updates_manager.l1_batch.number ); - // check if there is an open "non sealed" block, if yes seal it and return - if !updates_manager.l2_block.executed_transactions.is_empty() - && !is_last_block_sealed - { - self.seal_l2_block(updates_manager).await?; + // Push the current block if it has not been done yet + if is_last_block_sealed { + self.wait_for_updated_l2_block_params(updates_manager, stop_receiver) + .await + .map_err(|e| e.context("wait_for_updated_l2_block_params"))?; + tracing::debug!( + "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", + updates_manager.l2_block.number + 1, + updates_manager.l1_batch.number, + display_timestamp(updates_manager.next_l2_block_timestamp()) + ); + Self::start_next_l2_block(updates_manager, batch_executor).await?; } return Ok(()); } - if !is_last_block_sealed && self.io.should_seal_l2_block(updates_manager) { + if self.io.should_seal_l2_block(updates_manager) && !is_last_block_sealed { tracing::debug!( "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules", updates_manager.l2_block.number, @@ -593,35 +642,47 @@ impl ZkSyncStateKeeper { ); self.seal_l2_block(updates_manager).await?; is_last_block_sealed = true; + + // Get a tentative new l2 block parameters + self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + .await + .map_err(|e| e.context("wait_for_new_l2_block_params"))?; } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); + + if is_last_block_sealed { + // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp + self.wait_for_updated_l2_block_params(updates_manager, stop_receiver) + .await + .map_err(|e| e.context("wait_for_updated_l2_block_params"))?; + } let Some(tx) = self .io - .wait_for_next_tx(POLL_WAIT_DURATION, updates_manager.l2_block.timestamp) + .wait_for_next_tx( + POLL_WAIT_DURATION, + updates_manager.next_l2_block_timestamp(), + ) .instrument(info_span!("wait_for_next_tx")) .await .context("error waiting for next transaction")? else { waiting_latency.observe(); + tracing::trace!("No new transactions. 
Waiting!"); continue; }; waiting_latency.observe(); + let tx_hash = tx.hash(); // if the current block is sealed, we need to start a new block if is_last_block_sealed { - let new_l2_block_params = self - .wait_for_new_l2_block_params(updates_manager, stop_receiver) - .await - .map_err(|e| e.context("wait_for_new_l2_block_params"))?; tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(new_l2_block_params.timestamp) + display_timestamp(updates_manager.next_l2_block_timestamp()) ); - Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor) - .await?; + Self::start_next_l2_block(updates_manager, batch_executor).await?; is_last_block_sealed = false; } @@ -678,12 +739,6 @@ impl ZkSyncStateKeeper { transaction {tx_hash}", updates_manager.l1_batch.number ); - // check if there is an open "non sealed" block, if yes seal it and return - if !updates_manager.l2_block.executed_transactions.is_empty() - && !is_last_block_sealed - { - self.seal_l2_block(updates_manager).await?; - } full_latency.observe(); return Ok(()); } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 9a675c7e97e8..77807b6f1e41 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -562,6 +562,7 @@ pub(crate) struct TestIO { protocol_version: ProtocolVersionId, previous_batch_protocol_version: ProtocolVersionId, protocol_upgrade_txs: HashMap, + pub next_l2_block_param: L2BlockParams, } impl fmt::Debug for TestIO { @@ -609,6 +610,7 @@ impl TestIO { protocol_version: ProtocolVersionId::latest(), previous_batch_protocol_version: ProtocolVersionId::latest(), protocol_upgrade_txs: HashMap::default(), + next_l2_block_param: L2BlockParams::default(), }; (this, OutputHandler::new(Box::new(persistence))) } @@ -717,9 +719,14 @@ impl StateKeeperIO for TestIO { }; self.l2_block_number += 1; self.timestamp += 1; + self.next_l2_block_param = params; Ok(Some(params)) } + async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { + Ok(Some(self.next_l2_block_param)) + } + async fn wait_for_next_tx( &mut self, max_wait: Duration, diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index b4f548527652..11b6adb555e2 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -39,6 +39,7 @@ pub struct UpdatesManager { pub l2_block: L2BlockUpdates, pub storage_writes_deduplicator: StorageWritesDeduplicator, pubdata_params: PubdataParams, + pub next_l2_block_param: L2BlockParams, } impl UpdatesManager { @@ -66,6 +67,10 @@ impl UpdatesManager { storage_writes_deduplicator: StorageWritesDeduplicator::new(), storage_view_cache: None, pubdata_params, + next_l2_block_param: L2BlockParams { + timestamp: l1_batch_env.first_l2_block.timestamp, + virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create, + }, } } @@ -73,6 +78,10 @@ impl UpdatesManager { self.batch_timestamp } + pub(crate) fn next_l2_block_timestamp(&self) -> u64 { + self.next_l2_block_param.timestamp + } + pub fn base_system_contract_hashes(&self) -> BaseSystemContractsHashes { self.base_system_contract_hashes } @@ -167,12 +176,12 @@ impl UpdatesManager { /// Pushes a new L2 block with the specified timestamp into this manager. 
The previously /// held L2 block is considered sealed and is used to extend the L1 batch data. - pub fn push_l2_block(&mut self, l2_block_params: L2BlockParams) { + pub fn push_l2_block(&mut self) { let new_l2_block_updates = L2BlockUpdates::new( - l2_block_params.timestamp, + self.next_l2_block_param.timestamp, self.l2_block.number + 1, self.l2_block.get_l2_block_hash(), - l2_block_params.virtual_blocks, + self.next_l2_block_param.virtual_blocks, self.protocol_version, ); let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates); @@ -180,6 +189,10 @@ impl UpdatesManager { .extend_from_sealed_l2_block(old_l2_block_updates); } + pub fn update_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { + self.next_l2_block_param = l2_block_param + } + pub(crate) fn pending_executed_transactions_len(&self) -> usize { self.l1_batch.executed_transactions.len() + self.l2_block.executed_transactions.len() } @@ -243,10 +256,11 @@ mod tests { assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0); // Seal an L2 block. - updates_manager.push_l2_block(L2BlockParams { + updates_manager.update_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); + updates_manager.push_l2_block(); // Check that L1 batch updates are the same with the pending state // and L2 block updates are empty. From 16761cd6d8413b5209746d87977740c53cc7abd8 Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Wed, 22 Jan 2025 13:46:36 +0900 Subject: [PATCH 06/17] rename keeper methods --- core/node/state_keeper/src/keeper.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 7311c55e08d5..8eadda786fa5 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -192,7 +192,7 @@ impl ZkSyncStateKeeper { self.seal_l2_block(&updates_manager).await?; // We've sealed the L2 block that we had, but we still need to set up the timestamp // for the fictive L2 block. - self.wait_for_new_l2_block_params(&mut updates_manager, &stop_receiver) + self.set_new_l2_block_params(&mut updates_manager, &stop_receiver) .await?; Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } @@ -381,7 +381,7 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn wait_for_new_l2_block_params( + async fn set_new_l2_block_params( &mut self, updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, @@ -412,7 +412,7 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn wait_for_updated_l2_block_params( + async fn update_new_l2_block_params( &mut self, updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, @@ -582,9 +582,9 @@ impl ZkSyncStateKeeper { ); // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. 
- self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + self.set_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + .map_err(|e| e.context("set_new_l2_block_params"))?; Ok(true) } @@ -620,9 +620,9 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - self.wait_for_updated_l2_block_params(updates_manager, stop_receiver) + self.update_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("wait_for_updated_l2_block_params"))?; + .map_err(|e| e.context("update_new_l2_block_params"))?; tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, @@ -644,17 +644,17 @@ impl ZkSyncStateKeeper { is_last_block_sealed = true; // Get a tentative new l2 block parameters - self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + self.set_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + .map_err(|e| e.context("set_new_l2_block_params"))?; } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - self.wait_for_updated_l2_block_params(updates_manager, stop_receiver) + self.update_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("wait_for_updated_l2_block_params"))?; + .map_err(|e| e.context("update_new_l2_block_params"))?; } let Some(tx) = self .io From d50917cd9f7c52f60421ff6adc9c5ab90a763f81 Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Wed, 29 Jan 2025 11:09:49 +0900 Subject: [PATCH 07/17] some renaming --- core/node/state_keeper/src/io/mempool.rs | 16 +++++----- core/node/state_keeper/src/io/persistence.rs | 8 ++--- core/node/state_keeper/src/keeper.rs | 32 ++++++++++---------- core/node/state_keeper/src/updates/mod.rs | 4 +-- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index b8953f4bc1e9..0b57bcb9d41e 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -59,6 +59,7 @@ pub struct MempoolIO { chain_id: L2ChainId, l2_da_validator_address: Option
, pubdata_type: PubdataType, + pub next_l2_block_param: L2BlockParams, } impl IoSealCriteria for MempoolIO { @@ -283,22 +284,22 @@ impl StateKeeperIO for MempoolIO { return Ok(None); }; - Ok(Some(L2BlockParams { + let params = L2BlockParams { timestamp, // This value is effectively ignored by the protocol. virtual_blocks: 1, - })) + }; + self.next_l2_block_param = params; + Ok(Some(params)) } async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { let current_timestamp_millis = millis_since_epoch(); let current_timestamp = (current_timestamp_millis / 1_000) as u64; - Ok(Some(L2BlockParams { - timestamp: current_timestamp, - // This value is effectively ignored by the protocol. - virtual_blocks: 1, - })) + let mut updated_params = self.next_l2_block_param; + updated_params.timestamp = current_timestamp; + Ok(Some(updated_params)) } async fn wait_for_next_tx( @@ -523,6 +524,7 @@ impl MempoolIO { chain_id, l2_da_validator_address, pubdata_type, + next_l2_block_param: L2BlockParams::default(), }) } diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 97f55f03b720..423c1cd56654 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,7 +512,7 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.update_next_l2_block_parameters(L2BlockParams { + updates.set_next_l2_block_parameters(L2BlockParams { timestamp: 1, virtual_blocks: 1, }); @@ -624,7 +624,7 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.update_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); @@ -653,7 +653,7 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. persistence.wait_for_all_commands().await; - updates_manager.update_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 3, virtual_blocks: 1, }); @@ -678,7 +678,7 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.update_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: i, virtual_blocks: 1, }); diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 8eadda786fa5..f9d216d883ae 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -192,7 +192,7 @@ impl ZkSyncStateKeeper { self.seal_l2_block(&updates_manager).await?; // We've sealed the L2 block that we had, but we still need to set up the timestamp // for the fictive L2 block. 
- self.set_new_l2_block_params(&mut updates_manager, &stop_receiver) + self.get_and_set_new_l2_block_params(&mut updates_manager, &stop_receiver) .await?; Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } @@ -381,7 +381,7 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn set_new_l2_block_params( + async fn get_and_set_new_l2_block_params( &mut self, updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, @@ -397,7 +397,7 @@ impl ZkSyncStateKeeper { { self.health_updater .update(StateKeeperHealthDetails::from(&cursor).into()); - updates.update_next_l2_block_parameters(params); + updates.set_next_l2_block_parameters(params); latency.observe(); return Ok(()); } @@ -412,7 +412,7 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn update_new_l2_block_params( + async fn update_l2_block_params( &mut self, updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, @@ -424,7 +424,7 @@ impl ZkSyncStateKeeper { .await .context("error getting the updated L2 block params")? { - updates.update_next_l2_block_parameters(params); + updates.set_next_l2_block_parameters(params); return Ok(()); } } @@ -438,11 +438,11 @@ impl ZkSyncStateKeeper { l2_block = %updates_manager.l2_block.number, ) )] - async fn set_next_l2_block_parameters( + async fn set_l2_block_params( updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams, ) { - updates_manager.update_next_l2_block_parameters(l2_block_param); + updates_manager.set_next_l2_block_parameters(l2_block_param); } #[tracing::instrument( @@ -508,7 +508,7 @@ impl ZkSyncStateKeeper { for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { // Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized. if index > 0 { - Self::set_next_l2_block_parameters( + Self::set_l2_block_params( updates_manager, L2BlockParams { timestamp: l2_block.timestamp, @@ -582,9 +582,9 @@ impl ZkSyncStateKeeper { ); // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. 
- self.set_new_l2_block_params(updates_manager, stop_receiver) + self.get_and_set_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("set_new_l2_block_params"))?; + .map_err(|e| e.context("get_and_set_new_l2_block_params"))?; Ok(true) } @@ -620,9 +620,9 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - self.update_new_l2_block_params(updates_manager, stop_receiver) + self.update_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("update_new_l2_block_params"))?; + .map_err(|e| e.context("update_l2_block_params"))?; tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, @@ -643,8 +643,8 @@ impl ZkSyncStateKeeper { self.seal_l2_block(updates_manager).await?; is_last_block_sealed = true; - // Get a tentative new l2 block parameters - self.set_new_l2_block_params(updates_manager, stop_receiver) + // Set a tentative new l2 block parameters + self.get_and_set_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("set_new_l2_block_params"))?; } @@ -652,9 +652,9 @@ impl ZkSyncStateKeeper { if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - self.update_new_l2_block_params(updates_manager, stop_receiver) + self.update_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("update_new_l2_block_params"))?; + .map_err(|e| e.context("update_l2_block_params"))?; } let Some(tx) = self .io diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index 11b6adb555e2..26c221ebbaa5 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -189,7 +189,7 @@ impl UpdatesManager { .extend_from_sealed_l2_block(old_l2_block_updates); } - pub fn update_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { + pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { self.next_l2_block_param = l2_block_param } @@ -256,7 +256,7 @@ mod tests { assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0); // Seal an L2 block. 
- updates_manager.update_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); From d161b678b1d91444453fac3e02a88b351e8c1b6b Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Wed, 29 Jan 2025 21:35:18 +0900 Subject: [PATCH 08/17] change signature next_l2_block_param --- core/node/node_sync/src/external_io.rs | 4 +-- core/node/state_keeper/src/io/mempool.rs | 4 +-- core/node/state_keeper/src/io/mod.rs | 2 +- core/node/state_keeper/src/keeper.rs | 28 ++++--------------- .../src/testonly/test_batch_executor.rs | 4 +-- 5 files changed, 12 insertions(+), 30 deletions(-) diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 71049c189e46..5719da4769ea 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -361,8 +361,8 @@ impl StateKeeperIO for ExternalIO { } } - async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { - Ok(Some(self.next_l2_block_param)) + fn get_updated_l2_block_params(&mut self) -> L2BlockParams { + self.next_l2_block_param } async fn wait_for_next_tx( diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index 0b57bcb9d41e..aad300ae5b3c 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -293,13 +293,13 @@ impl StateKeeperIO for MempoolIO { Ok(Some(params)) } - async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { + fn get_updated_l2_block_params(&mut self) -> L2BlockParams { let current_timestamp_millis = millis_since_epoch(); let current_timestamp = (current_timestamp_millis / 1_000) as u64; let mut updated_params = self.next_l2_block_param; updated_params.timestamp = current_timestamp; - Ok(Some(updated_params)) + updated_params } async fn wait_for_next_tx( diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index ca347cbd6986..354a6d9afa3d 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -136,7 +136,7 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria { ) -> anyhow::Result>; /// Get the updated parameters for the next L2 block. - async fn get_updated_l2_block_params(&mut self) -> anyhow::Result>; + fn get_updated_l2_block_params(&mut self) -> L2BlockParams; /// Blocks for up to `max_wait` until the next transaction is available for execution. /// Returns `None` if no transaction became available until the timeout. diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index f9d216d883ae..7a8140f7ba4b 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -412,23 +412,9 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn update_l2_block_params( - &mut self, - updates: &mut UpdatesManager, - stop_receiver: &watch::Receiver, - ) -> Result<(), Error> { - while !is_canceled(stop_receiver) { - if let Some(params) = self - .io - .get_updated_l2_block_params() - .await - .context("error getting the updated L2 block params")? 
- { - updates.set_next_l2_block_parameters(params); - return Ok(()); - } - } - Err(Error::Canceled) + fn update_l2_block_params(&mut self, updates: &mut UpdatesManager) -> () { + let updated_params = self.io.get_updated_l2_block_params(); + updates.set_next_l2_block_parameters(updated_params); } #[tracing::instrument( @@ -620,9 +606,7 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - self.update_l2_block_params(updates_manager, stop_receiver) - .await - .map_err(|e| e.context("update_l2_block_params"))?; + self.update_l2_block_params(updates_manager); tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, @@ -652,9 +636,7 @@ impl ZkSyncStateKeeper { if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - self.update_l2_block_params(updates_manager, stop_receiver) - .await - .map_err(|e| e.context("update_l2_block_params"))?; + self.update_l2_block_params(updates_manager); } let Some(tx) = self .io diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 77807b6f1e41..ab1e6b57519b 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -723,8 +723,8 @@ impl StateKeeperIO for TestIO { Ok(Some(params)) } - async fn get_updated_l2_block_params(&mut self) -> anyhow::Result> { - Ok(Some(self.next_l2_block_param)) + fn get_updated_l2_block_params(&mut self) -> L2BlockParams { + self.next_l2_block_param } async fn wait_for_next_tx( From 4768ff193f12260e2321a3c32063c70e9c8f677d Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Wed, 29 Jan 2025 23:19:39 +0900 Subject: [PATCH 09/17] remove await and rename method --- core/node/state_keeper/src/keeper.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 7a8140f7ba4b..960eb49ed9ad 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -192,7 +192,7 @@ impl ZkSyncStateKeeper { self.seal_l2_block(&updates_manager).await?; // We've sealed the L2 block that we had, but we still need to set up the timestamp // for the fictive L2 block. 
- self.get_and_set_new_l2_block_params(&mut updates_manager, &stop_receiver) + self.wait_and_set_new_l2_block_params(&mut updates_manager, &stop_receiver) .await?; Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } @@ -381,7 +381,7 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn get_and_set_new_l2_block_params( + async fn wait_and_set_new_l2_block_params( &mut self, updates: &mut UpdatesManager, stop_receiver: &watch::Receiver, @@ -424,10 +424,7 @@ impl ZkSyncStateKeeper { l2_block = %updates_manager.l2_block.number, ) )] - async fn set_l2_block_params( - updates_manager: &mut UpdatesManager, - l2_block_param: L2BlockParams, - ) { + fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) { updates_manager.set_next_l2_block_parameters(l2_block_param); } @@ -500,8 +497,7 @@ impl ZkSyncStateKeeper { timestamp: l2_block.timestamp, virtual_blocks: l2_block.virtual_blocks, }, - ) - .await; + ); Self::start_next_l2_block(updates_manager, batch_executor).await?; } @@ -568,9 +564,9 @@ impl ZkSyncStateKeeper { ); // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. - self.get_and_set_new_l2_block_params(updates_manager, stop_receiver) + self.wait_and_set_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("get_and_set_new_l2_block_params"))?; + .map_err(|e| e.context("wait_and_set_new_l2_block_params"))?; Ok(true) } @@ -628,7 +624,7 @@ impl ZkSyncStateKeeper { is_last_block_sealed = true; // Set a tentative new l2 block parameters - self.get_and_set_new_l2_block_params(updates_manager, stop_receiver) + self.wait_and_set_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("set_new_l2_block_params"))?; } From 225beda843cf82ef30ac1daaaaf9f49bf2c67df2 Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Thu, 30 Jan 2025 00:12:58 +0900 Subject: [PATCH 10/17] remove logic from update manager --- core/node/state_keeper/src/io/persistence.rs | 12 +-- core/node/state_keeper/src/keeper.rs | 92 +++++++++----------- core/node/state_keeper/src/updates/mod.rs | 22 +---- 3 files changed, 50 insertions(+), 76 deletions(-) diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 423c1cd56654..8db7fe4120ed 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,11 +512,10 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.set_next_l2_block_parameters(L2BlockParams { + updates.push_l2_block(L2BlockParams { timestamp: 1, virtual_blocks: 1, }); - updates.push_l2_block(); let mut batch_result = FinishedL1Batch::mock(); batch_result.final_execution_state.deduplicated_storage_logs = @@ -624,11 +623,10 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.push_l2_block(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); - updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); { let submit_future = persistence.submit_l2_block(seal_command); @@ -653,11 +651,10 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. 
persistence.wait_for_all_commands().await; - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.push_l2_block(L2BlockParams { timestamp: 3, virtual_blocks: 1, }); - updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); persistence.submit_l2_block(seal_command).await; let command = sealer.commands_receiver.recv().await.unwrap(); @@ -678,11 +675,10 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.push_l2_block(L2BlockParams { timestamp: i, virtual_blocks: 1, }); - updates_manager.push_l2_block(); persistence.submit_l2_block(seal_command).await; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 960eb49ed9ad..1f43d5d68593 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -192,9 +192,15 @@ impl ZkSyncStateKeeper { self.seal_l2_block(&updates_manager).await?; // We've sealed the L2 block that we had, but we still need to set up the timestamp // for the fictive L2 block. - self.wait_and_set_new_l2_block_params(&mut updates_manager, &stop_receiver) + let new_l2_block_params = self + .wait_for_new_l2_block_params(&updates_manager, &stop_receiver) .await?; - Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; + Self::start_next_l2_block( + new_l2_block_params, + &mut updates_manager, + &mut *batch_executor, + ) + .await?; } let (finished_batch, _) = batch_executor.finish_batch().await?; @@ -381,11 +387,11 @@ impl ZkSyncStateKeeper { l2_block = %updates.l2_block.number, ) )] - async fn wait_and_set_new_l2_block_params( + async fn wait_for_new_l2_block_params( &mut self, - updates: &mut UpdatesManager, + updates: &UpdatesManager, stop_receiver: &watch::Receiver, - ) -> Result<(), Error> { + ) -> Result { let latency = KEEPER_METRICS.wait_for_l2_block_params.start(); let cursor = updates.io_cursor(); while !is_canceled(stop_receiver) { @@ -397,37 +403,14 @@ impl ZkSyncStateKeeper { { self.health_updater .update(StateKeeperHealthDetails::from(&cursor).into()); - updates.set_next_l2_block_parameters(params); + latency.observe(); - return Ok(()); + return Ok(params); } } Err(Error::Canceled) } - #[tracing::instrument( - skip_all, - fields( - l1_batch = %updates.l1_batch.number, - l2_block = %updates.l2_block.number, - ) - )] - fn update_l2_block_params(&mut self, updates: &mut UpdatesManager) -> () { - let updated_params = self.io.get_updated_l2_block_params(); - updates.set_next_l2_block_parameters(updated_params); - } - - #[tracing::instrument( - skip_all, - fields( - l1_batch = %updates_manager.l1_batch.number, - l2_block = %updates_manager.l2_block.number, - ) - )] - fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) { - updates_manager.set_next_l2_block_parameters(l2_block_param); - } - #[tracing::instrument( skip_all, fields( @@ -436,10 +419,11 @@ impl ZkSyncStateKeeper { ) )] async fn start_next_l2_block( + params: L2BlockParams, updates_manager: &mut UpdatesManager, batch_executor: &mut dyn BatchExecutor, ) -> anyhow::Result<()> { - updates_manager.push_l2_block(); + updates_manager.push_l2_block(params); let block_env = updates_manager.l2_block.get_env(); batch_executor .start_next_l2_block(block_env) @@ -491,14 +475,15 @@ impl ZkSyncStateKeeper { for (index, l2_block) in 
l2_blocks_to_reexecute.into_iter().enumerate() { // Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized. if index > 0 { - Self::set_l2_block_params( - updates_manager, + Self::start_next_l2_block( L2BlockParams { timestamp: l2_block.timestamp, virtual_blocks: l2_block.virtual_blocks, }, - ); - Self::start_next_l2_block(updates_manager, batch_executor).await?; + updates_manager, + batch_executor, + ) + .await?; } let l2_block_number = l2_block.number; @@ -564,9 +549,10 @@ impl ZkSyncStateKeeper { ); // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. - self.wait_and_set_new_l2_block_params(updates_manager, stop_receiver) + // The `wait_for_new_l2_block_params` call is used to initialize the StateKeeperIO with a correct new l2 block params + self.wait_for_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("wait_and_set_new_l2_block_params"))?; + .map_err(|e| e.context("wait_for_new_l2_block_params"))?; Ok(true) } @@ -583,6 +569,8 @@ impl ZkSyncStateKeeper { stop_receiver: &watch::Receiver, ) -> Result<(), Error> { let mut is_last_block_sealed = state_restored; + let mut next_l2_block_params = L2BlockParams::default(); + if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) .await?; @@ -602,14 +590,19 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - self.update_l2_block_params(updates_manager); + next_l2_block_params = self.io.get_updated_l2_block_params(); tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(updates_manager.next_l2_block_timestamp()) + display_timestamp(next_l2_block_params.timestamp) ); - Self::start_next_l2_block(updates_manager, batch_executor).await?; + Self::start_next_l2_block( + next_l2_block_params, + updates_manager, + batch_executor, + ) + .await?; } return Ok(()); } @@ -623,23 +616,21 @@ impl ZkSyncStateKeeper { self.seal_l2_block(updates_manager).await?; is_last_block_sealed = true; - // Set a tentative new l2 block parameters - self.wait_and_set_new_l2_block_params(updates_manager, stop_receiver) + // Get a tentative new l2 block parameters + next_l2_block_params = self + .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await - .map_err(|e| e.context("set_new_l2_block_params"))?; + .map_err(|e| e.context("wait_for_new_l2_block_params"))?; } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - self.update_l2_block_params(updates_manager); + next_l2_block_params = self.io.get_updated_l2_block_params(); } let Some(tx) = self .io - .wait_for_next_tx( - POLL_WAIT_DURATION, - updates_manager.next_l2_block_timestamp(), - ) + .wait_for_next_tx(POLL_WAIT_DURATION, next_l2_block_params.timestamp) .instrument(info_span!("wait_for_next_tx")) .await .context("error waiting for next transaction")? 
@@ -658,9 +649,10 @@ impl ZkSyncStateKeeper { "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(updates_manager.next_l2_block_timestamp()) + display_timestamp(next_l2_block_params.timestamp) ); - Self::start_next_l2_block(updates_manager, batch_executor).await?; + Self::start_next_l2_block(next_l2_block_params, updates_manager, batch_executor) + .await?; is_last_block_sealed = false; } diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index 26c221ebbaa5..b4f548527652 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -39,7 +39,6 @@ pub struct UpdatesManager { pub l2_block: L2BlockUpdates, pub storage_writes_deduplicator: StorageWritesDeduplicator, pubdata_params: PubdataParams, - pub next_l2_block_param: L2BlockParams, } impl UpdatesManager { @@ -67,10 +66,6 @@ impl UpdatesManager { storage_writes_deduplicator: StorageWritesDeduplicator::new(), storage_view_cache: None, pubdata_params, - next_l2_block_param: L2BlockParams { - timestamp: l1_batch_env.first_l2_block.timestamp, - virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create, - }, } } @@ -78,10 +73,6 @@ impl UpdatesManager { self.batch_timestamp } - pub(crate) fn next_l2_block_timestamp(&self) -> u64 { - self.next_l2_block_param.timestamp - } - pub fn base_system_contract_hashes(&self) -> BaseSystemContractsHashes { self.base_system_contract_hashes } @@ -176,12 +167,12 @@ impl UpdatesManager { /// Pushes a new L2 block with the specified timestamp into this manager. The previously /// held L2 block is considered sealed and is used to extend the L1 batch data. - pub fn push_l2_block(&mut self) { + pub fn push_l2_block(&mut self, l2_block_params: L2BlockParams) { let new_l2_block_updates = L2BlockUpdates::new( - self.next_l2_block_param.timestamp, + l2_block_params.timestamp, self.l2_block.number + 1, self.l2_block.get_l2_block_hash(), - self.next_l2_block_param.virtual_blocks, + l2_block_params.virtual_blocks, self.protocol_version, ); let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates); @@ -189,10 +180,6 @@ impl UpdatesManager { .extend_from_sealed_l2_block(old_l2_block_updates); } - pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { - self.next_l2_block_param = l2_block_param - } - pub(crate) fn pending_executed_transactions_len(&self) -> usize { self.l1_batch.executed_transactions.len() + self.l2_block.executed_transactions.len() } @@ -256,11 +243,10 @@ mod tests { assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0); // Seal an L2 block. - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.push_l2_block(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); - updates_manager.push_l2_block(); // Check that L1 batch updates are the same with the pending state // and L2 block updates are empty. 
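At this point in the series the contract is: the keeper obtains the next block's L2BlockParams from IO (`wait_for_new_l2_block_params`) and passes them explicitly into `start_next_l2_block` / `push_l2_block`, so UpdatesManager no longer stages parameters itself. Below is a minimal, self-contained sketch of that calling convention; the types are simplified stand-ins, not the real zksync-era definitions:

#[derive(Clone, Copy, Debug)]
struct L2BlockParams {
    timestamp: u64,
    virtual_blocks: u32,
}

#[derive(Debug, Default)]
struct L2BlockUpdates {
    number: u32,
    timestamp: u64,
    virtual_blocks: u32,
}

#[derive(Debug, Default)]
struct UpdatesManager {
    l2_block: L2BlockUpdates,
    sealed_blocks: Vec<L2BlockUpdates>,
}

impl UpdatesManager {
    // After patch 10, the params come in as an argument: the previously held
    // block is treated as sealed and replaced by a fresh one built from `params`.
    fn push_l2_block(&mut self, params: L2BlockParams) {
        let new_block = L2BlockUpdates {
            number: self.l2_block.number + 1,
            timestamp: params.timestamp,
            virtual_blocks: params.virtual_blocks,
        };
        let old_block = std::mem::replace(&mut self.l2_block, new_block);
        self.sealed_blocks.push(old_block);
    }
}

fn main() {
    let mut updates = UpdatesManager::default();
    // The keeper decides the params (e.g. the result of waiting for new
    // L2 block params) and hands them over only when the next block starts.
    updates.push_l2_block(L2BlockParams {
        timestamp: 42,
        virtual_blocks: 1,
    });
    assert_eq!(updates.l2_block.number, 1);
    assert_eq!(updates.l2_block.timestamp, 42);
    assert_eq!(updates.sealed_blocks.len(), 1);
}

The cost of this shape, which the next patches wrestle with, is that the keeper must track on its own whether params have been fetched but the block not yet started; the later patches move that staging back into UpdatesManager, eventually as an Option.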
From ebcd2b490abf8087fa7f2c0cdb5ec127ce13240b Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Thu, 30 Jan 2025 11:14:24 +0900
Subject: [PATCH 11/17] fix initialization of next_l2_block_params

---
 core/node/state_keeper/src/keeper.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs
index 1f43d5d68593..a2177bb06f17 100644
--- a/core/node/state_keeper/src/keeper.rs
+++ b/core/node/state_keeper/src/keeper.rs
@@ -569,7 +569,10 @@ impl ZkSyncStateKeeper {
         stop_receiver: &watch::Receiver,
     ) -> Result<(), Error> {
         let mut is_last_block_sealed = state_restored;
-        let mut next_l2_block_params = L2BlockParams::default();
+        let mut next_l2_block_params = L2BlockParams {
+            timestamp: updates_manager.l2_block.timestamp,
+            virtual_blocks: updates_manager.l2_block.virtual_blocks,
+        };
 
         if let Some(protocol_upgrade_tx) = protocol_upgrade_tx {
             self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx)

From d10c4b2de181a1c2d7d92726ae4805f5bcaaba86 Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Sat, 1 Feb 2025 00:21:13 +0900
Subject: [PATCH 12/17] make IO stateless and store last block params in
 UpdatesManager

---
 core/node/node_sync/src/external_io.rs        |  7 +-
 core/node/state_keeper/src/io/mempool.rs      |  9 +--
 core/node/state_keeper/src/io/mod.rs          |  4 +-
 core/node/state_keeper/src/io/persistence.rs  | 12 ++--
 core/node/state_keeper/src/keeper.rs          | 67 ++++++++++---------
 .../src/testonly/test_batch_executor.rs       |  7 +-
 core/node/state_keeper/src/updates/mod.rs     | 22 ++++--
 7 files changed, 67 insertions(+), 61 deletions(-)

diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs
index 5719da4769ea..b0389420134a 100644
--- a/core/node/node_sync/src/external_io.rs
+++ b/core/node/node_sync/src/external_io.rs
@@ -40,7 +40,6 @@ pub struct ExternalIO {
     actions: ActionQueue,
     main_node_client: Box,
     chain_id: L2ChainId,
-    pub next_l2_block_param: L2BlockParams,
 }
 
 impl ExternalIO {
@@ -57,7 +56,6 @@
             actions,
             main_node_client,
             chain_id,
-            next_l2_block_param: L2BlockParams::default(),
         })
     }
 
@@ -350,7 +348,6 @@ impl StateKeeperIO for ExternalIO {
                     "L2 block number mismatch: expected {}, got {number}",
                     cursor.next_l2_block
                 );
-                self.next_l2_block_param = params;
                 return Ok(Some(params));
             }
             other => {
@@ -361,9 +358,7 @@
         }
     }
 
-    fn get_updated_l2_block_params(&mut self) -> L2BlockParams {
-        self.next_l2_block_param
-    }
+    fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {}
 
     async fn wait_for_next_tx(
         &mut self,
diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs
index aad300ae5b3c..83494c1a09d3 100644
--- a/core/node/state_keeper/src/io/mempool.rs
+++ b/core/node/state_keeper/src/io/mempool.rs
@@ -59,7 +59,6 @@ pub struct MempoolIO {
     chain_id: L2ChainId,
     l2_da_validator_address: Option
, pubdata_type: PubdataType, - pub next_l2_block_param: L2BlockParams, } impl IoSealCriteria for MempoolIO { @@ -289,17 +288,14 @@ impl StateKeeperIO for MempoolIO { // This value is effectively ignored by the protocol. virtual_blocks: 1, }; - self.next_l2_block_param = params; Ok(Some(params)) } - fn get_updated_l2_block_params(&mut self) -> L2BlockParams { + fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64) { let current_timestamp_millis = millis_since_epoch(); let current_timestamp = (current_timestamp_millis / 1_000) as u64; - let mut updated_params = self.next_l2_block_param; - updated_params.timestamp = current_timestamp; - updated_params + *block_timestamp = current_timestamp; } async fn wait_for_next_tx( @@ -524,7 +520,6 @@ impl MempoolIO { chain_id, l2_da_validator_address, pubdata_type, - next_l2_block_param: L2BlockParams::default(), }) } diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index 354a6d9afa3d..acf08b747835 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -135,8 +135,8 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria { max_wait: Duration, ) -> anyhow::Result>; - /// Get the updated parameters for the next L2 block. - fn get_updated_l2_block_params(&mut self) -> L2BlockParams; + /// Update the next block param timestamp + fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64); /// Blocks for up to `max_wait` until the next transaction is available for execution. /// Returns `None` if no transaction became available until the timeout. diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 8db7fe4120ed..423c1cd56654 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,10 +512,11 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.push_l2_block(L2BlockParams { + updates.set_next_l2_block_parameters(L2BlockParams { timestamp: 1, virtual_blocks: 1, }); + updates.push_l2_block(); let mut batch_result = FinishedL1Batch::mock(); batch_result.final_execution_state.deduplicated_storage_logs = @@ -623,10 +624,11 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 2, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); { let submit_future = persistence.submit_l2_block(seal_command); @@ -651,10 +653,11 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. 
persistence.wait_for_all_commands().await; - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: 3, virtual_blocks: 1, }); + updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); persistence.submit_l2_block(seal_command).await; let command = sealer.commands_receiver.recv().await.unwrap(); @@ -675,10 +678,11 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.push_l2_block(L2BlockParams { + updates_manager.set_next_l2_block_parameters(L2BlockParams { timestamp: i, virtual_blocks: 1, }); + updates_manager.push_l2_block(); persistence.submit_l2_block(seal_command).await; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index a2177bb06f17..bed6d741af7e 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -195,12 +195,8 @@ impl ZkSyncStateKeeper { let new_l2_block_params = self .wait_for_new_l2_block_params(&updates_manager, &stop_receiver) .await?; - Self::start_next_l2_block( - new_l2_block_params, - &mut updates_manager, - &mut *batch_executor, - ) - .await?; + Self::set_l2_block_params(&mut updates_manager, new_l2_block_params); + Self::start_next_l2_block(&mut updates_manager, &mut *batch_executor).await?; } let (finished_batch, _) = batch_executor.finish_batch().await?; @@ -411,6 +407,17 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates_manager.l1_batch.number, + l2_block = %updates_manager.l2_block.number, + ) + )] + fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) { + updates_manager.set_next_l2_block_parameters(l2_block_param); + } + #[tracing::instrument( skip_all, fields( @@ -419,11 +426,10 @@ impl ZkSyncStateKeeper { ) )] async fn start_next_l2_block( - params: L2BlockParams, updates_manager: &mut UpdatesManager, batch_executor: &mut dyn BatchExecutor, ) -> anyhow::Result<()> { - updates_manager.push_l2_block(params); + updates_manager.push_l2_block(); let block_env = updates_manager.l2_block.get_env(); batch_executor .start_next_l2_block(block_env) @@ -475,15 +481,14 @@ impl ZkSyncStateKeeper { for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { // Push any non-first L2 block to updates manager. The first one was pushed when `updates_manager` was initialized. if index > 0 { - Self::start_next_l2_block( + Self::set_l2_block_params( + updates_manager, L2BlockParams { timestamp: l2_block.timestamp, virtual_blocks: l2_block.virtual_blocks, }, - updates_manager, - batch_executor, - ) - .await?; + ); + Self::start_next_l2_block(updates_manager, batch_executor).await?; } let l2_block_number = l2_block.number; @@ -550,9 +555,11 @@ impl ZkSyncStateKeeper { // We've processed all the L2 blocks, and right now we're preparing the next *actual* L2 block. 
// The `wait_for_new_l2_block_params` call is used to initialize the StateKeeperIO with a correct new l2 block params - self.wait_for_new_l2_block_params(updates_manager, stop_receiver) + let new_l2_block_params = self + .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + Self::set_l2_block_params(updates_manager, new_l2_block_params); Ok(true) } @@ -569,10 +576,6 @@ impl ZkSyncStateKeeper { stop_receiver: &watch::Receiver, ) -> Result<(), Error> { let mut is_last_block_sealed = state_restored; - let mut next_l2_block_params = L2BlockParams { - timestamp: updates_manager.l2_block.timestamp, - virtual_blocks: updates_manager.l2_block.virtual_blocks, - }; if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) @@ -593,19 +596,15 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet if is_last_block_sealed { - next_l2_block_params = self.io.get_updated_l2_block_params(); + self.io + .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); tracing::debug!( "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(next_l2_block_params.timestamp) + display_timestamp(updates_manager.next_l2_block_param.timestamp) ); - Self::start_next_l2_block( - next_l2_block_params, - updates_manager, - batch_executor, - ) - .await?; + Self::start_next_l2_block(updates_manager, batch_executor).await?; } return Ok(()); } @@ -620,20 +619,25 @@ impl ZkSyncStateKeeper { is_last_block_sealed = true; // Get a tentative new l2 block parameters - next_l2_block_params = self + let next_l2_block_params = self .wait_for_new_l2_block_params(updates_manager, stop_receiver) .await .map_err(|e| e.context("wait_for_new_l2_block_params"))?; + Self::set_l2_block_params(updates_manager, next_l2_block_params); } let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); if is_last_block_sealed { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - next_l2_block_params = self.io.get_updated_l2_block_params(); + self.io + .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); } let Some(tx) = self .io - .wait_for_next_tx(POLL_WAIT_DURATION, next_l2_block_params.timestamp) + .wait_for_next_tx( + POLL_WAIT_DURATION, + updates_manager.next_l2_block_param.timestamp, + ) .instrument(info_span!("wait_for_next_tx")) .await .context("error waiting for next transaction")? 
@@ -652,10 +656,9 @@ impl ZkSyncStateKeeper { "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", updates_manager.l2_block.number + 1, updates_manager.l1_batch.number, - display_timestamp(next_l2_block_params.timestamp) + display_timestamp(updates_manager.next_l2_block_param.timestamp) ); - Self::start_next_l2_block(next_l2_block_params, updates_manager, batch_executor) - .await?; + Self::start_next_l2_block(updates_manager, batch_executor).await?; is_last_block_sealed = false; } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index ab1e6b57519b..3b5eefd2a089 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -562,7 +562,6 @@ pub(crate) struct TestIO { protocol_version: ProtocolVersionId, previous_batch_protocol_version: ProtocolVersionId, protocol_upgrade_txs: HashMap, - pub next_l2_block_param: L2BlockParams, } impl fmt::Debug for TestIO { @@ -610,7 +609,6 @@ impl TestIO { protocol_version: ProtocolVersionId::latest(), previous_batch_protocol_version: ProtocolVersionId::latest(), protocol_upgrade_txs: HashMap::default(), - next_l2_block_param: L2BlockParams::default(), }; (this, OutputHandler::new(Box::new(persistence))) } @@ -719,13 +717,10 @@ impl StateKeeperIO for TestIO { }; self.l2_block_number += 1; self.timestamp += 1; - self.next_l2_block_param = params; Ok(Some(params)) } - fn get_updated_l2_block_params(&mut self) -> L2BlockParams { - self.next_l2_block_param - } + fn update_next_l2_block_timestamp(&mut self, _block_timestamp: &mut u64) {} async fn wait_for_next_tx( &mut self, diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index b4f548527652..b0b11b9d0168 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -39,6 +39,7 @@ pub struct UpdatesManager { pub l2_block: L2BlockUpdates, pub storage_writes_deduplicator: StorageWritesDeduplicator, pubdata_params: PubdataParams, + pub next_l2_block_param: L2BlockParams, } impl UpdatesManager { @@ -66,6 +67,10 @@ impl UpdatesManager { storage_writes_deduplicator: StorageWritesDeduplicator::new(), storage_view_cache: None, pubdata_params, + next_l2_block_param: L2BlockParams { + timestamp: l1_batch_env.first_l2_block.timestamp, + virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create, + }, } } @@ -77,6 +82,10 @@ impl UpdatesManager { self.base_system_contract_hashes } + pub(crate) fn next_l2_block_timestamp(&mut self) -> &mut u64 { + &mut self.next_l2_block_param.timestamp + } + pub(crate) fn io_cursor(&self) -> IoCursor { IoCursor { next_l2_block: self.l2_block.number + 1, @@ -167,12 +176,12 @@ impl UpdatesManager { /// Pushes a new L2 block with the specified timestamp into this manager. The previously /// held L2 block is considered sealed and is used to extend the L1 batch data. 
-    pub fn push_l2_block(&mut self, l2_block_params: L2BlockParams) {
+    pub fn push_l2_block(&mut self) {
         let new_l2_block_updates = L2BlockUpdates::new(
-            l2_block_params.timestamp,
+            self.next_l2_block_param.timestamp,
             self.l2_block.number + 1,
             self.l2_block.get_l2_block_hash(),
-            l2_block_params.virtual_blocks,
+            self.next_l2_block_param.virtual_blocks,
             self.protocol_version,
         );
         let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates);
@@ -180,6 +189,10 @@
             .extend_from_sealed_l2_block(old_l2_block_updates);
     }
 
+    pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) {
+        self.next_l2_block_param = l2_block_param
+    }
+
     pub(crate) fn pending_executed_transactions_len(&self) -> usize {
         self.l1_batch.executed_transactions.len() + self.l2_block.executed_transactions.len()
     }
@@ -243,10 +256,11 @@ mod tests {
         assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0);
 
         // Seal an L2 block.
-        updates_manager.push_l2_block(L2BlockParams {
+        updates_manager.set_next_l2_block_parameters(L2BlockParams {
             timestamp: 2,
             virtual_blocks: 1,
         });
+        updates_manager.push_l2_block();
 
         // Check that L1 batch updates are the same with the pending state
         // and L2 block updates are empty.

From b839d3cc50aefe52f8a86438cfd47d165e279fa1 Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Sat, 1 Feb 2025 00:26:17 +0900
Subject: [PATCH 13/17] remove unnecessary change in mempool

---
 core/node/state_keeper/src/io/mempool.rs | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs
index 83494c1a09d3..f553fcb57a08 100644
--- a/core/node/state_keeper/src/io/mempool.rs
+++ b/core/node/state_keeper/src/io/mempool.rs
@@ -283,19 +283,11 @@ impl StateKeeperIO for MempoolIO {
             return Ok(None);
         };
 
-        let params = L2BlockParams {
+        Ok(Some(L2BlockParams {
             timestamp,
             // This value is effectively ignored by the protocol.
            virtual_blocks: 1,
-        };
-        Ok(Some(params))
-    }
-
-    fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64) {
-        let current_timestamp_millis = millis_since_epoch();
-        let current_timestamp = (current_timestamp_millis / 1_000) as u64;
-
-        *block_timestamp = current_timestamp;
+        }))
     }
 
     async fn wait_for_next_tx(

From b87df90d9f1c9b3c4d5c690afcf2eb22692ff10b Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Mon, 3 Feb 2025 15:58:55 +0900
Subject: [PATCH 14/17] restore code that was deleted by mistake

---
 core/node/state_keeper/src/io/mempool.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs
index f553fcb57a08..feb71b1f5550 100644
--- a/core/node/state_keeper/src/io/mempool.rs
+++ b/core/node/state_keeper/src/io/mempool.rs
@@ -290,6 +290,13 @@ impl StateKeeperIO for MempoolIO {
         }))
     }
 
+    fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64) {
+        let current_timestamp_millis = millis_since_epoch();
+        let current_timestamp = (current_timestamp_millis / 1_000) as u64;
+
+        *block_timestamp = current_timestamp;
+    }
+
     async fn wait_for_next_tx(
         &mut self,
         max_wait: Duration,

From c3e0fccb872eb3af1bc147ad1f61d7cc099e93b1 Mon Sep 17 00:00:00 2001
From: Thomas Nguy
Date: Thu, 6 Feb 2025 00:02:07 +0900
Subject: [PATCH 15/17] fix comments

---
 core/node/state_keeper/src/io/mempool.rs     | 10 ++-
 core/node/state_keeper/src/io/mod.rs         |  2 +-
 core/node/state_keeper/src/io/persistence.rs | 16 ++--
 core/node/state_keeper/src/keeper.rs         | 93 +++++++++-----------
 core/node/state_keeper/src/updates/mod.rs    | 34 ++++---
 5 files changed, 83 insertions(+), 72 deletions(-)

diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs
index feb71b1f5550..19131a20960f 100644
--- a/core/node/state_keeper/src/io/mempool.rs
+++ b/core/node/state_keeper/src/io/mempool.rs
@@ -294,7 +294,15 @@ impl StateKeeperIO for MempoolIO {
         let current_timestamp_millis = millis_since_epoch();
         let current_timestamp = (current_timestamp_millis / 1_000) as u64;
 
-        *block_timestamp = current_timestamp;
+        if current_timestamp < *block_timestamp {
+            tracing::warn!(
+                "Trying to update bloc timestamp {} with lower value timestamp {}",
+                *block_timestamp,
+                current_timestamp
+            );
+        } else {
+            *block_timestamp = current_timestamp;
+        }
     }
 
     async fn wait_for_next_tx(
diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs
index acf08b747835..af86527145f6 100644
--- a/core/node/state_keeper/src/io/mod.rs
+++ b/core/node/state_keeper/src/io/mod.rs
@@ -135,7 +135,7 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria {
         max_wait: Duration,
     ) -> anyhow::Result>;
 
-    /// Update the next block param timestamp
+    /// Update the next block params timestamp
    fn update_next_l2_block_timestamp(&mut self, block_timestamp: &mut u64);
 
    /// Blocks for up to `max_wait` until the next transaction is available for execution.
diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 423c1cd56654..0a10885f2361 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -512,10 +512,10 @@ mod tests { vec![], ); output_handler.handle_l2_block(&updates).await.unwrap(); - updates.set_next_l2_block_parameters(L2BlockParams { + updates.set_next_l2_block_params(Some(L2BlockParams { timestamp: 1, virtual_blocks: 1, - }); + })); updates.push_l2_block(); let mut batch_result = FinishedL1Batch::mock(); @@ -624,10 +624,10 @@ mod tests { persistence.submit_l2_block(seal_command).await; // The second command should lead to blocking - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_params(Some(L2BlockParams { timestamp: 2, virtual_blocks: 1, - }); + })); updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); { @@ -653,10 +653,10 @@ mod tests { // Check that `wait_for_all_commands()` state is reset after use. persistence.wait_for_all_commands().await; - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_params(Some(L2BlockParams { timestamp: 3, virtual_blocks: 1, - }); + })); updates_manager.push_l2_block(); let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); persistence.submit_l2_block(seal_command).await; @@ -678,10 +678,10 @@ mod tests { for i in 1..=5 { let seal_command = updates_manager.seal_l2_block_command(Some(Address::default()), false); - updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_params(Some(L2BlockParams { timestamp: i, virtual_blocks: 1, - }); + })); updates_manager.push_l2_block(); persistence.submit_l2_block(seal_command).await; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index bed6d741af7e..c43fd8721d08 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -166,14 +166,13 @@ impl ZkSyncStateKeeper { &stop_receiver, ) .await?; - let mut state_restored = self - .restore_state( - &mut *batch_executor, - &mut updates_manager, - pending_l2_blocks, - &stop_receiver, - ) - .await?; + self.restore_state( + &mut *batch_executor, + &mut updates_manager, + pending_l2_blocks, + &stop_receiver, + ) + .await?; let mut l1_batch_seal_delta: Option = None; while !is_canceled(&stop_receiver) { @@ -182,7 +181,6 @@ impl ZkSyncStateKeeper { &mut *batch_executor, &mut updates_manager, protocol_upgrade_tx, - state_restored, &stop_receiver, ) .await?; @@ -234,7 +232,6 @@ impl ZkSyncStateKeeper { } else { None }; - state_restored = false; } Err(Error::Canceled) } @@ -407,15 +404,9 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } - #[tracing::instrument( - skip_all, - fields( - l1_batch = %updates_manager.l1_batch.number, - l2_block = %updates_manager.l2_block.number, - ) - )] - fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_param: L2BlockParams) { - updates_manager.set_next_l2_block_parameters(l2_block_param); + #[tracing::instrument(skip(updates_manager))] + fn set_l2_block_params(updates_manager: &mut UpdatesManager, l2_block_params: L2BlockParams) { + updates_manager.set_next_l2_block_params(Some(l2_block_params)); } #[tracing::instrument( @@ -431,6 +422,15 @@ impl ZkSyncStateKeeper { ) -> anyhow::Result<()> { updates_manager.push_l2_block(); let block_env = 
updates_manager.l2_block.get_env(); + let timestamp = updates_manager + .get_next_l2_block_params_or_batch_params() + .timestamp; + tracing::debug!( + "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", + updates_manager.l2_block.number + 1, + updates_manager.l1_batch.number, + display_timestamp(timestamp) + ); batch_executor .start_next_l2_block(block_env) .await @@ -473,9 +473,9 @@ impl ZkSyncStateKeeper { updates_manager: &mut UpdatesManager, l2_blocks_to_reexecute: Vec, stop_receiver: &watch::Receiver, - ) -> Result { + ) -> Result<(), Error> { if l2_blocks_to_reexecute.is_empty() { - return Ok(false); + return Ok(()); } for (index, l2_block) in l2_blocks_to_reexecute.into_iter().enumerate() { @@ -560,7 +560,7 @@ impl ZkSyncStateKeeper { .await .map_err(|e| e.context("wait_for_new_l2_block_params"))?; Self::set_l2_block_params(updates_manager, new_l2_block_params); - Ok(true) + Ok(()) } #[tracing::instrument( @@ -572,11 +572,8 @@ impl ZkSyncStateKeeper { batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, protocol_upgrade_tx: Option, - state_restored: bool, stop_receiver: &watch::Receiver, ) -> Result<(), Error> { - let mut is_last_block_sealed = state_restored; - if let Some(protocol_upgrade_tx) = protocol_upgrade_tx { self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx) .await?; @@ -594,29 +591,27 @@ impl ZkSyncStateKeeper { updates_manager.l1_batch.number ); - // Push the current block if it has not been done yet - if is_last_block_sealed { - self.io - .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); - tracing::debug!( - "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", - updates_manager.l2_block.number + 1, - updates_manager.l1_batch.number, - display_timestamp(updates_manager.next_l2_block_param.timestamp) + // Push the current block if it has not been done yet and this will effectively create a fictive l2 block + if updates_manager.has_next_block_params() { + self.io.update_next_l2_block_timestamp( + &mut updates_manager + .get_next_l2_block_params_or_batch_params() + .timestamp, ); Self::start_next_l2_block(updates_manager, batch_executor).await?; } return Ok(()); } - if self.io.should_seal_l2_block(updates_manager) && !is_last_block_sealed { + if !updates_manager.has_next_block_params() + && self.io.should_seal_l2_block(updates_manager) + { tracing::debug!( "L2 block #{} (L1 batch #{}) should be sealed as per sealing rules", updates_manager.l2_block.number, updates_manager.l1_batch.number ); self.seal_l2_block(updates_manager).await?; - is_last_block_sealed = true; // Get a tentative new l2 block parameters let next_l2_block_params = self @@ -625,18 +620,24 @@ impl ZkSyncStateKeeper { .map_err(|e| e.context("wait_for_new_l2_block_params"))?; Self::set_l2_block_params(updates_manager, next_l2_block_params); } + let waiting_latency = KEEPER_METRICS.waiting_for_tx.start(); - if is_last_block_sealed { + if updates_manager.has_next_block_params() { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp - self.io - .update_next_l2_block_timestamp(updates_manager.next_l2_block_timestamp()); + self.io.update_next_l2_block_timestamp( + &mut updates_manager + .get_next_l2_block_params_or_batch_params() + .timestamp, + ); } let Some(tx) = self .io .wait_for_next_tx( POLL_WAIT_DURATION, - updates_manager.next_l2_block_param.timestamp, + updates_manager + .get_next_l2_block_params_or_batch_params() + .timestamp, ) 
.instrument(info_span!("wait_for_next_tx")) .await @@ -650,16 +651,10 @@ impl ZkSyncStateKeeper { let tx_hash = tx.hash(); - // if the current block is sealed, we need to start a new block - if is_last_block_sealed { - tracing::debug!( - "Initialized new L2 block #{} (L1 batch #{}) with timestamp {}", - updates_manager.l2_block.number + 1, - updates_manager.l1_batch.number, - display_timestamp(updates_manager.next_l2_block_param.timestamp) - ); + // We need to start a new block + if updates_manager.has_next_block_params() { Self::start_next_l2_block(updates_manager, batch_executor).await?; - is_last_block_sealed = false; + updates_manager.set_next_l2_block_params(None); } let (seal_resolution, exec_result) = self diff --git a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index b0b11b9d0168..3f2a8bbbd769 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -39,7 +39,7 @@ pub struct UpdatesManager { pub l2_block: L2BlockUpdates, pub storage_writes_deduplicator: StorageWritesDeduplicator, pubdata_params: PubdataParams, - pub next_l2_block_param: L2BlockParams, + next_l2_block_params: Option, } impl UpdatesManager { @@ -67,10 +67,7 @@ impl UpdatesManager { storage_writes_deduplicator: StorageWritesDeduplicator::new(), storage_view_cache: None, pubdata_params, - next_l2_block_param: L2BlockParams { - timestamp: l1_batch_env.first_l2_block.timestamp, - virtual_blocks: l1_batch_env.first_l2_block.max_virtual_blocks_to_create, - }, + next_l2_block_params: None, } } @@ -82,8 +79,18 @@ impl UpdatesManager { self.base_system_contract_hashes } - pub(crate) fn next_l2_block_timestamp(&mut self) -> &mut u64 { - &mut self.next_l2_block_param.timestamp + pub(crate) fn get_next_l2_block_params_or_batch_params(&mut self) -> L2BlockParams { + if let Some(next_l2_block_params) = self.next_l2_block_params { + return next_l2_block_params; + } + L2BlockParams { + timestamp: self.l2_block.timestamp, + virtual_blocks: self.l2_block.virtual_blocks, + } + } + + pub(crate) fn has_next_block_params(&self) -> bool { + self.next_l2_block_params.is_some() } pub(crate) fn io_cursor(&self) -> IoCursor { @@ -178,10 +185,11 @@ impl UpdatesManager { /// held L2 block is considered sealed and is used to extend the L1 batch data. pub fn push_l2_block(&mut self) { let new_l2_block_updates = L2BlockUpdates::new( - self.next_l2_block_param.timestamp, + self.get_next_l2_block_params_or_batch_params().timestamp, self.l2_block.number + 1, self.l2_block.get_l2_block_hash(), - self.next_l2_block_param.virtual_blocks, + self.get_next_l2_block_params_or_batch_params() + .virtual_blocks, self.protocol_version, ); let old_l2_block_updates = std::mem::replace(&mut self.l2_block, new_l2_block_updates); @@ -189,8 +197,8 @@ impl UpdatesManager { .extend_from_sealed_l2_block(old_l2_block_updates); } - pub fn set_next_l2_block_parameters(&mut self, l2_block_param: L2BlockParams) { - self.next_l2_block_param = l2_block_param + pub fn set_next_l2_block_params(&mut self, l2_block_params: Option) { + self.next_l2_block_params = l2_block_params } pub(crate) fn pending_executed_transactions_len(&self) -> usize { @@ -256,10 +264,10 @@ mod tests { assert_eq!(updates_manager.l1_batch.executed_transactions.len(), 0); // Seal an L2 block. 
- updates_manager.set_next_l2_block_parameters(L2BlockParams { + updates_manager.set_next_l2_block_params(Some(L2BlockParams { timestamp: 2, virtual_blocks: 1, - }); + })); updates_manager.push_l2_block(); // Check that L1 batch updates are the same with the pending state From 522be8310bdf55df232364c0440a6eeb06316457 Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Thu, 6 Feb 2025 15:17:03 +0900 Subject: [PATCH 16/17] add safety check --- core/node/state_keeper/src/keeper.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index c43fd8721d08..69a5d31e4893 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -184,6 +184,7 @@ impl ZkSyncStateKeeper { &stop_receiver, ) .await?; + assert!(!updates_manager.has_next_block_params()); // Finish current batch with an empty block. if !updates_manager.l2_block.executed_transactions.is_empty() { @@ -431,6 +432,7 @@ impl ZkSyncStateKeeper { updates_manager.l1_batch.number, display_timestamp(timestamp) ); + updates_manager.set_next_l2_block_params(None); batch_executor .start_next_l2_block(block_env) .await @@ -654,7 +656,6 @@ impl ZkSyncStateKeeper { // We need to start a new block if updates_manager.has_next_block_params() { Self::start_next_l2_block(updates_manager, batch_executor).await?; - updates_manager.set_next_l2_block_params(None); } let (seal_resolution, exec_result) = self From d3969acd6195ff5f647950b9497f71e57d910bdb Mon Sep 17 00:00:00 2001 From: Thomas Nguy Date: Thu, 6 Feb 2025 22:56:37 +0900 Subject: [PATCH 17/17] fix mutable --- core/node/state_keeper/src/io/mempool.rs | 4 +--- core/node/state_keeper/src/keeper.rs | 8 ++------ core/node/state_keeper/src/updates/mod.rs | 7 +++++++ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index 19131a20960f..8451a34b5fa5 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -296,9 +296,7 @@ impl StateKeeperIO for MempoolIO { if current_timestamp < *block_timestamp { tracing::warn!( - "Trying to update bloc timestamp {} with lower value timestamp {}", - *block_timestamp, - current_timestamp + "Trying to update block timestamp {block_timestamp} with lower value timestamp {current_timestamp}", ); } else { *block_timestamp = current_timestamp; diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 69a5d31e4893..4cc25f9d33e6 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -596,9 +596,7 @@ impl ZkSyncStateKeeper { // Push the current block if it has not been done yet and this will effectively create a fictive l2 block if updates_manager.has_next_block_params() { self.io.update_next_l2_block_timestamp( - &mut updates_manager - .get_next_l2_block_params_or_batch_params() - .timestamp, + updates_manager.get_next_l2_block_timestamp_as_mut(), ); Self::start_next_l2_block(updates_manager, batch_executor).await?; } @@ -628,9 +626,7 @@ impl ZkSyncStateKeeper { if updates_manager.has_next_block_params() { // The next block has not started yet, we keep updating the next l2 block parameters with correct timestamp self.io.update_next_l2_block_timestamp( - &mut updates_manager - .get_next_l2_block_params_or_batch_params() - .timestamp, + updates_manager.get_next_l2_block_timestamp_as_mut(), ); } let Some(tx) = self diff --git 
a/core/node/state_keeper/src/updates/mod.rs b/core/node/state_keeper/src/updates/mod.rs index 3f2a8bbbd769..7026ff51a4b6 100644 --- a/core/node/state_keeper/src/updates/mod.rs +++ b/core/node/state_keeper/src/updates/mod.rs @@ -79,6 +79,13 @@ impl UpdatesManager { self.base_system_contract_hashes } + pub(crate) fn get_next_l2_block_timestamp_as_mut(&mut self) -> &mut u64 { + if let Some(l2_block_params) = self.next_l2_block_params.as_mut() { + return &mut l2_block_params.timestamp; + } + &mut self.l2_block.timestamp + } + pub(crate) fn get_next_l2_block_params_or_batch_params(&mut self) -> L2BlockParams { if let Some(next_l2_block_params) = self.next_l2_block_params { return next_l2_block_params;
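The last two patches turn on a Rust ownership detail: `get_next_l2_block_params_or_batch_params` returns `L2BlockParams` by value, so taking `&mut` of its timestamp field (as patch 15 did) mutates a temporary copy and the update is silently lost, while patch 17's `get_next_l2_block_timestamp_as_mut` hands out a reference into the stored `Option`. Below is a minimal sketch with stand-in types (not the real zksync-era API) contrasting the two shapes:

#[derive(Clone, Copy, Debug)]
struct L2BlockParams {
    timestamp: u64,
    virtual_blocks: u32,
}

#[derive(Default)]
struct UpdatesManager {
    open_block_timestamp: u64,
    next_l2_block_params: Option<L2BlockParams>,
}

impl UpdatesManager {
    // Returns the staged params *by value*; `&mut copy.timestamp` therefore
    // mutates a temporary copy, which is the bug patch 17 fixes.
    fn get_next_l2_block_params_or_batch_params(&mut self) -> L2BlockParams {
        self.next_l2_block_params.unwrap_or(L2BlockParams {
            timestamp: self.open_block_timestamp,
            virtual_blocks: 1,
        })
    }

    // Patch 17's shape: a mutable reference into the stored value, falling
    // back to the open block's timestamp when no params are staged.
    fn get_next_l2_block_timestamp_as_mut(&mut self) -> &mut u64 {
        if let Some(params) = self.next_l2_block_params.as_mut() {
            return &mut params.timestamp;
        }
        &mut self.open_block_timestamp
    }
}

// Stand-in for `StateKeeperIO::update_next_l2_block_timestamp`: bump the
// timestamp to "now", but never move it backwards.
fn update_next_l2_block_timestamp(block_timestamp: &mut u64, now: u64) {
    if now < *block_timestamp {
        eprintln!("refusing to lower timestamp {block_timestamp} to {now}");
    } else {
        *block_timestamp = now;
    }
}

fn main() {
    let mut updates = UpdatesManager::default();
    updates.next_l2_block_params = Some(L2BlockParams {
        timestamp: 100,
        virtual_blocks: 1,
    });

    // Pre-patch-17 shape: the update lands on a copy and is lost.
    let mut copy = updates.get_next_l2_block_params_or_batch_params();
    update_next_l2_block_timestamp(&mut copy.timestamp, 200);
    assert_eq!(updates.next_l2_block_params.unwrap().timestamp, 100);

    // Patch-17 shape: the staged params are updated in place.
    update_next_l2_block_timestamp(updates.get_next_l2_block_timestamp_as_mut(), 200);
    assert_eq!(updates.next_l2_block_params.unwrap().timestamp, 200);
}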