From 5d5214ba983823b306495d34fdd1d46abacce07a Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Sat, 19 Oct 2024 00:47:33 +1100 Subject: [PATCH] fix(external-node): delete empty unsealed batch on EN initialization (#3125) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ This PR reverts #3088 as I have realized it is going to be very hard to make this fix work by going in that direction. Basically initializing with an empty unsealed batch causes a lot of issues and the existing state keeper/external IO flow heavily relies on us having at least one at the start to initialize correctly. Will leave more context in the comments. Feel free to review individual commits to not see revert changelog. ## Why ❔ This bug causes EN to panic ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`. --- Cargo.lock | 1 + ...81f4625ebd593aa4cd2bae79bcc0637387d78.json | 22 ++++ core/lib/dal/src/blocks_dal.rs | 45 ++++++++ core/node/consensus/src/testonly.rs | 9 +- core/node/node_sync/Cargo.toml | 1 + core/node/node_sync/src/external_io.rs | 8 ++ core/node/node_sync/src/fetcher.rs | 36 +------ core/node/node_sync/src/sync_action.rs | 12 +++ core/node/node_sync/src/tests.rs | 101 +++++++++++++++++- 9 files changed, 195 insertions(+), 40 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json diff --git a/Cargo.lock b/Cargo.lock index eac6e9771f5a..a5e51346bdf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10777,6 +10777,7 @@ dependencies = [ "anyhow", "assert_matches", "async-trait", + "backon", "chrono", "futures 0.3.30", "once_cell", diff --git a/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json new file mode 100644 index 000000000000..b40bdca666b8 --- /dev/null +++ b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM l1_batches\n WHERE\n number > $1\n AND NOT is_sealed\n RETURNING number\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78" +} diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index bf1b48130c40..f71dc68ce757 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -2058,6 +2058,37 @@ impl BlocksDal<'_, '_> { Ok(()) } + /// Deletes the unsealed L1 batch from the storage. Expects the caller to make sure there are no + /// associated L2 blocks. + /// + /// Accepts `batch_to_keep` as a safety mechanism. + pub async fn delete_unsealed_l1_batch( + &mut self, + batch_to_keep: L1BatchNumber, + ) -> DalResult<()> { + let deleted_row = sqlx::query!( + r#" + DELETE FROM l1_batches + WHERE + number > $1 + AND NOT is_sealed + RETURNING number + "#, + i64::from(batch_to_keep.0) + ) + .instrument("delete_unsealed_l1_batch") + .with_arg("batch_to_keep", &batch_to_keep) + .fetch_optional(self.storage) + .await?; + if let Some(deleted_row) = deleted_row { + tracing::info!( + l1_batch_number = %deleted_row.number, + "Deleted unsealed batch" + ); + } + Ok(()) + } + /// Deletes all L1 batches from the storage so that the specified batch number is the last one left. pub async fn delete_l1_batches(&mut self, last_batch_to_keep: L1BatchNumber) -> DalResult<()> { self.delete_l1_batches_inner(Some(last_batch_to_keep)).await @@ -2184,6 +2215,20 @@ impl BlocksDal<'_, '_> { Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32)))) } + /// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned + /// to any batch yet). + pub async fn pending_batch_exists(&mut self) -> DalResult { + let count = sqlx::query_scalar!( + "SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL" + ) + .instrument("pending_batch_exists") + .fetch_one(self.storage) + .await? + .unwrap_or(0); + + Ok(count != 0) + } + // methods used for measuring Eth tx stage transition latencies // and emitting metrics base on these measured data pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult> { diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 98c0d6b08131..4ebcf5c9a617 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -219,11 +219,10 @@ impl StateKeeper { .wait(IoCursor::for_fetcher(&mut conn.0)) .await? .context("IoCursor::new()")?; - let batch_sealed = ctx - .wait(conn.0.blocks_dal().get_unsealed_l1_batch()) + let pending_batch = ctx + .wait(conn.0.blocks_dal().pending_batch_exists()) .await? - .context("get_unsealed_l1_batch()")? - .is_none(); + .context("pending_batch_exists()")?; let (actions_sender, actions_queue) = ActionQueue::new(); let addr = sync::watch::channel(None).0; let sync_state = SyncState::default(); @@ -259,7 +258,7 @@ impl StateKeeper { last_batch: cursor.l1_batch, last_block: cursor.next_l2_block - 1, last_timestamp: cursor.prev_l2_block_timestamp, - batch_sealed, + batch_sealed: !pending_batch, next_priority_op: PriorityOpId(1), actions_sender, sync_state: sync_state.clone(), diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml index ccfc8dd8a4e9..9c5b0c000700 100644 --- a/core/node/node_sync/Cargo.toml +++ b/core/node/node_sync/Cargo.toml @@ -43,3 +43,4 @@ zksync_node_test_utils.workspace = true assert_matches.workspace = true once_cell.workspace = true test-casing.workspace = true +backon.workspace = true diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 10fb2925015f..5e3a5ce9f46e 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -155,6 +155,14 @@ impl StateKeeperIO for ExternalIO { ) })?; let Some(mut pending_l2_block_header) = pending_l2_block_header else { + tracing::info!( + l1_batch_number = %cursor.l1_batch, + "No pending L2 blocks found; pruning unsealed batch if exists as we need at least one L2 block to initialize" + ); + storage + .blocks_dal() + .delete_unsealed_l1_batch(cursor.l1_batch - 1) + .await?; return Ok((cursor, None)); }; diff --git a/core/node/node_sync/src/fetcher.rs b/core/node/node_sync/src/fetcher.rs index 3f8558ed0ac5..51b9f7c7a060 100644 --- a/core/node/node_sync/src/fetcher.rs +++ b/core/node/node_sync/src/fetcher.rs @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor { let mut this = Self::new(storage).await?; // It's important to know whether we have opened a new batch already or just sealed the previous one. // Depending on it, we must either insert `OpenBatch` item into the queue, or not. - let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; - if unsealed_batch.is_none() { + let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?; + if !was_new_batch_open { this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage } Ok(this) @@ -201,35 +201,3 @@ impl IoCursorExt for IoCursor { new_actions } } - -#[cfg(test)] -mod tests { - use zksync_dal::{ConnectionPool, Core, CoreDal}; - use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; - use zksync_state_keeper::io::IoCursor; - use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber}; - - use crate::fetcher::IoCursorExt; - - #[tokio::test] - async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - insert_genesis_batch(&mut conn, &GenesisParams::mock()) - .await - .unwrap(); - conn.blocks_dal() - .insert_l1_batch(UnsealedL1BatchHeader { - number: L1BatchNumber(1), - timestamp: 1, - protocol_version: None, - fee_address: Default::default(), - fee_input: Default::default(), - }) - .await?; - - let io_cursor = IoCursor::for_fetcher(&mut conn).await?; - assert_eq!(io_cursor.l1_batch, L1BatchNumber(1)); - Ok(()) - } -} diff --git a/core/node/node_sync/src/sync_action.rs b/core/node/node_sync/src/sync_action.rs index 8cb90d24fe84..e3fd56ae9bb0 100644 --- a/core/node/node_sync/src/sync_action.rs +++ b/core/node/node_sync/src/sync_action.rs @@ -33,6 +33,18 @@ impl ActionQueueSender { Ok(()) } + /// Pushes a single action into the queue without checking validity of the sequence. + /// + /// Useful to simulate situations where only a part of the sequence was executed on the node. + #[cfg(test)] + pub async fn push_action_unchecked(&self, action: SyncAction) -> anyhow::Result<()> { + self.0 + .send(action) + .await + .map_err(|_| anyhow::anyhow!("node action processor stopped"))?; + Ok(()) + } + /// Checks whether the action sequence is valid. /// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable /// error. This function itself does not panic for the ease of testing. diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 3f5791cdf24c..1ae148709b22 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -2,6 +2,7 @@ use std::{iter, sync::Arc, time::Duration}; +use backon::{ConstantBuilder, Retryable}; use test_casing::test_casing; use tokio::{sync::watch, task::JoinHandle}; use zksync_contracts::BaseSystemContractsHashes; @@ -18,7 +19,7 @@ use zksync_state_keeper::{ }; use zksync_types::{ api, - block::L2BlockHasher, + block::{L2BlockHasher, UnsealedL1BatchHeader}, fee_model::{BatchFeeInput, PubdataIndependentBatchFeeModelInput}, snapshots::SnapshotRecoveryStatus, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -652,3 +653,101 @@ async fn external_io_with_multiple_l1_batches() { assert_eq!(fictive_l2_block.timestamp, 2); assert_eq!(fictive_l2_block.l2_tx_count, 0); } + +async fn wait_for_batch_to_be_open( + pool: &ConnectionPool, + number: L1BatchNumber, +) -> anyhow::Result { + (|| async { + let mut storage = pool.connection().await.unwrap(); + let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; + + if let Some(unsealed_batch) = unsealed_batch { + if unsealed_batch.number == number { + Ok(unsealed_batch) + } else { + Err(anyhow::anyhow!("L1 batch #{number} is not open yet")) + } + } else { + Err(anyhow::anyhow!("No unsealed L1 batch found yet")) + } + }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_millis(200)) + .with_max_times(20), + ) + .await +} + +#[tokio::test] +async fn external_io_empty_unsealed_batch() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + ensure_genesis(&mut storage).await; + drop(storage); + + let open_batch_one = open_l1_batch(1, 1, 1); + let tx = create_l2_transaction(10, 100); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let open_batch_two = open_l1_batch(2, 2, 3); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 2, + virtual_blocks: 0, + }, + number: L2BlockNumber(2), + }; + let actions1 = vec![open_batch_one, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + // Unchecked insert of batch #2 to simulate restart in the middle of processing an action sequence + // In other words batch #2 is inserted completely empty with no blocks/txs present in it + actions_sender + .push_action_unchecked(open_batch_two.clone()) + .await + .unwrap(); + // Wait until the L2 block is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(2)).await; + + // Wait until L1 batch #2 is opened and persisted. + let unsealed_batch = wait_for_batch_to_be_open(&pool, L1BatchNumber(2)) + .await + .unwrap(); + assert_eq!(unsealed_batch.number, L1BatchNumber(2)); + assert_eq!(unsealed_batch.timestamp, 2); + + // Prepare the rest of batch #2 + let tx = create_l2_transaction(20, 200); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 4, + virtual_blocks: 0, + }, + number: L2BlockNumber(4), + }; + let actions1 = vec![open_batch_two, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + // Restart state keeper + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + + let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1)); + // Wait until the block #4 is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(4)).await; + hash_task.await.unwrap(); +}