From e2c1b20e361e6ee2f5ac69cefe75d9c5575eb2f7 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski <slowli@users.noreply.github.com>
Date: Thu, 30 Nov 2023 14:34:55 +0200
Subject: [PATCH] feat(merkle tree): Remove enumeration index assignment from
 Merkle tree (#551)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Since enumeration indices are now fully stored in Postgres, it makes
sense to not duplicate their assignment in the Merkle tree. Instead, the
tree could take enum indices as inputs.

## Why ❔

This allows simplifying tree logic and unify "normal" L1 batch
processing and tree recovery. (This unification is not a part of this
PR; it'll be implemented separately.)

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
---
 .../lib/merkle_tree/examples/loadtest/main.rs |  24 +-
 core/lib/merkle_tree/examples/recovery.rs     |  10 +-
 core/lib/merkle_tree/src/consistency.rs       |  35 ++-
 core/lib/merkle_tree/src/domain.rs            | 145 ++++++------
 core/lib/merkle_tree/src/getters.rs           |  17 +-
 core/lib/merkle_tree/src/hasher/mod.rs        |  24 +-
 core/lib/merkle_tree/src/hasher/proofs.rs     |  62 +++--
 core/lib/merkle_tree/src/lib.rs               |  13 +-
 core/lib/merkle_tree/src/pruning.rs           |  28 +--
 core/lib/merkle_tree/src/recovery.rs          |  35 +--
 core/lib/merkle_tree/src/storage/mod.rs       |  62 ++---
 core/lib/merkle_tree/src/storage/patch.rs     |  21 +-
 core/lib/merkle_tree/src/storage/proofs.rs    | 224 ++----------------
 .../merkle_tree/src/storage/serialization.rs  |  11 +-
 core/lib/merkle_tree/src/storage/tests.rs     | 128 +++++-----
 core/lib/merkle_tree/src/types/internal.rs    |  22 +-
 core/lib/merkle_tree/src/types/mod.rs         |  93 ++++++--
 core/lib/merkle_tree/src/utils.rs             |   5 -
 .../merkle_tree/tests/integration/common.rs   |  50 ++--
 .../tests/integration/consistency.rs          |   6 +-
 .../merkle_tree/tests/integration/domain.rs   |  53 +++--
 .../tests/integration/merkle_tree.rs          | 116 +++++----
 .../merkle_tree/tests/integration/recovery.rs |  43 +---
 .../zksync_core/src/api_server/tree/mod.rs    |   2 +-
 .../src/metadata_calculator/helpers.rs        |  91 +++----
 .../src/metadata_calculator/metrics.rs        |   2 +-
 26 files changed, 591 insertions(+), 731 deletions(-)

diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs
index b598a579f6b4..527daa87b37a 100644
--- a/core/lib/merkle_tree/examples/loadtest/main.rs
+++ b/core/lib/merkle_tree/examples/loadtest/main.rs
@@ -15,7 +15,8 @@ use std::{
 
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 use zksync_merkle_tree::{
-    Database, HashTree, MerkleTree, MerkleTreePruner, PatchSet, RocksDBWrapper, TreeInstruction,
+    Database, HashTree, MerkleTree, MerkleTreePruner, PatchSet, RocksDBWrapper, TreeEntry,
+    TreeInstruction,
 };
 use zksync_storage::{RocksDB, RocksDBOptions};
 use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256};
@@ -135,19 +136,22 @@ impl Cli {
             next_key_idx += new_keys.len() as u64;
 
             next_value_idx += (new_keys.len() + updated_indices.len()) as u64;
-            let values = (next_value_idx..).map(H256::from_low_u64_be);
             let updated_keys = Self::generate_keys(updated_indices.into_iter());
-            let kvs = new_keys.into_iter().chain(updated_keys).zip(values);
+            let kvs = new_keys
+                .into_iter()
+                .chain(updated_keys)
+                .zip(next_value_idx..);
+            let kvs = kvs.map(|(key, idx)| {
+                // The assigned leaf indices here are not always correct, but it's OK for load test purposes.
+                TreeEntry::new(key, idx, H256::from_low_u64_be(idx))
+            });
 
             tracing::info!("Processing block #{version}");
             let start = Instant::now();
             let root_hash = if self.proofs {
-                let reads = Self::generate_keys(read_indices.into_iter())
-                    .map(|key| (key, TreeInstruction::Read));
-                let instructions = kvs
-                    .map(|(key, hash)| (key, TreeInstruction::Write(hash)))
-                    .chain(reads)
-                    .collect();
+                let reads =
+                    Self::generate_keys(read_indices.into_iter()).map(TreeInstruction::Read);
+                let instructions = kvs.map(TreeInstruction::Write).chain(reads).collect();
                 let output = tree.extend_with_proofs(instructions);
                 output.root_hash().unwrap()
             } else {
@@ -160,7 +164,7 @@ impl Cli {
 
         tracing::info!("Verifying tree consistency...");
         let start = Instant::now();
-        tree.verify_consistency(self.commit_count - 1)
+        tree.verify_consistency(self.commit_count - 1, false)
             .expect("tree consistency check failed");
         let elapsed = start.elapsed();
         tracing::info!("Verified tree consistency in {elapsed:?}");
diff --git a/core/lib/merkle_tree/examples/recovery.rs b/core/lib/merkle_tree/examples/recovery.rs
index af16ed05baf3..1a2aae236ea5 100644
--- a/core/lib/merkle_tree/examples/recovery.rs
+++ b/core/lib/merkle_tree/examples/recovery.rs
@@ -9,8 +9,8 @@ use std::time::Instant;
 
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 use zksync_merkle_tree::{
-    recovery::{MerkleTreeRecovery, RecoveryEntry},
-    HashTree, Key, PatchSet, PruneDatabase, RocksDBWrapper, ValueHash,
+    recovery::MerkleTreeRecovery, HashTree, Key, PatchSet, PruneDatabase, RocksDBWrapper,
+    TreeEntry, ValueHash,
 };
 use zksync_storage::{RocksDB, RocksDBOptions};
 
@@ -94,7 +94,7 @@ impl Cli {
                 .map(|_| {
                     last_leaf_index += 1;
                     if self.random {
-                        RecoveryEntry {
+                        TreeEntry {
                             key: Key::from(rng.gen::<[u8; 32]>()),
                             value: ValueHash::zero(),
                             leaf_index: last_leaf_index,
@@ -102,7 +102,7 @@ impl Cli {
                     } else {
                         last_key += key_step - Key::from(rng.gen::<u64>());
                         // ^ Increases the key by a random increment close to `key` step with some randomness.
-                        RecoveryEntry {
+                        TreeEntry {
                             key: last_key,
                             value: ValueHash::zero(),
                             leaf_index: last_leaf_index,
@@ -127,7 +127,7 @@ impl Cli {
             recovery_started_at.elapsed()
         );
         let started_at = Instant::now();
-        tree.verify_consistency(recovered_version).unwrap();
+        tree.verify_consistency(recovered_version, true).unwrap();
         tracing::info!("Verified consistency in {:?}", started_at.elapsed());
     }
 }
diff --git a/core/lib/merkle_tree/src/consistency.rs b/core/lib/merkle_tree/src/consistency.rs
index 85896bad1ae1..2cc8996e64e9 100644
--- a/core/lib/merkle_tree/src/consistency.rs
+++ b/core/lib/merkle_tree/src/consistency.rs
@@ -69,10 +69,17 @@ pub enum ConsistencyError {
 impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
     /// Verifies the internal tree consistency as stored in the database.
     ///
+    /// If `validate_indices` flag is set, it will be checked that indices for all tree leaves are unique
+    /// and are sequentially assigned starting from 1.
+    ///
     /// # Errors
     ///
     /// Returns an error (the first encountered one if there are multiple).
-    pub fn verify_consistency(&self, version: u64) -> Result<(), ConsistencyError> {
+    pub fn verify_consistency(
+        &self,
+        version: u64,
+        validate_indices: bool,
+    ) -> Result<(), ConsistencyError> {
         let manifest = self.db.try_manifest()?;
         let manifest = manifest.ok_or(ConsistencyError::MissingVersion(version))?;
         if version >= manifest.version_count {
@@ -91,16 +98,19 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
         // We want to perform a depth-first walk of the tree in order to not keep
         // much in memory.
         let root_key = Nibbles::EMPTY.with_version(version);
-        let leaf_data = LeafConsistencyData::new(leaf_count);
-        self.validate_node(&root_node, root_key, &leaf_data)?;
-        leaf_data.validate_count()
+        let leaf_data = validate_indices.then(|| LeafConsistencyData::new(leaf_count));
+        self.validate_node(&root_node, root_key, leaf_data.as_ref())?;
+        if let Some(leaf_data) = leaf_data {
+            leaf_data.validate_count()?;
+        }
+        Ok(())
     }
 
     fn validate_node(
         &self,
         node: &Node,
         key: NodeKey,
-        leaf_data: &LeafConsistencyData,
+        leaf_data: Option<&LeafConsistencyData>,
     ) -> Result<ValueHash, ConsistencyError> {
         match node {
             Node::Leaf(leaf) => {
@@ -111,7 +121,9 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
                         full_key: leaf.full_key,
                     });
                 }
-                leaf_data.insert_leaf(leaf)?;
+                if let Some(leaf_data) = leaf_data {
+                    leaf_data.insert_leaf(leaf)?;
+                }
             }
 
             Node::Internal(node) => {
@@ -261,7 +273,10 @@ mod tests {
     use std::num::NonZeroU64;
 
     use super::*;
-    use crate::{types::InternalNode, PatchSet};
+    use crate::{
+        types::{InternalNode, TreeEntry},
+        PatchSet,
+    };
     use zksync_types::{H256, U256};
 
     const FIRST_KEY: Key = U256([0, 0, 0, 0x_dead_beef_0000_0000]);
@@ -270,8 +285,8 @@ mod tests {
     fn prepare_database() -> PatchSet {
         let mut tree = MerkleTree::new(PatchSet::default());
         tree.extend(vec![
-            (FIRST_KEY, H256([1; 32])),
-            (SECOND_KEY, H256([2; 32])),
+            TreeEntry::new(FIRST_KEY, 1, H256([1; 32])),
+            TreeEntry::new(SECOND_KEY, 2, H256([2; 32])),
         ]);
         tree.db
     }
@@ -300,7 +315,7 @@ mod tests {
             .num_threads(1)
             .build()
             .expect("failed initializing `rayon` thread pool");
-        thread_pool.install(|| MerkleTree::new(db).verify_consistency(0))
+        thread_pool.install(|| MerkleTree::new(db).verify_consistency(0, true))
     }
 
     #[test]
diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs
index bb82233aec28..0cd9a56a4866 100644
--- a/core/lib/merkle_tree/src/domain.rs
+++ b/core/lib/merkle_tree/src/domain.rs
@@ -5,7 +5,10 @@ use zksync_utils::h256_to_u256;
 
 use crate::{
     storage::{MerkleTreeColumnFamily, PatchSet, Patched, RocksDBWrapper},
-    types::{Key, Root, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH},
+    types::{
+        Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash,
+        TREE_DEPTH,
+    },
     BlockOutput, HashTree, MerkleTree, NoVersionError,
 };
 use zksync_crypto::hasher::blake2::Blake2Hasher;
@@ -13,7 +16,7 @@ use zksync_storage::RocksDB;
 use zksync_types::{
     proofs::{PrepareBasicCircuitsJob, StorageLogMetadata},
     writes::{InitialStorageWrite, RepeatedStorageWrite, StateDiffRecord},
-    L1BatchNumber, StorageKey, StorageLog, StorageLogKind, U256,
+    L1BatchNumber, StorageKey, U256,
 };
 
 /// Metadata for the current tree state.
@@ -65,17 +68,17 @@ impl ZkSyncTree {
 
     /// Returns metadata based on `storage_logs` generated by the genesis L1 batch. This does not
     /// create a persistent tree.
-    pub fn process_genesis_batch(storage_logs: &[StorageLog]) -> BlockOutput {
-        let kvs = Self::filter_write_logs(storage_logs);
+    pub fn process_genesis_batch(storage_logs: &[TreeInstruction<StorageKey>]) -> BlockOutput {
+        let kvs = Self::filter_write_instructions(storage_logs);
         tracing::info!(
             "Creating Merkle tree for genesis batch with {instr_count} writes",
             instr_count = kvs.len()
         );
 
-        let kvs = kvs
+        let kvs: Vec<_> = kvs
             .iter()
-            .map(|(k, v)| (k.hashed_key_u256(), *v))
-            .collect::<Vec<(Key, ValueHash)>>();
+            .map(|instr| instr.map_key(StorageKey::hashed_key_u256))
+            .collect();
 
         let mut in_memory_tree = MerkleTree::new(PatchSet::default());
         let output = in_memory_tree.extend(kvs);
@@ -170,29 +173,36 @@ impl ZkSyncTree {
     /// Panics if an inconsistency is detected.
     pub fn verify_consistency(&self, l1_batch_number: L1BatchNumber) {
         let version = u64::from(l1_batch_number.0);
-        self.tree.verify_consistency(version).unwrap_or_else(|err| {
-            panic!("Tree at version {version} is inconsistent: {err}");
-        });
+        self.tree
+            .verify_consistency(version, true)
+            .unwrap_or_else(|err| {
+                panic!("Tree at version {version} is inconsistent: {err}");
+            });
     }
 
     /// Processes an iterator of storage logs comprising a single L1 batch.
-    pub fn process_l1_batch(&mut self, storage_logs: &[StorageLog]) -> TreeMetadata {
+    pub fn process_l1_batch(
+        &mut self,
+        storage_logs: &[TreeInstruction<StorageKey>],
+    ) -> TreeMetadata {
         match self.mode {
             TreeMode::Full => self.process_l1_batch_full(storage_logs),
             TreeMode::Lightweight => self.process_l1_batch_lightweight(storage_logs),
         }
     }
 
-    fn process_l1_batch_full(&mut self, storage_logs: &[StorageLog]) -> TreeMetadata {
+    fn process_l1_batch_full(
+        &mut self,
+        instructions: &[TreeInstruction<StorageKey>],
+    ) -> TreeMetadata {
         let l1_batch_number = self.next_l1_batch_number();
-        let instructions = Self::transform_logs(storage_logs);
         let starting_leaf_count = self.tree.latest_root().leaf_count();
         let starting_root_hash = self.tree.latest_root_hash();
 
-        let instructions_with_hashed_keys = instructions
+        let instructions_with_hashed_keys: Vec<_> = instructions
             .iter()
-            .map(|(k, instr)| (k.hashed_key_u256(), *instr))
-            .collect::<Vec<(Key, TreeInstruction)>>();
+            .map(|instr| instr.map_key(StorageKey::hashed_key_u256))
+            .collect();
 
         tracing::info!(
             "Extending Merkle tree with batch #{l1_batch_number} with {instr_count} ops in full mode",
@@ -207,7 +217,7 @@ impl ZkSyncTree {
 
         let mut witness = PrepareBasicCircuitsJob::new(starting_leaf_count + 1);
         witness.reserve(output.logs.len());
-        for (log, (key, instruction)) in output.logs.iter().zip(&instructions) {
+        for (log, instruction) in output.logs.iter().zip(instructions) {
             let empty_levels_end = TREE_DEPTH - log.merkle_path.len();
             let empty_subtree_hashes =
                 (0..empty_levels_end).map(|i| Blake2Hasher.empty_subtree_hash(i));
@@ -218,20 +228,22 @@ impl ZkSyncTree {
                 .collect();
 
             let value_written = match instruction {
-                TreeInstruction::Write(value) => value.0,
-                TreeInstruction::Read => [0_u8; 32],
+                TreeInstruction::Write(entry) => entry.value.0,
+                TreeInstruction::Read(_) => [0_u8; 32],
             };
             let log = StorageLogMetadata {
                 root_hash: log.root_hash.0,
                 is_write: !log.base.is_read(),
-                first_write: matches!(log.base, TreeLogEntry::Inserted { .. }),
+                first_write: matches!(log.base, TreeLogEntry::Inserted),
                 merkle_paths,
-                leaf_hashed_key: key.hashed_key_u256(),
-                leaf_enumeration_index: match log.base {
-                    TreeLogEntry::Updated { leaf_index, .. }
-                    | TreeLogEntry::Inserted { leaf_index }
-                    | TreeLogEntry::Read { leaf_index, .. } => leaf_index,
-                    TreeLogEntry::ReadMissingKey => 0,
+                leaf_hashed_key: instruction.key().hashed_key_u256(),
+                leaf_enumeration_index: match instruction {
+                    TreeInstruction::Write(entry) => entry.leaf_index,
+                    TreeInstruction::Read(_) => match log.base {
+                        TreeLogEntry::Read { leaf_index, .. } => leaf_index,
+                        TreeLogEntry::ReadMissingKey => 0,
+                        _ => unreachable!("Read instructions always transform to Read / ReadMissingKey log entries"),
+                    }
                 },
                 value_written,
                 value_read: match log.base {
@@ -243,7 +255,7 @@ impl ZkSyncTree {
                         previous_value.0
                     }
                     TreeLogEntry::Read { value, .. } => value.0,
-                    TreeLogEntry::Inserted { .. } | TreeLogEntry::ReadMissingKey => [0_u8; 32],
+                    TreeLogEntry::Inserted | TreeLogEntry::ReadMissingKey => [0_u8; 32],
                 },
             };
             witness.push_merkle_path(log);
@@ -254,12 +266,12 @@ impl ZkSyncTree {
             .logs
             .into_iter()
             .filter_map(|log| (!log.base.is_read()).then_some(log.base));
-        let kvs = instructions.into_iter().filter_map(|(key, instruction)| {
-            let TreeInstruction::Write(value) = instruction else {
-                return None;
-            };
-            Some((key, value))
-        });
+        let kvs = instructions
+            .iter()
+            .filter_map(|instruction| match instruction {
+                TreeInstruction::Write(entry) => Some(*entry),
+                TreeInstruction::Read(_) => None,
+            });
         let (initial_writes, repeated_writes, state_diffs) = Self::extract_writes(logs, kvs);
 
         tracing::info!(
@@ -281,21 +293,9 @@ impl ZkSyncTree {
         }
     }
 
-    fn transform_logs(storage_logs: &[StorageLog]) -> Vec<(StorageKey, TreeInstruction)> {
-        let instructions = storage_logs.iter().map(|log| {
-            let key = log.key;
-            let instruction = match log.kind {
-                StorageLogKind::Write => TreeInstruction::Write(log.value),
-                StorageLogKind::Read => TreeInstruction::Read,
-            };
-            (key, instruction)
-        });
-        instructions.collect()
-    }
-
     fn extract_writes(
         logs: impl Iterator<Item = TreeLogEntry>,
-        kvs: impl Iterator<Item = (StorageKey, ValueHash)>,
+        entries: impl Iterator<Item = TreeEntry<StorageKey>>,
     ) -> (
         Vec<InitialStorageWrite>,
         Vec<RepeatedStorageWrite>,
@@ -304,13 +304,14 @@ impl ZkSyncTree {
         let mut initial_writes = vec![];
         let mut repeated_writes = vec![];
         let mut state_diffs = vec![];
-        for (log_entry, (key, value)) in logs.zip(kvs) {
+        for (log_entry, input_entry) in logs.zip(entries) {
+            let key = &input_entry.key;
             match log_entry {
-                TreeLogEntry::Inserted { leaf_index } => {
+                TreeLogEntry::Inserted => {
                     initial_writes.push(InitialStorageWrite {
-                        index: leaf_index,
+                        index: input_entry.leaf_index,
                         key: key.hashed_key_u256(),
-                        value,
+                        value: input_entry.value,
                     });
                     state_diffs.push(StateDiffRecord {
                         address: *key.address(),
@@ -318,25 +319,25 @@ impl ZkSyncTree {
                         derived_key: StorageKey::raw_hashed_key(key.address(), key.key()),
                         enumeration_index: 0u64,
                         initial_value: U256::default(),
-                        final_value: h256_to_u256(value),
+                        final_value: h256_to_u256(input_entry.value),
                     });
                 }
                 TreeLogEntry::Updated {
+                    previous_value: prev_value_hash,
                     leaf_index,
-                    previous_value,
                 } => {
-                    if previous_value != value {
+                    if prev_value_hash != input_entry.value {
                         repeated_writes.push(RepeatedStorageWrite {
-                            index: leaf_index,
-                            value,
+                            index: input_entry.leaf_index,
+                            value: input_entry.value,
                         });
                         state_diffs.push(StateDiffRecord {
                             address: *key.address(),
                             key: h256_to_u256(*key.key()),
                             derived_key: StorageKey::raw_hashed_key(key.address(), key.key()),
                             enumeration_index: leaf_index,
-                            initial_value: h256_to_u256(previous_value),
-                            final_value: h256_to_u256(value),
+                            initial_value: h256_to_u256(prev_value_hash),
+                            final_value: h256_to_u256(input_entry.value),
                         });
                     }
                     // Else we have a no-op update that must be omitted from `repeated_writes`.
@@ -348,8 +349,11 @@ impl ZkSyncTree {
         (initial_writes, repeated_writes, state_diffs)
     }
 
-    fn process_l1_batch_lightweight(&mut self, storage_logs: &[StorageLog]) -> TreeMetadata {
-        let kvs = Self::filter_write_logs(storage_logs);
+    fn process_l1_batch_lightweight(
+        &mut self,
+        instructions: &[TreeInstruction<StorageKey>],
+    ) -> TreeMetadata {
+        let kvs = Self::filter_write_instructions(instructions);
         let l1_batch_number = self.next_l1_batch_number();
         tracing::info!(
             "Extending Merkle tree with batch #{l1_batch_number} with {kv_count} writes \
@@ -357,10 +361,10 @@ impl ZkSyncTree {
             kv_count = kvs.len()
         );
 
-        let kvs_with_derived_key = kvs
+        let kvs_with_derived_key: Vec<_> = kvs
             .iter()
-            .map(|(k, v)| (k.hashed_key_u256(), *v))
-            .collect::<Vec<(Key, ValueHash)>>();
+            .map(|entry| entry.map_key(StorageKey::hashed_key_u256))
+            .collect();
 
         let output = if let Some(thread_pool) = &self.thread_pool {
             thread_pool.install(|| self.tree.extend(kvs_with_derived_key.clone()))
@@ -390,14 +394,15 @@ impl ZkSyncTree {
         }
     }
 
-    fn filter_write_logs(storage_logs: &[StorageLog]) -> Vec<(StorageKey, ValueHash)> {
-        let kvs = storage_logs.iter().filter_map(|log| match log.kind {
-            StorageLogKind::Write => {
-                let key = log.key;
-                Some((key, log.value))
-            }
-            StorageLogKind::Read => None,
-        });
+    fn filter_write_instructions(
+        instructions: &[TreeInstruction<StorageKey>],
+    ) -> Vec<TreeEntry<StorageKey>> {
+        let kvs = instructions
+            .iter()
+            .filter_map(|instruction| match instruction {
+                TreeInstruction::Write(entry) => Some(*entry),
+                TreeInstruction::Read(_) => None,
+            });
         kvs.collect()
     }
 
diff --git a/core/lib/merkle_tree/src/getters.rs b/core/lib/merkle_tree/src/getters.rs
index 67ce2aa98773..7fd6bfc96ed0 100644
--- a/core/lib/merkle_tree/src/getters.rs
+++ b/core/lib/merkle_tree/src/getters.rs
@@ -26,7 +26,7 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
                 let node = patch_set.get(longest_prefix);
                 match node {
                     Some(Node::Leaf(leaf)) if &leaf.full_key == leaf_key => (*leaf).into(),
-                    _ => TreeEntry::empty(),
+                    _ => TreeEntry::empty(*leaf_key),
                 }
             },
         )
@@ -76,11 +76,12 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
             |patch_set, &leaf_key, longest_prefix| {
                 let (leaf, merkle_path) =
                     patch_set.create_proof(&mut hasher, leaf_key, longest_prefix, 0);
-                let value_hash = leaf
+                let value = leaf
                     .as_ref()
                     .map_or_else(ValueHash::zero, |leaf| leaf.value_hash);
                 TreeEntry {
-                    value_hash,
+                    key: leaf_key,
+                    value,
                     leaf_index: leaf.map_or(0, |leaf| leaf.leaf_index),
                 }
                 .with_merkle_path(merkle_path.into_inner())
@@ -107,26 +108,26 @@ mod tests {
         let entries = tree.entries_with_proofs(0, &[missing_key]).unwrap();
         assert_eq!(entries.len(), 1);
         assert!(entries[0].base.is_empty());
-        entries[0].verify(&tree.hasher, missing_key, tree.hasher.empty_tree_hash());
+        entries[0].verify(&tree.hasher, tree.hasher.empty_tree_hash());
     }
 
     #[test]
     fn entries_in_single_node_tree() {
         let mut tree = MerkleTree::new(PatchSet::default());
         let key = Key::from(987_654);
-        let output = tree.extend(vec![(key, ValueHash::repeat_byte(1))]);
+        let output = tree.extend(vec![TreeEntry::new(key, 1, ValueHash::repeat_byte(1))]);
         let missing_key = Key::from(123);
 
         let entries = tree.entries(0, &[key, missing_key]).unwrap();
         assert_eq!(entries.len(), 2);
-        assert_eq!(entries[0].value_hash, ValueHash::repeat_byte(1));
+        assert_eq!(entries[0].value, ValueHash::repeat_byte(1));
         assert_eq!(entries[0].leaf_index, 1);
 
         let entries = tree.entries_with_proofs(0, &[key, missing_key]).unwrap();
         assert_eq!(entries.len(), 2);
         assert!(!entries[0].base.is_empty());
-        entries[0].verify(&tree.hasher, key, output.root_hash);
+        entries[0].verify(&tree.hasher, output.root_hash);
         assert!(entries[1].base.is_empty());
-        entries[1].verify(&tree.hasher, missing_key, output.root_hash);
+        entries[1].verify(&tree.hasher, output.root_hash);
     }
 }
diff --git a/core/lib/merkle_tree/src/hasher/mod.rs b/core/lib/merkle_tree/src/hasher/mod.rs
index 8b2478c43d34..9425a5836f02 100644
--- a/core/lib/merkle_tree/src/hasher/mod.rs
+++ b/core/lib/merkle_tree/src/hasher/mod.rs
@@ -11,7 +11,7 @@ pub(crate) use self::nodes::{InternalNodeCache, MerklePath};
 pub use self::proofs::TreeRangeDigest;
 use crate::{
     metrics::HashingStats,
-    types::{Key, ValueHash, TREE_DEPTH},
+    types::{TreeEntry, ValueHash, TREE_DEPTH},
 };
 use zksync_crypto::hasher::{blake2::Blake2Hasher, Hasher};
 
@@ -65,17 +65,11 @@ impl dyn HashTree + '_ {
         empty_hashes.chain(path.iter().copied())
     }
 
-    fn fold_merkle_path(
-        &self,
-        path: &[ValueHash],
-        key: Key,
-        value_hash: ValueHash,
-        leaf_index: u64,
-    ) -> ValueHash {
-        let mut hash = self.hash_leaf(&value_hash, leaf_index);
+    fn fold_merkle_path(&self, path: &[ValueHash], entry: TreeEntry) -> ValueHash {
+        let mut hash = self.hash_leaf(&entry.value, entry.leaf_index);
         let full_path = self.extend_merkle_path(path);
         for (depth, adjacent_hash) in full_path.enumerate() {
-            hash = if key.bit(depth) {
+            hash = if entry.key.bit(depth) {
                 self.hash_branch(&adjacent_hash, &hash)
             } else {
                 self.hash_branch(&hash, &adjacent_hash)
@@ -254,7 +248,7 @@ mod tests {
         let address: Address = "4b3af74f66ab1f0da3f2e4ec7a3cb99baf1af7b2".parse().unwrap();
         let key = StorageKey::new(AccountTreeId::new(address), H256::zero());
         let key = key.hashed_key_u256();
-        let leaf = LeafNode::new(key, H256([1; 32]), 1);
+        let leaf = LeafNode::new(TreeEntry::new(key, 1, H256([1; 32])));
 
         let stats = HashingStats::default();
         let mut hasher = (&Blake2Hasher as &dyn HashTree).with_stats(&stats);
@@ -265,7 +259,7 @@ mod tests {
         assert!(stats.hashed_bytes.into_inner() > 100);
 
         let hasher: &dyn HashTree = &Blake2Hasher;
-        let folded_hash = hasher.fold_merkle_path(&[], key, H256([1; 32]), 1);
+        let folded_hash = hasher.fold_merkle_path(&[], leaf.into());
         assert_eq!(folded_hash, EXPECTED_HASH);
     }
 
@@ -274,7 +268,7 @@ mod tests {
         let address: Address = "4b3af74f66ab1f0da3f2e4ec7a3cb99baf1af7b2".parse().unwrap();
         let key = StorageKey::new(AccountTreeId::new(address), H256::zero());
         let key = key.hashed_key_u256();
-        let leaf = LeafNode::new(key, H256([1; 32]), 1);
+        let leaf = LeafNode::new(TreeEntry::new(key, 1, H256([1; 32])));
 
         let mut hasher = HasherWithStats::new(&Blake2Hasher);
         let leaf_hash = leaf.hash(&mut hasher, 2);
@@ -283,9 +277,7 @@ mod tests {
         let expected_hash = hasher.hash_branch(&merkle_path[0], &leaf_hash);
         let expected_hash = hasher.hash_branch(&expected_hash, &merkle_path[1]);
 
-        let folded_hash = hasher
-            .inner
-            .fold_merkle_path(&merkle_path, key, H256([1; 32]), 1);
+        let folded_hash = hasher.inner.fold_merkle_path(&merkle_path, leaf.into());
         assert_eq!(folded_hash, expected_hash);
     }
 }
diff --git a/core/lib/merkle_tree/src/hasher/proofs.rs b/core/lib/merkle_tree/src/hasher/proofs.rs
index d97df0ad97d0..49d4bfe92958 100644
--- a/core/lib/merkle_tree/src/hasher/proofs.rs
+++ b/core/lib/merkle_tree/src/hasher/proofs.rs
@@ -22,36 +22,37 @@ impl BlockOutputWithProofs {
         &self,
         hasher: &dyn HashTree,
         old_root_hash: ValueHash,
-        instructions: &[(Key, TreeInstruction)],
+        instructions: &[TreeInstruction],
     ) {
         assert_eq!(instructions.len(), self.logs.len());
 
         let mut root_hash = old_root_hash;
-        for (op, &(key, instruction)) in self.logs.iter().zip(instructions) {
+        for (op, &instruction) in self.logs.iter().zip(instructions) {
             assert!(op.merkle_path.len() <= TREE_DEPTH);
-            if matches!(instruction, TreeInstruction::Read) {
+            if matches!(instruction, TreeInstruction::Read(_)) {
                 assert_eq!(op.root_hash, root_hash);
                 assert!(op.base.is_read());
             } else {
                 assert!(!op.base.is_read());
             }
 
-            let (prev_leaf_index, leaf_index, prev_value) = match op.base {
-                TreeLogEntry::Inserted { leaf_index } => (0, leaf_index, ValueHash::zero()),
+            let prev_entry = match op.base {
+                TreeLogEntry::Inserted | TreeLogEntry::ReadMissingKey => {
+                    TreeEntry::empty(instruction.key())
+                }
                 TreeLogEntry::Updated {
                     leaf_index,
-                    previous_value,
-                } => (leaf_index, leaf_index, previous_value),
-
-                TreeLogEntry::Read { leaf_index, value } => (leaf_index, leaf_index, value),
-                TreeLogEntry::ReadMissingKey => (0, 0, ValueHash::zero()),
+                    previous_value: value,
+                }
+                | TreeLogEntry::Read { leaf_index, value } => {
+                    TreeEntry::new(instruction.key(), leaf_index, value)
+                }
             };
 
-            let prev_hash =
-                hasher.fold_merkle_path(&op.merkle_path, key, prev_value, prev_leaf_index);
+            let prev_hash = hasher.fold_merkle_path(&op.merkle_path, prev_entry);
             assert_eq!(prev_hash, root_hash);
-            if let TreeInstruction::Write(value) = instruction {
-                let next_hash = hasher.fold_merkle_path(&op.merkle_path, key, value, leaf_index);
+            if let TreeInstruction::Write(new_entry) = instruction {
+                let next_hash = hasher.fold_merkle_path(&op.merkle_path, new_entry);
                 assert_eq!(next_hash, op.root_hash);
             }
             root_hash = op.root_hash;
@@ -65,19 +66,14 @@ impl TreeEntryWithProof {
     /// # Panics
     ///
     /// Panics if the proof doesn't verify.
-    pub fn verify(&self, hasher: &dyn HashTree, key: Key, trusted_root_hash: ValueHash) {
+    pub fn verify(&self, hasher: &dyn HashTree, trusted_root_hash: ValueHash) {
         if self.base.leaf_index == 0 {
             assert!(
-                self.base.value_hash.is_zero(),
+                self.base.value.is_zero(),
                 "Invalid missing value specification: leaf index is zero, but value is non-default"
             );
         }
-        let root_hash = hasher.fold_merkle_path(
-            &self.merkle_path,
-            key,
-            self.base.value_hash,
-            self.base.leaf_index,
-        );
+        let root_hash = hasher.fold_merkle_path(&self.merkle_path, self.base);
         assert_eq!(root_hash, trusted_root_hash, "Root hash mismatch");
     }
 }
@@ -146,11 +142,7 @@ impl<'a> TreeRangeDigest<'a> {
         let left_contour: Vec<_> = left_contour.collect();
         Self {
             hasher: HasherWithStats::new(hasher),
-            current_leaf: LeafNode::new(
-                start_key,
-                start_entry.base.value_hash,
-                start_entry.base.leaf_index,
-            ),
+            current_leaf: LeafNode::new(start_entry.base),
             left_contour: left_contour.try_into().unwrap(),
             // ^ `unwrap()` is safe by construction; `left_contour` will always have necessary length
         }
@@ -161,13 +153,13 @@ impl<'a> TreeRangeDigest<'a> {
     /// # Panics
     ///
     /// Panics if the provided `key` is not greater than the previous key provided to this digest.
-    pub fn update(&mut self, key: Key, entry: TreeEntry) {
+    pub fn update(&mut self, entry: TreeEntry) {
         assert!(
-            key > self.current_leaf.full_key,
+            entry.key > self.current_leaf.full_key,
             "Keys provided to a digest must be monotonically increasing"
         );
 
-        let diverging_level = utils::find_diverging_bit(self.current_leaf.full_key, key) + 1;
+        let diverging_level = utils::find_diverging_bit(self.current_leaf.full_key, entry.key) + 1;
 
         // Hash the current leaf up to the `diverging_level`, taking current `left_contour` into account.
         let mut hash = self
@@ -188,7 +180,7 @@ impl<'a> TreeRangeDigest<'a> {
         }
         // Record the computed hash.
         self.left_contour[TREE_DEPTH - diverging_level] = hash;
-        self.current_leaf = LeafNode::new(key, entry.value_hash, entry.leaf_index);
+        self.current_leaf = LeafNode::new(entry);
     }
 
     /// Finalizes this digest and returns the root hash of the tree.
@@ -196,8 +188,8 @@ impl<'a> TreeRangeDigest<'a> {
     /// # Panics
     ///
     /// Panics if the provided `final_key` is not greater than the previous key provided to this digest.
-    pub fn finalize(mut self, final_key: Key, final_entry: &TreeEntryWithProof) -> ValueHash {
-        self.update(final_key, final_entry.base);
+    pub fn finalize(mut self, final_entry: &TreeEntryWithProof) -> ValueHash {
+        self.update(final_entry.base);
 
         let full_path = self
             .hasher
@@ -206,9 +198,9 @@ impl<'a> TreeRangeDigest<'a> {
         let zipped_paths = self.left_contour.into_iter().zip(full_path);
         let mut hash = self
             .hasher
-            .hash_leaf(&final_entry.base.value_hash, final_entry.base.leaf_index);
+            .hash_leaf(&final_entry.base.value, final_entry.base.leaf_index);
         for (depth, (left, right)) in zipped_paths.enumerate() {
-            hash = if final_key.bit(depth) {
+            hash = if final_entry.base.key.bit(depth) {
                 self.hasher.hash_branch(&left, &hash)
             } else {
                 self.hasher.hash_branch(&hash, &right)
diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs
index 166400cbb640..85ace50aada5 100644
--- a/core/lib/merkle_tree/src/lib.rs
+++ b/core/lib/merkle_tree/src/lib.rs
@@ -26,10 +26,15 @@
 //! - Hash of a vacant leaf is `hash([0_u8; 40])`, where `hash` is the hash function used
 //!   (Blake2s-256).
 //! - Hash of an occupied leaf is `hash(u64::to_be_bytes(leaf_index) ++ value_hash)`,
-//!   where `leaf_index` is the 1-based index of the leaf key in the order of insertion,
+//!   where `leaf_index` is a 1-based index of the leaf key provided when the leaf is inserted / updated,
 //!   `++` is byte concatenation.
 //! - Hash of an internal node is `hash(left_child_hash ++ right_child_hash)`.
 //!
+//! Currently in zksync, leaf indices enumerate leaves in the order of their insertion into the tree.
+//! Indices are computed externally and are provided to the tree as inputs; the tree doesn't verify
+//! index assignment and doesn't rely on particular index assignment assumptions (other than when
+//! [verifying tree consistency](MerkleTree::verify_consistency())).
+//!
 //! [Jellyfish Merkle tree]: https://developers.diem.com/papers/jellyfish-merkle-tree/2021-01-14.pdf
 
 // Linter settings.
@@ -209,10 +214,10 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
     /// # Return value
     ///
     /// Returns information about the update such as the final tree hash.
-    pub fn extend(&mut self, key_value_pairs: Vec<(Key, ValueHash)>) -> BlockOutput {
+    pub fn extend(&mut self, entries: Vec<TreeEntry>) -> BlockOutput {
         let next_version = self.db.manifest().unwrap_or_default().version_count;
         let storage = Storage::new(&self.db, &self.hasher, next_version, true);
-        let (output, patch) = storage.extend(key_value_pairs);
+        let (output, patch) = storage.extend(entries);
         self.db.apply_patch(patch);
         output
     }
@@ -226,7 +231,7 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
     /// instruction.
     pub fn extend_with_proofs(
         &mut self,
-        instructions: Vec<(Key, TreeInstruction)>,
+        instructions: Vec<TreeInstruction>,
     ) -> BlockOutputWithProofs {
         let next_version = self.db.manifest().unwrap_or_default().version_count;
         let storage = Storage::new(&self.db, &self.hasher, next_version, true);
diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs
index 21a3e8712fd7..5b1911ca6005 100644
--- a/core/lib/merkle_tree/src/pruning.rs
+++ b/core/lib/merkle_tree/src/pruning.rs
@@ -187,7 +187,7 @@ mod tests {
     use super::*;
     use crate::{
         types::{Node, NodeKey},
-        Database, Key, MerkleTree, PatchSet, ValueHash,
+        Database, Key, MerkleTree, PatchSet, TreeEntry, ValueHash,
     };
 
     fn create_db() -> PatchSet {
@@ -195,7 +195,7 @@ mod tests {
         for i in 0..5 {
             let key = Key::from(i);
             let value = ValueHash::from_low_u64_be(i);
-            MerkleTree::new(&mut db).extend(vec![(key, value)]);
+            MerkleTree::new(&mut db).extend(vec![TreeEntry::new(key, i + 1, value)]);
         }
         db
     }
@@ -245,9 +245,9 @@ mod tests {
         assert!(start.elapsed() < Duration::from_secs(10));
     }
 
-    fn generate_key_value_pairs(indexes: impl Iterator<Item = u64>) -> Vec<(Key, ValueHash)> {
+    fn generate_key_value_pairs(indexes: impl Iterator<Item = u64>) -> Vec<TreeEntry> {
         indexes
-            .map(|i| (Key::from(i), ValueHash::from_low_u64_be(i)))
+            .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::from_low_u64_be(i)))
             .collect()
     }
 
@@ -273,7 +273,7 @@ mod tests {
 
         let mut tree = MerkleTree::new(&mut db);
         for version in first_retained_version..=latest_version {
-            tree.verify_consistency(version).unwrap();
+            tree.verify_consistency(version, true).unwrap();
         }
 
         let kvs = generate_key_value_pairs(100..200);
@@ -290,7 +290,7 @@ mod tests {
 
         let tree = MerkleTree::new(&mut db);
         for version in first_retained_version..=latest_version {
-            tree.verify_consistency(version).unwrap();
+            tree.verify_consistency(version, true).unwrap();
         }
         assert_no_stale_keys(&db, first_retained_version);
     }
@@ -318,8 +318,8 @@ mod tests {
         const ITERATIVE_BATCH_COUNT: usize = 10;
 
         let mut db = PatchSet::default();
-        let kvs: Vec<_> = (0_u32..100)
-            .map(|i| (Key::from(i), ValueHash::zero()))
+        let kvs: Vec<_> = (0_u64..100)
+            .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero()))
             .collect();
 
         let batch_count = if initialize_iteratively {
@@ -335,8 +335,8 @@ mod tests {
 
         // Completely overwrite all keys.
         let new_value_hash = ValueHash::from_low_u64_be(1_000);
-        let new_kvs = (0_u32..100)
-            .map(|i| (Key::from(i), new_value_hash))
+        let new_kvs = (0_u64..100)
+            .map(|i| TreeEntry::new(Key::from(i), i + 1, new_value_hash))
             .collect();
         MerkleTree::new(&mut db).extend(new_kvs);
 
@@ -364,16 +364,16 @@ mod tests {
         prune_iteratively: bool,
     ) {
         let mut db = PatchSet::default();
-        let kvs: Vec<_> = (0_u32..100)
-            .map(|i| (Key::from(i), ValueHash::zero()))
+        let kvs: Vec<_> = (0_u64..100)
+            .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero()))
             .collect();
         MerkleTree::new(&mut db).extend(kvs);
         let leaf_keys_in_db = leaf_keys(&mut db);
 
         // Completely overwrite all keys in several batches.
         let new_value_hash = ValueHash::from_low_u64_be(1_000);
-        let new_kvs: Vec<_> = (0_u32..100)
-            .map(|i| (Key::from(i), new_value_hash))
+        let new_kvs: Vec<_> = (0_u64..100)
+            .map(|i| TreeEntry::new(Key::from(i), i + 1, new_value_hash))
             .collect();
         for chunk in new_kvs.chunks(20) {
             MerkleTree::new(&mut db).extend(chunk.to_vec());
diff --git a/core/lib/merkle_tree/src/recovery.rs b/core/lib/merkle_tree/src/recovery.rs
index 85ac578cc0a1..d1f2618a5cdd 100644
--- a/core/lib/merkle_tree/src/recovery.rs
+++ b/core/lib/merkle_tree/src/recovery.rs
@@ -40,23 +40,11 @@ use std::time::Instant;
 use crate::{
     hasher::{HashTree, HasherWithStats},
     storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage},
-    types::{Key, Manifest, Root, TreeTags, ValueHash},
+    types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash},
     MerkleTree,
 };
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 
-/// Entry in a Merkle tree used during recovery.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct RecoveryEntry {
-    /// Entry key.
-    pub key: Key,
-    /// Entry value.
-    pub value: ValueHash,
-    /// Leaf index associated with the entry. It is **not** checked whether leaf indices are well-formed
-    /// during recovery (e.g., that they are unique).
-    pub leaf_index: u64,
-}
-
 /// Handle to a Merkle tree during its recovery.
 #[derive(Debug)]
 pub struct MerkleTreeRecovery<DB, H = Blake2Hasher> {
@@ -154,7 +142,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
             %entries.key_range = entries_key_range(&entries),
         ),
     )]
-    pub fn extend_linear(&mut self, entries: Vec<RecoveryEntry>) {
+    pub fn extend_linear(&mut self, entries: Vec<TreeEntry>) {
         tracing::debug!("Started extending tree");
 
         let started_at = Instant::now();
@@ -177,7 +165,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
             entries.len = entries.len(),
         ),
     )]
-    pub fn extend_random(&mut self, entries: Vec<RecoveryEntry>) {
+    pub fn extend_random(&mut self, entries: Vec<TreeEntry>) {
         tracing::debug!("Started extending tree");
 
         let started_at = Instant::now();
@@ -242,7 +230,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
     }
 }
 
-fn entries_key_range(entries: &[RecoveryEntry]) -> String {
+fn entries_key_range(entries: &[TreeEntry]) -> String {
     let (Some(first), Some(last)) = (entries.first(), entries.last()) else {
         return "(empty)".to_owned();
     };
@@ -280,11 +268,7 @@ mod tests {
     #[test]
     fn recovering_tree_with_single_node() {
         let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42);
-        let recovery_entry = RecoveryEntry {
-            key: Key::from(123),
-            value: ValueHash::repeat_byte(1),
-            leaf_index: 1,
-        };
+        let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
         recovery.extend_linear(vec![recovery_entry]);
         let tree = recovery.finalize();
 
@@ -292,13 +276,8 @@ mod tests {
         let mut hasher = HasherWithStats::new(&Blake2Hasher);
         assert_eq!(
             tree.latest_root_hash(),
-            LeafNode::new(
-                recovery_entry.key,
-                recovery_entry.value,
-                recovery_entry.leaf_index
-            )
-            .hash(&mut hasher, 0)
+            LeafNode::new(recovery_entry).hash(&mut hasher, 0)
         );
-        tree.verify_consistency(42).unwrap();
+        tree.verify_consistency(42, true).unwrap();
     }
 }
diff --git a/core/lib/merkle_tree/src/storage/mod.rs b/core/lib/merkle_tree/src/storage/mod.rs
index c5a56abfca90..ae273d22f323 100644
--- a/core/lib/merkle_tree/src/storage/mod.rs
+++ b/core/lib/merkle_tree/src/storage/mod.rs
@@ -18,12 +18,10 @@ pub use self::{
 use crate::{
     hasher::HashTree,
     metrics::{TreeUpdaterStats, BLOCK_TIMINGS, GENERAL_METRICS},
-    recovery::RecoveryEntry,
     types::{
         BlockOutput, ChildRef, InternalNode, Key, LeafNode, Manifest, Nibbles, Node, Root,
-        TreeLogEntry, TreeTags, ValueHash,
+        TreeEntry, TreeLogEntry, TreeTags, ValueHash,
     },
-    utils::increment_counter,
 };
 
 /// Tree operation: either inserting a new version or updating an existing one (the latter is only
@@ -132,17 +130,17 @@ impl TreeUpdater {
     /// hashes for all updated nodes in [`Self::finalize()`].
     fn insert(
         &mut self,
-        key: Key,
-        value_hash: ValueHash,
+        entry: TreeEntry,
         parent_nibbles: &Nibbles,
-        leaf_index_fn: impl FnOnce() -> u64,
     ) -> (TreeLogEntry, NewLeafData) {
         let version = self.patch_set.root_version();
+        let key = entry.key;
+
         let traverse_outcome = self.patch_set.traverse(key, parent_nibbles);
         let (log, leaf_data) = match traverse_outcome {
             TraverseOutcome::LeafMatch(nibbles, mut leaf) => {
-                let log = TreeLogEntry::update(leaf.value_hash, leaf.leaf_index);
-                leaf.value_hash = value_hash;
+                let log = TreeLogEntry::update(leaf.leaf_index, leaf.value_hash);
+                leaf.update_from(entry);
                 self.patch_set.insert(nibbles, leaf.into());
                 self.metrics.updated_leaves += 1;
                 (log, NewLeafData::new(nibbles, leaf))
@@ -173,23 +171,20 @@ impl TreeUpdater {
                     nibble_idx += 1;
                 }
 
-                let leaf_index = leaf_index_fn();
-                let new_leaf = LeafNode::new(key, value_hash, leaf_index);
+                let new_leaf = LeafNode::new(entry);
                 let new_leaf_nibbles = Nibbles::new(&key, nibble_idx + 1);
                 let leaf_data = NewLeafData::new(new_leaf_nibbles, new_leaf);
                 let moved_leaf_nibbles = Nibbles::new(&leaf.full_key, nibble_idx + 1);
                 let leaf_data = leaf_data.with_adjacent_leaf(moved_leaf_nibbles, leaf);
-                (TreeLogEntry::insert(leaf_index), leaf_data)
+                (TreeLogEntry::Inserted, leaf_data)
             }
 
             TraverseOutcome::MissingChild(nibbles) if nibbles.nibble_count() == 0 => {
                 // The root is currently empty; we replace it with a leaf.
-                let leaf_index = leaf_index_fn();
-                debug_assert_eq!(leaf_index, 1);
-                let root_leaf = LeafNode::new(key, value_hash, leaf_index);
+                let root_leaf = LeafNode::new(entry);
                 self.set_root_node(root_leaf.into());
                 let leaf_data = NewLeafData::new(Nibbles::EMPTY, root_leaf);
-                (TreeLogEntry::insert(1), leaf_data)
+                (TreeLogEntry::Inserted, leaf_data)
             }
 
             TraverseOutcome::MissingChild(nibbles) => {
@@ -198,10 +193,9 @@ impl TreeUpdater {
                     unreachable!("Node parent must be an internal node");
                 };
                 parent.insert_child_ref(last_nibble, ChildRef::leaf(version));
-                let leaf_index = leaf_index_fn();
-                let new_leaf = LeafNode::new(key, value_hash, leaf_index);
+                let new_leaf = LeafNode::new(entry);
                 let leaf_data = NewLeafData::new(nibbles, new_leaf);
-                (TreeLogEntry::insert(leaf_index), leaf_data)
+                (TreeLogEntry::Inserted, leaf_data)
             }
         };
 
@@ -289,19 +283,20 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
 
     /// Extends the Merkle tree in the lightweight operation mode, without intermediate hash
     /// computations.
-    pub fn extend(mut self, key_value_pairs: Vec<(Key, ValueHash)>) -> (BlockOutput, PatchSet) {
+    pub fn extend(mut self, entries: Vec<TreeEntry>) -> (BlockOutput, PatchSet) {
         let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start();
-        let sorted_keys = SortedKeys::new(key_value_pairs.iter().map(|(key, _)| *key));
+        let sorted_keys = SortedKeys::new(entries.iter().map(|entry| entry.key));
         let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db);
         let load_nodes_latency = load_nodes_latency.observe();
         tracing::debug!("Load stage took {load_nodes_latency:?}");
 
         let extend_patch_latency = BLOCK_TIMINGS.extend_patch.start();
-        let mut logs = Vec::with_capacity(key_value_pairs.len());
-        for ((key, value_hash), parent_nibbles) in key_value_pairs.into_iter().zip(parent_nibbles) {
-            let (log, _) = self.updater.insert(key, value_hash, &parent_nibbles, || {
-                increment_counter(&mut self.leaf_count)
-            });
+        let mut logs = Vec::with_capacity(entries.len());
+        for (entry, parent_nibbles) in entries.into_iter().zip(parent_nibbles) {
+            let (log, _) = self.updater.insert(entry, &parent_nibbles);
+            if matches!(log, TreeLogEntry::Inserted) {
+                self.leaf_count += 1;
+            }
             logs.push(log);
         }
         let extend_patch_latency = extend_patch_latency.observe();
@@ -321,10 +316,7 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
         Some(self.updater.load_greatest_key(self.db)?.0.full_key)
     }
 
-    pub fn extend_during_linear_recovery(
-        mut self,
-        recovery_entries: Vec<RecoveryEntry>,
-    ) -> PatchSet {
+    pub fn extend_during_linear_recovery(mut self, recovery_entries: Vec<TreeEntry>) -> PatchSet {
         let (mut prev_key, mut prev_nibbles) = match self.updater.load_greatest_key(self.db) {
             Some((leaf, nibbles)) => (Some(leaf.full_key), nibbles),
             None => (None, Nibbles::EMPTY),
@@ -343,9 +335,7 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
 
             let key_nibbles = Nibbles::new(&entry.key, prev_nibbles.nibble_count());
             let parent_nibbles = prev_nibbles.common_prefix(&key_nibbles);
-            let (_, new_leaf) =
-                self.updater
-                    .insert(entry.key, entry.value, &parent_nibbles, || entry.leaf_index);
+            let (_, new_leaf) = self.updater.insert(entry, &parent_nibbles);
             prev_nibbles = new_leaf.nibbles;
             self.leaf_count += 1;
         }
@@ -356,10 +346,7 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
         patch
     }
 
-    pub fn extend_during_random_recovery(
-        mut self,
-        recovery_entries: Vec<RecoveryEntry>,
-    ) -> PatchSet {
+    pub fn extend_during_random_recovery(mut self, recovery_entries: Vec<TreeEntry>) -> PatchSet {
         let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start();
         let sorted_keys = SortedKeys::new(recovery_entries.iter().map(|entry| entry.key));
         let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db);
@@ -368,8 +355,7 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
 
         let extend_patch_latency = BLOCK_TIMINGS.extend_patch.start();
         for (entry, parent_nibbles) in recovery_entries.into_iter().zip(parent_nibbles) {
-            self.updater
-                .insert(entry.key, entry.value, &parent_nibbles, || entry.leaf_index);
+            self.updater.insert(entry, &parent_nibbles);
             self.leaf_count += 1;
         }
         let extend_patch_latency = extend_patch_latency.observe();
diff --git a/core/lib/merkle_tree/src/storage/patch.rs b/core/lib/merkle_tree/src/storage/patch.rs
index 6d0c38d6c9fb..ff41fb2f6bf3 100644
--- a/core/lib/merkle_tree/src/storage/patch.rs
+++ b/core/lib/merkle_tree/src/storage/patch.rs
@@ -680,7 +680,7 @@ mod tests {
     use super::*;
     use crate::{
         storage::Storage,
-        types::{Key, LeafNode},
+        types::{Key, LeafNode, TreeEntry},
     };
 
     fn patch_len(patch: &WorkingPatchSet) -> usize {
@@ -697,7 +697,7 @@ mod tests {
             let key = Key::from_little_endian(&[i; 32]);
             let nibbles = Nibbles::new(&key, 2 + usize::from(i) % 4);
             // ^ We need nibble count at least 2 for all `nibbles` to be distinct.
-            let leaf = LeafNode::new(key, ValueHash::zero(), i.into());
+            let leaf = LeafNode::new(TreeEntry::new(key, i.into(), ValueHash::zero()));
             patch.insert(nibbles, leaf.into());
             nibbles
         });
@@ -742,7 +742,8 @@ mod tests {
         // Test DB with a single entry.
         let mut db = PatchSet::default();
         let key = Key::from(1234_u64);
-        let (_, patch) = Storage::new(&db, &(), 0, true).extend(vec![(key, ValueHash::zero())]);
+        let (_, patch) =
+            Storage::new(&db, &(), 0, true).extend(vec![TreeEntry::new(key, 1, ValueHash::zero())]);
         db.apply_patch(patch);
 
         let mut patch = WorkingPatchSet::new(1, db.root(0).unwrap());
@@ -754,8 +755,11 @@ mod tests {
 
         // Test DB with multiple entries.
         let other_key = Key::from_little_endian(&[0xa0; 32]);
-        let (_, patch) =
-            Storage::new(&db, &(), 1, true).extend(vec![(other_key, ValueHash::zero())]);
+        let (_, patch) = Storage::new(&db, &(), 1, true).extend(vec![TreeEntry::new(
+            other_key,
+            2,
+            ValueHash::zero(),
+        )]);
         db.apply_patch(patch);
 
         let mut patch = WorkingPatchSet::new(2, db.root(1).unwrap());
@@ -766,8 +770,11 @@ mod tests {
         assert_eq!(load_result.db_reads, 1);
 
         let greater_key = Key::from_little_endian(&[0xaf; 32]);
-        let (_, patch) =
-            Storage::new(&db, &(), 2, true).extend(vec![(greater_key, ValueHash::zero())]);
+        let (_, patch) = Storage::new(&db, &(), 2, true).extend(vec![TreeEntry::new(
+            greater_key,
+            3,
+            ValueHash::zero(),
+        )]);
         db.apply_patch(patch);
 
         let mut patch = WorkingPatchSet::new(3, db.root(2).unwrap());
diff --git a/core/lib/merkle_tree/src/storage/proofs.rs b/core/lib/merkle_tree/src/storage/proofs.rs
index 9e2d172bd6bd..81f140088d37 100644
--- a/core/lib/merkle_tree/src/storage/proofs.rs
+++ b/core/lib/merkle_tree/src/storage/proofs.rs
@@ -15,26 +15,6 @@
 //! with root at level 4 (= 1 nibble). Thus, the patch sets and Merkle proofs
 //! produced by each group are mostly disjoint; they intersect only at the root node level.
 //!
-//! ## Computing leaf indices
-//!
-//! We need to determine leaf indices for all write instructions. Indices potentially depend
-//! on the entire list of `instructions`, so we should determine leaf indices before
-//! parallelization. Otherwise, we'd need to sync between parallelized tasks, which defeats
-//! the purpose of parallelization.
-//!
-//! We precompute indices as a separate step using the following observations:
-//!
-//! - If a leaf is present in the tree *before* `instructions` are applied, its index
-//!   can be obtained from the node ancestors loaded on the first step of the process.
-//! - Otherwise, a leaf may have been added by a previous instruction for the same key.
-//!   Since we already need [`SortedKeys`] to efficiently load ancestors, it's easy
-//!   to determine such pairs of instructions.
-//! - Otherwise, we have a first write, and the leaf index is defined as the current leaf
-//!   count.
-//!
-//! In summary, we can determine leaf indices for all write `instructions` in linear time
-//! and without synchronization required during the parallel steps of the process.
-//!
 //! ## Merging Merkle proofs
 //!
 //! The proofs produced by different groups only intersect at levels 0..4. This can be dealt with
@@ -68,7 +48,7 @@ use crate::{
         BlockOutputWithProofs, InternalNode, Key, Nibbles, Node, TreeInstruction, TreeLogEntry,
         TreeLogEntryWithProof, ValueHash,
     },
-    utils::{increment_counter, merge_by_index},
+    utils::merge_by_index,
 };
 
 /// Number of subtrees used for parallel computations.
@@ -93,16 +73,13 @@ impl TreeUpdater {
         for instruction in instructions {
             let InstructionWithPrecomputes {
                 index,
-                key,
                 instruction,
                 parent_nibbles,
-                leaf_index,
             } = instruction;
 
             let log = match instruction {
-                TreeInstruction::Write(value_hash) => {
-                    let (log, leaf_data) =
-                        self.insert(key, value_hash, &parent_nibbles, || leaf_index);
+                TreeInstruction::Write(entry) => {
+                    let (log, leaf_data) = self.insert(entry, &parent_nibbles);
                     let (new_root_hash, merkle_path) = self.update_node_hashes(hasher, &leaf_data);
                     root_hash = new_root_hash;
                     TreeLogEntryWithProof {
@@ -111,7 +88,7 @@ impl TreeUpdater {
                         root_hash,
                     }
                 }
-                TreeInstruction::Read => {
+                TreeInstruction::Read(key) => {
                     let (log, merkle_path) = self.prove(hasher, key, &parent_nibbles);
                     TreeLogEntryWithProof {
                         base: log,
@@ -183,7 +160,7 @@ impl TreeUpdater {
             self.patch_set
                 .create_proof(hasher, key, parent_nibbles, SUBTREE_ROOT_LEVEL / 4);
         let operation = leaf.map_or(TreeLogEntry::ReadMissingKey, |leaf| {
-            TreeLogEntry::read(leaf.value_hash, leaf.leaf_index)
+            TreeLogEntry::read(leaf.leaf_index, leaf.value_hash)
         });
 
         if matches!(operation, TreeLogEntry::ReadMissingKey) {
@@ -259,16 +236,14 @@ impl TreeUpdater {
 impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
     pub fn extend_with_proofs(
         mut self,
-        instructions: Vec<(Key, TreeInstruction)>,
+        instructions: Vec<TreeInstruction>,
     ) -> (BlockOutputWithProofs, PatchSet) {
         let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start();
-        let sorted_keys = SortedKeys::new(instructions.iter().map(|(key, _)| *key));
+        let sorted_keys = SortedKeys::new(instructions.iter().map(TreeInstruction::key));
         let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db);
         load_nodes_latency.observe();
 
-        let leaf_indices = self.compute_leaf_indices(&instructions, sorted_keys, &parent_nibbles);
-        let instruction_parts =
-            InstructionWithPrecomputes::split(instructions, parent_nibbles, leaf_indices);
+        let instruction_parts = InstructionWithPrecomputes::split(instructions, parent_nibbles);
         let initial_root = self.updater.patch_set.ensure_internal_root_node();
         let initial_metrics = self.updater.metrics;
         let storage_parts = self.updater.split();
@@ -310,44 +285,13 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
         output_with_proofs
     }
 
-    /// Computes leaf indices for all writes in `instructions`. Leaf indices are not used for reads;
-    /// thus, the corresponding entries are always 0.
-    fn compute_leaf_indices(
-        &mut self,
-        instructions: &[(Key, TreeInstruction)],
-        mut sorted_keys: SortedKeys,
-        parent_nibbles: &[Nibbles],
-    ) -> Vec<u64> {
-        sorted_keys.remove_read_instructions(instructions);
-        let key_mentions = sorted_keys.key_mentions(instructions.len());
-        let patch_set = &self.updater.patch_set;
-
-        let mut leaf_indices = Vec::with_capacity(instructions.len());
-        let it = instructions.iter().zip(parent_nibbles).enumerate();
-        for (idx, ((key, instruction), nibbles)) in it {
-            let leaf_index = match (instruction, key_mentions[idx]) {
-                (TreeInstruction::Read, _) => 0,
-                // ^ Leaf indices are not used for read instructions.
-                (TreeInstruction::Write(_), KeyMention::First) => {
-                    let leaf_index = match patch_set.get(nibbles) {
-                        Some(Node::Leaf(leaf)) if leaf.full_key == *key => Some(leaf.leaf_index),
-                        _ => None,
-                    };
-                    leaf_index.unwrap_or_else(|| increment_counter(&mut self.leaf_count))
-                }
-                (TreeInstruction::Write(_), KeyMention::SameAs(prev_idx)) => leaf_indices[prev_idx],
-            };
-            leaf_indices.push(leaf_index);
-        }
-        leaf_indices
-    }
-
     fn finalize_with_proofs(
         mut self,
         hasher: &mut HasherWithStats<'_>,
         root: InternalNode,
         logs: Vec<(usize, TreeLogEntryWithProof<MerklePath>)>,
     ) -> (BlockOutputWithProofs, PatchSet) {
+        self.leaf_count += self.updater.metrics.new_leaves;
         tracing::debug!(
             "Finished updating tree; total leaf count: {}, stats: {:?}",
             self.leaf_count,
@@ -370,95 +314,35 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
     }
 }
 
-/// Mention of a key in a block: either the first mention, or the same mention as the specified
-/// 0-based index in the block.
-#[derive(Debug, Clone, Copy)]
-enum KeyMention {
-    First,
-    SameAs(usize),
-}
-
-impl SortedKeys {
-    fn remove_read_instructions(&mut self, instructions: &[(Key, TreeInstruction)]) {
-        debug_assert_eq!(instructions.len(), self.0.len());
-
-        self.0.retain(|(idx, key)| {
-            let (key_for_instruction, instruction) = &instructions[*idx];
-            debug_assert_eq!(key_for_instruction, key);
-            matches!(instruction, TreeInstruction::Write(_))
-        });
-    }
-
-    /// Determines for the original sequence of `Key`s whether a particular key mention
-    /// is the first one, or it follows after another mention.
-    fn key_mentions(&self, original_len: usize) -> Vec<KeyMention> {
-        debug_assert!(original_len >= self.0.len());
-
-        let mut flags = vec![KeyMention::First; original_len];
-        let [(mut first_key_mention, mut prev_key), tail @ ..] = self.0.as_slice() else {
-            return flags;
-        };
-
-        // Note that `SameAs(_)` doesn't necessarily reference the first mention of a key,
-        // just one with a lesser index. This is OK for our purposes.
-        for &(idx, key) in tail {
-            if prev_key == key {
-                if idx > first_key_mention {
-                    flags[idx] = KeyMention::SameAs(first_key_mention);
-                } else {
-                    debug_assert!(idx < first_key_mention); // all indices should be unique
-                    flags[first_key_mention] = KeyMention::SameAs(idx);
-                    first_key_mention = idx;
-                }
-            } else {
-                prev_key = key;
-                first_key_mention = idx;
-            }
-        }
-        flags
-    }
-}
-
 /// [`TreeInstruction`] together with precomputed data necessary to efficiently parallelize
 /// Merkle tree traversal.
 #[derive(Debug)]
 struct InstructionWithPrecomputes {
     /// 0-based index of the instruction.
     index: usize,
-    /// Key read / written by the instruction.
-    key: Key,
     instruction: TreeInstruction,
     /// Nibbles for the parent node computed by [`Storage::load_ancestors()`].
     parent_nibbles: Nibbles,
-    /// Leaf index for the operation computed by [`Storage::compute_leaf_indices()`].
-    /// Always 0 for reads.
-    leaf_index: u64,
 }
 
 impl InstructionWithPrecomputes {
     /// Creates groups of instructions to be used during parallelized tree traversal.
     fn split(
-        instructions: Vec<(Key, TreeInstruction)>,
+        instructions: Vec<TreeInstruction>,
         parent_nibbles: Vec<Nibbles>,
-        leaf_indices: Vec<u64>,
     ) -> [Vec<Self>; SUBTREE_COUNT] {
         const EMPTY_VEC: Vec<InstructionWithPrecomputes> = Vec::new();
         // ^ Need to extract this to a constant to be usable as an array initializer.
 
         let mut parts = [EMPTY_VEC; SUBTREE_COUNT];
-        let it = instructions
-            .into_iter()
-            .zip(parent_nibbles)
-            .zip(leaf_indices);
-        for (index, (((key, instruction), parent_nibbles), leaf_index)) in it.enumerate() {
-            let first_nibble = Nibbles::nibble(&key, 0);
+        let it = instructions.into_iter().zip(parent_nibbles);
+        for (index, (instruction, parent_nibbles)) in it.enumerate() {
+            let first_nibble = Nibbles::nibble(&instruction.key(), 0);
             let part = &mut parts[first_nibble as usize];
             part.push(Self {
                 index,
-                key,
                 instruction,
                 parent_nibbles,
-                leaf_index,
             });
         }
         parts
@@ -472,8 +356,6 @@ mod tests {
     use super::*;
     use crate::types::Root;
 
-    const HASH: ValueHash = ValueHash::zero();
-
     fn byte_key(byte: u8) -> Key {
         Key::from_little_endian(&[byte; 32])
     }
@@ -485,88 +367,14 @@ mod tests {
         assert_eq!(sorted_keys.0, [1, 3, 4, 0, 2].map(|i| (i, keys[i])));
     }
 
-    #[test]
-    fn computing_key_mentions() {
-        let keys = [4, 1, 3, 4, 3, 3].map(byte_key);
-        let sorted_keys = SortedKeys::new(keys.into_iter());
-        let mentions = sorted_keys.key_mentions(6);
-
-        assert_matches!(
-            mentions.as_slice(),
-            [
-                KeyMention::First, KeyMention::First, KeyMention::First,
-                KeyMention::SameAs(0), KeyMention::SameAs(2), KeyMention::SameAs(i)
-            ] if *i == 2 || *i == 4
-        );
-    }
-
-    #[test]
-    fn computing_leaf_indices() {
-        let db = prepare_db();
-        let (instructions, expected_indices) = get_instructions_and_leaf_indices();
-        let mut storage = Storage::new(&db, &(), 1, true);
-        let sorted_keys = SortedKeys::new(instructions.iter().map(|(key, _)| *key));
-        let parent_nibbles = storage.updater.load_ancestors(&sorted_keys, &db);
-
-        let leaf_indices =
-            storage.compute_leaf_indices(&instructions, sorted_keys, &parent_nibbles);
-        assert_eq!(leaf_indices, expected_indices);
-    }
-
-    fn prepare_db() -> PatchSet {
-        let mut db = PatchSet::default();
-        let (_, patch) =
-            Storage::new(&db, &(), 0, true).extend(vec![(byte_key(2), HASH), (byte_key(1), HASH)]);
-        db.apply_patch(patch);
-        db
-    }
-
-    fn get_instructions_and_leaf_indices() -> (Vec<(Key, TreeInstruction)>, Vec<u64>) {
-        let instructions_and_indices = vec![
-            (byte_key(3), TreeInstruction::Read, 0),
-            (byte_key(1), TreeInstruction::Write(HASH), 2),
-            (byte_key(2), TreeInstruction::Read, 0),
-            (byte_key(3), TreeInstruction::Write(HASH), 3),
-            (byte_key(1), TreeInstruction::Read, 0),
-            (byte_key(3), TreeInstruction::Write(HASH), 3),
-            (byte_key(2), TreeInstruction::Write(HASH), 1),
-            (byte_key(0xc0), TreeInstruction::Write(HASH), 4),
-            (byte_key(2), TreeInstruction::Write(HASH), 1),
-        ];
-        instructions_and_indices
-            .into_iter()
-            .map(|(key, instr, idx)| ((key, instr), idx))
-            .unzip()
-    }
-
-    #[test]
-    fn extending_storage_with_proofs() {
-        let db = prepare_db();
-        let (instructions, expected_indices) = get_instructions_and_leaf_indices();
-        let storage = Storage::new(&db, &(), 1, true);
-        let (block_output, _) = storage.extend_with_proofs(instructions);
-        assert_eq!(block_output.leaf_count, 4);
-
-        assert_eq!(block_output.logs.len(), expected_indices.len());
-        for (expected_idx, log) in expected_indices.into_iter().zip(&block_output.logs) {
-            match log.base {
-                TreeLogEntry::Inserted { leaf_index }
-                | TreeLogEntry::Updated { leaf_index, .. } => {
-                    assert_eq!(leaf_index, expected_idx);
-                }
-                _ => {}
-            }
-        }
-    }
-
     #[test]
     fn proofs_for_empty_storage() {
         let db = PatchSet::default();
         let storage = Storage::new(&db, &(), 0, true);
         let instructions = vec![
-            (byte_key(1), TreeInstruction::Read),
-            (byte_key(2), TreeInstruction::Read),
-            (byte_key(0xff), TreeInstruction::Read),
+            TreeInstruction::Read(byte_key(1)),
+            TreeInstruction::Read(byte_key(2)),
+            TreeInstruction::Read(byte_key(0xff)),
         ];
         let (block_output, patch) = storage.extend_with_proofs(instructions);
         assert_eq!(block_output.leaf_count, 0);
diff --git a/core/lib/merkle_tree/src/storage/serialization.rs b/core/lib/merkle_tree/src/storage/serialization.rs
index 15d67604cc04..6a9216fa104a 100644
--- a/core/lib/merkle_tree/src/storage/serialization.rs
+++ b/core/lib/merkle_tree/src/storage/serialization.rs
@@ -26,7 +26,11 @@ impl LeafNode {
         let leaf_index = leb128::read::unsigned(&mut bytes).map_err(|err| {
             DeserializeErrorKind::Leb128(err).with_context(ErrorContext::LeafIndex)
         })?;
-        Ok(Self::new(full_key, value_hash, leaf_index))
+        Ok(Self {
+            full_key,
+            value_hash,
+            leaf_index,
+        })
     }
 
     pub(super) fn serialize(&self, buffer: &mut Vec<u8>) {
@@ -297,6 +301,7 @@ impl Manifest {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::TreeEntry;
     use zksync_types::H256;
 
     #[test]
@@ -369,7 +374,7 @@ mod tests {
 
     #[test]
     fn serializing_leaf_node() {
-        let leaf = LeafNode::new(513.into(), H256([4; 32]), 42);
+        let leaf = LeafNode::new(TreeEntry::new(513.into(), 42, H256([4; 32])));
         let mut buffer = vec![];
         leaf.serialize(&mut buffer);
         assert_eq!(buffer[..30], [0; 30]); // padding for the key
@@ -426,7 +431,7 @@ mod tests {
 
     #[test]
     fn serializing_root_with_leaf() {
-        let leaf = LeafNode::new(513.into(), H256([4; 32]), 42);
+        let leaf = LeafNode::new(TreeEntry::new(513.into(), 42, H256([4; 32])));
         let root = Root::new(1, leaf.into());
         let mut buffer = vec![];
         root.serialize(&mut buffer);
diff --git a/core/lib/merkle_tree/src/storage/tests.rs b/core/lib/merkle_tree/src/storage/tests.rs
index 958c906289ea..e70cb057280e 100644
--- a/core/lib/merkle_tree/src/storage/tests.rs
+++ b/core/lib/merkle_tree/src/storage/tests.rs
@@ -25,7 +25,7 @@ pub(super) fn generate_nodes(version: u64, nibble_counts: &[usize]) -> HashMap<N
     let nodes = nibble_counts.iter().map(|&count| {
         assert_ne!(count, 0);
         let key = Nibbles::new(&FIRST_KEY, count).with_version(version);
-        let node = LeafNode::new(FIRST_KEY, H256::zero(), count as u64);
+        let node = LeafNode::new(TreeEntry::new(FIRST_KEY, count as u64, H256::zero()));
         (key, node.into())
     });
     nodes.collect()
@@ -58,7 +58,7 @@ fn inserting_entries_in_empty_database() {
     let parent_nibbles = updater.load_ancestors(&sorted_keys, &db);
     assert_eq!(parent_nibbles, [Nibbles::EMPTY; 3]);
 
-    updater.insert(FIRST_KEY, H256([1; 32]), &Nibbles::EMPTY, || 1);
+    updater.insert(TreeEntry::new(FIRST_KEY, 1, H256([1; 32])), &Nibbles::EMPTY);
 
     let root_node = updater.patch_set.get(&Nibbles::EMPTY).unwrap();
     let Node::Leaf(root_leaf) = root_node else {
@@ -67,10 +67,13 @@ fn inserting_entries_in_empty_database() {
     assert_eq!(root_leaf.full_key, FIRST_KEY);
     assert_eq!(root_leaf.value_hash, H256([1; 32]));
 
-    updater.insert(SECOND_KEY, H256([2; 32]), &Nibbles::EMPTY, || 2);
+    updater.insert(
+        TreeEntry::new(SECOND_KEY, 2, H256([2; 32])),
+        &Nibbles::EMPTY,
+    );
     assert_storage_with_2_keys(&updater);
 
-    updater.insert(THIRD_KEY, H256([3; 32]), &Nibbles::EMPTY, || 3);
+    updater.insert(TreeEntry::new(THIRD_KEY, 3, H256([3; 32])), &Nibbles::EMPTY);
     assert_storage_with_3_keys(&updater);
 }
 
@@ -151,9 +154,9 @@ fn assert_storage_with_3_keys(updater: &TreeUpdater) {
 #[test]
 fn changing_child_ref_type() {
     let mut updater = TreeUpdater::new(0, Root::Empty);
-    updater.insert(FIRST_KEY, H256([1; 32]), &Nibbles::EMPTY, || 1);
+    updater.insert(TreeEntry::new(FIRST_KEY, 1, H256([1; 32])), &Nibbles::EMPTY);
     let e_key = U256([0, 0, 0, 0x_e000_0000_0000_0000]);
-    updater.insert(e_key, H256([2; 32]), &Nibbles::EMPTY, || 2);
+    updater.insert(TreeEntry::new(e_key, 2, H256([2; 32])), &Nibbles::EMPTY);
 
     let node = updater.patch_set.get(&Nibbles::EMPTY).unwrap();
     let Node::Internal(node) = node else {
@@ -162,7 +165,10 @@ fn changing_child_ref_type() {
     assert!(node.child_ref(0xd).unwrap().is_leaf);
     assert!(node.child_ref(0xe).unwrap().is_leaf);
 
-    updater.insert(SECOND_KEY, H256([3; 32]), &Nibbles::EMPTY, || 3);
+    updater.insert(
+        TreeEntry::new(SECOND_KEY, 3, H256([3; 32])),
+        &Nibbles::EMPTY,
+    );
 
     let node = updater.patch_set.get(&Nibbles::EMPTY).unwrap();
     let Node::Internal(node) = node else {
@@ -176,12 +182,13 @@ fn changing_child_ref_type() {
 fn inserting_node_in_non_empty_database() {
     let mut db = PatchSet::default();
     let storage = Storage::new(&db, &(), 0, true);
-    let kvs = vec![(FIRST_KEY, H256([1; 32])), (SECOND_KEY, H256([2; 32]))];
+    let kvs = vec![
+        TreeEntry::new(FIRST_KEY, 1, H256([1; 32])),
+        TreeEntry::new(SECOND_KEY, 2, H256([2; 32])),
+    ];
     let (_, patch) = storage.extend(kvs);
     db.apply_patch(patch);
 
-    let mut count = 2;
-    let mut leaf_index_fn = || increment_counter(&mut count);
     let mut updater = TreeUpdater::new(1, db.root(0).unwrap());
     let sorted_keys = SortedKeys::new([THIRD_KEY, E_KEY, SECOND_KEY].into_iter());
     let parent_nibbles = updater.load_ancestors(&sorted_keys, &db);
@@ -196,19 +203,15 @@ fn inserting_node_in_non_empty_database() {
     );
 
     let (op, _) = updater.insert(
-        THIRD_KEY,
-        H256([3; 32]),
+        TreeEntry::new(THIRD_KEY, 3, H256([3; 32])),
         &parent_nibbles[0],
-        &mut leaf_index_fn,
     );
-    assert_eq!(op, TreeLogEntry::insert(3));
-    let (op, _) = updater.insert(E_KEY, H256::zero(), &parent_nibbles[1], &mut leaf_index_fn);
-    assert_eq!(op, TreeLogEntry::insert(4));
+    assert_eq!(op, TreeLogEntry::Inserted);
+    let (op, _) = updater.insert(TreeEntry::new(E_KEY, 4, H256::zero()), &parent_nibbles[1]);
+    assert_eq!(op, TreeLogEntry::Inserted);
     let (op, _) = updater.insert(
-        SECOND_KEY,
-        H256([2; 32]),
+        TreeEntry::new(SECOND_KEY, 2, H256([2; 32])),
         &parent_nibbles[2],
-        &mut leaf_index_fn,
     );
     assert_matches!(op, TreeLogEntry::Updated { leaf_index: 2, .. });
     assert_eq!(updater.metrics.new_internal_nodes, 0);
@@ -230,7 +233,10 @@ fn inserting_node_in_non_empty_database() {
 fn inserting_node_in_non_empty_database_with_moved_key() {
     let mut db = PatchSet::default();
     let storage = Storage::new(&db, &(), 0, true);
-    let kvs = vec![(FIRST_KEY, H256([1; 32])), (THIRD_KEY, H256([3; 32]))];
+    let kvs = vec![
+        TreeEntry::new(FIRST_KEY, 1, H256([1; 32])),
+        TreeEntry::new(THIRD_KEY, 2, H256([3; 32])),
+    ];
     let (_, patch) = storage.extend(kvs);
     db.apply_patch(patch);
 
@@ -246,8 +252,11 @@ fn inserting_node_in_non_empty_database_with_moved_key() {
         Some(Node::Leaf(_))
     );
 
-    let (op, _) = updater.insert(SECOND_KEY, H256([2; 32]), &parent_nibbles[0], || 3);
-    assert_eq!(op, TreeLogEntry::insert(3));
+    let (op, _) = updater.insert(
+        TreeEntry::new(SECOND_KEY, 3, H256([2; 32])),
+        &parent_nibbles[0],
+    );
+    assert_eq!(op, TreeLogEntry::Inserted);
     assert_matches!(
         updater.patch_set.get(&parent_nibbles[0]),
         Some(Node::Internal(_))
@@ -260,7 +269,7 @@ fn inserting_node_in_non_empty_database_with_moved_key() {
 fn proving_keys_existence_and_absence() {
     let mut updater = TreeUpdater::new(0, Root::Empty);
     updater.patch_set.ensure_internal_root_node(); // Necessary for proofs to work.
-    updater.insert(FIRST_KEY, H256([1; 32]), &Nibbles::EMPTY, || 1);
+    updater.insert(TreeEntry::new(FIRST_KEY, 1, H256([1; 32])), &Nibbles::EMPTY);
 
     let mut hasher = HasherWithStats::new(&());
     let (op, merkle_path) = updater.prove(&mut hasher, FIRST_KEY, &Nibbles::EMPTY);
@@ -273,7 +282,7 @@ fn proving_keys_existence_and_absence() {
     let merkle_path = finalize_merkle_path(merkle_path, &hasher);
     assert_eq!(merkle_path.len(), 40);
 
-    updater.insert(THIRD_KEY, H256([3; 32]), &Nibbles::EMPTY, || 2);
+    updater.insert(TreeEntry::new(THIRD_KEY, 2, H256([3; 32])), &Nibbles::EMPTY);
     let (op, merkle_path) = updater.prove(&mut hasher, FIRST_KEY, &Nibbles::EMPTY);
     assert_matches!(op, TreeLogEntry::Read { .. });
     let merkle_path = finalize_merkle_path(merkle_path, &hasher);
@@ -300,14 +309,17 @@ fn finalize_merkle_path(mut path: MerklePath, hasher: &HasherWithStats<'_>) -> V
 fn reading_keys_does_not_change_child_version() {
     let mut db = PatchSet::default();
     let storage = Storage::new(&db, &(), 0, true);
-    let kvs = vec![(FIRST_KEY, H256([0; 32])), (SECOND_KEY, H256([1; 32]))];
+    let kvs = vec![
+        TreeEntry::new(FIRST_KEY, 1, H256([0; 32])),
+        TreeEntry::new(SECOND_KEY, 2, H256([1; 32])),
+    ];
     let (_, patch) = storage.extend(kvs);
     db.apply_patch(patch);
 
     let storage = Storage::new(&db, &(), 1, true);
     let instructions = vec![
-        (FIRST_KEY, TreeInstruction::Read),
-        (E_KEY, TreeInstruction::Write(H256([2; 32]))),
+        TreeInstruction::Read(FIRST_KEY),
+        TreeInstruction::Write(TreeEntry::new(E_KEY, 3, H256([2; 32]))),
     ];
 
     let (_, patch) = storage.extend_with_proofs(instructions);
@@ -327,12 +339,15 @@ fn reading_keys_does_not_change_child_version() {
 fn read_ops_are_not_reflected_in_patch() {
     let mut db = PatchSet::default();
     let storage = Storage::new(&db, &(), 0, true);
-    let kvs = vec![(FIRST_KEY, H256([0; 32])), (SECOND_KEY, H256([1; 32]))];
+    let kvs = vec![
+        TreeEntry::new(FIRST_KEY, 1, H256([0; 32])),
+        TreeEntry::new(SECOND_KEY, 2, H256([1; 32])),
+    ];
     let (_, patch) = storage.extend(kvs);
     db.apply_patch(patch);
 
     let storage = Storage::new(&db, &(), 1, true);
-    let instructions = vec![(FIRST_KEY, TreeInstruction::Read)];
+    let instructions = vec![TreeInstruction::Read(FIRST_KEY)];
     let (_, patch) = storage.extend_with_proofs(instructions);
     assert!(patch.patches_by_version[&1].nodes.is_empty());
 }
@@ -351,7 +366,7 @@ fn read_instructions_do_not_lead_to_copied_nodes(writes_per_block: u64) {
     let mut database = PatchSet::default();
     let storage = Storage::new(&database, &(), 0, true);
     let kvs = (0..key_count)
-        .map(|i| (big_endian_key(i), H256::zero()))
+        .map(|i| TreeEntry::new(big_endian_key(i), i + 1, H256::zero()))
         .collect();
     let (_, patch) = storage.extend(kvs);
     database.apply_patch(patch);
@@ -361,10 +376,11 @@ fn read_instructions_do_not_lead_to_copied_nodes(writes_per_block: u64) {
         // Select some existing keys to read. Keys may be repeated, this is fine for our purpose.
         let reads = (0..writes_per_block).map(|_| {
             let key = big_endian_key(rng.gen_range(0..key_count));
-            (key, TreeInstruction::Read)
+            TreeInstruction::Read(key)
+        });
+        let writes = (key_count..key_count + writes_per_block).map(|i| {
+            TreeInstruction::Write(TreeEntry::new(big_endian_key(i), i + 1, H256::zero()))
         });
-        let writes = (key_count..key_count + writes_per_block)
-            .map(|i| (big_endian_key(i), TreeInstruction::Write(H256::zero())));
 
         let mut instructions: Vec<_> = reads.chain(writes).collect();
         instructions.shuffle(&mut rng);
@@ -400,7 +416,7 @@ fn replaced_keys_are_correctly_tracked(writes_per_block: usize, with_proofs: boo
     let mut database = PatchSet::default();
     let storage = Storage::new(&database, &(), 0, true);
     let kvs = (0..100)
-        .map(|i| (big_endian_key(i), H256::zero()))
+        .map(|i| TreeEntry::new(big_endian_key(i), i + 1, H256::zero()))
         .collect();
     let (_, patch) = storage.extend(kvs);
 
@@ -412,11 +428,11 @@ fn replaced_keys_are_correctly_tracked(writes_per_block: usize, with_proofs: boo
         let updates = (0..100)
             .choose_multiple(&mut rng, writes_per_block)
             .into_iter()
-            .map(|i| (big_endian_key(i), H256::zero()));
+            .map(|i| TreeEntry::new(big_endian_key(i), i + 1, H256::zero()));
 
         let storage = Storage::new(&database, &(), new_version, true);
         let patch = if with_proofs {
-            let instructions = updates.map(|(key, value)| (key, TreeInstruction::Write(value)));
+            let instructions = updates.map(TreeInstruction::Write);
             storage.extend_with_proofs(instructions.collect()).1
         } else {
             storage.extend(updates.collect()).1
@@ -454,14 +470,18 @@ fn assert_replaced_keys(db: &PatchSet, patch: &PatchSet) {
 #[test]
 fn tree_handles_keys_at_terminal_level() {
     let mut db = PatchSet::default();
-    let kvs = (0_u32..100)
-        .map(|i| (Key::from(i), ValueHash::zero()))
+    let kvs = (0_u64..100)
+        .map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero()))
         .collect();
     let (_, patch) = Storage::new(&db, &(), 0, true).extend(kvs);
     db.apply_patch(patch);
 
     // Overwrite a key and check that we don't panic.
-    let new_kvs = vec![(Key::from(0), ValueHash::from_low_u64_be(1))];
+    let new_kvs = vec![TreeEntry::new(
+        Key::from(0),
+        1,
+        ValueHash::from_low_u64_be(1),
+    )];
     let (_, patch) = Storage::new(&db, &(), 1, true).extend(new_kvs);
 
     assert_eq!(
@@ -483,7 +503,7 @@ fn tree_handles_keys_at_terminal_level() {
 #[test]
 fn recovery_flattens_node_versions() {
     let recovery_version = 100;
-    let recovery_entries = (0_u64..10).map(|i| RecoveryEntry {
+    let recovery_entries = (0_u64..10).map(|i| TreeEntry {
         key: Key::from(i) << 252, // the first key nibbles are distinct
         value: ValueHash::zero(),
         leaf_index: i + 1,
@@ -516,7 +536,7 @@ fn recovery_flattens_node_versions() {
 #[test_casing(7, [256, 4, 5, 20, 69, 127, 128])]
 fn recovery_with_node_hierarchy(chunk_size: usize) {
     let recovery_version = 100;
-    let recovery_entries = (0_u64..256).map(|i| RecoveryEntry {
+    let recovery_entries = (0_u64..256).map(|i| TreeEntry {
         key: Key::from(i) << 248, // the first two key nibbles are distinct
         value: ValueHash::zero(),
         leaf_index: i + 1,
@@ -567,7 +587,7 @@ fn recovery_with_node_hierarchy(chunk_size: usize) {
 #[test_casing(7, [256, 5, 7, 20, 59, 127, 128])]
 fn recovery_with_deep_node_hierarchy(chunk_size: usize) {
     let recovery_version = 1_000;
-    let recovery_entries = (0_u64..256).map(|i| RecoveryEntry {
+    let recovery_entries = (0_u64..256).map(|i| TreeEntry {
         key: Key::from(i), // the last two key nibbles are distinct
         value: ValueHash::zero(),
         leaf_index: i + 1,
@@ -630,7 +650,7 @@ fn recovery_with_deep_node_hierarchy(chunk_size: usize) {
 fn recovery_workflow_with_multiple_stages() {
     let mut db = PatchSet::default();
     let recovery_version = 100;
-    let recovery_entries = (0_u64..100).map(|i| RecoveryEntry {
+    let recovery_entries = (0_u64..100).map(|i| TreeEntry {
         key: Key::from(i),
         value: ValueHash::zero(),
         leaf_index: i,
@@ -640,7 +660,7 @@ fn recovery_workflow_with_multiple_stages() {
     assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 100);
     db.apply_patch(patch);
 
-    let more_recovery_entries = (100_u64..200).map(|i| RecoveryEntry {
+    let more_recovery_entries = (100_u64..200).map(|i| TreeEntry {
         key: Key::from(i),
         value: ValueHash::zero(),
         leaf_index: i,
@@ -653,7 +673,7 @@ fn recovery_workflow_with_multiple_stages() {
 
     // Check that all entries can be accessed
     let storage = Storage::new(&db, &(), recovery_version + 1, true);
-    let instructions = (0_u32..200).map(|i| (Key::from(i), TreeInstruction::Read));
+    let instructions = (0_u32..200).map(|i| TreeInstruction::Read(Key::from(i)));
     let (output, _) = storage.extend_with_proofs(instructions.collect());
     assert_eq!(output.leaf_count, 200);
     assert_eq!(output.logs.len(), 200);
@@ -687,17 +707,15 @@ fn test_recovery_pruning_equivalence(
     );
 
     let mut rng = StdRng::seed_from_u64(RNG_SEED);
-    let kvs = (0..100).map(|i| {
-        (
-            U256([rng.gen(), rng.gen(), rng.gen(), rng.gen()]),
-            ValueHash::repeat_byte(i),
-        )
+    let entries = (0..100).map(|i| {
+        let key = U256([rng.gen(), rng.gen(), rng.gen(), rng.gen()]);
+        TreeEntry::new(key, u64::from(i) + 1, ValueHash::repeat_byte(i))
     });
-    let kvs: Vec<_> = kvs.collect();
+    let entries: Vec<_> = entries.collect();
 
     // Add `kvs` into the tree in several commits.
     let mut db = PatchSet::default();
-    for (version, chunk) in kvs.chunks(chunk_size).enumerate() {
+    for (version, chunk) in entries.chunks(chunk_size).enumerate() {
         let (_, patch) = Storage::new(&db, hasher, version as u64, true).extend(chunk.to_vec());
         db.apply_patch(patch);
     }
@@ -716,11 +734,7 @@ fn test_recovery_pruning_equivalence(
     // Generate recovery entries.
     let recovery_entries = all_nodes.values().filter_map(|node| {
         if let Node::Leaf(leaf) = node {
-            return Some(RecoveryEntry {
-                key: leaf.full_key,
-                value: leaf.value_hash,
-                leaf_index: leaf.leaf_index,
-            });
+            return Some(TreeEntry::from(*leaf));
         }
         None
     });
diff --git a/core/lib/merkle_tree/src/types/internal.rs b/core/lib/merkle_tree/src/types/internal.rs
index 5e875f6e28ac..cb35b0281c2b 100644
--- a/core/lib/merkle_tree/src/types/internal.rs
+++ b/core/lib/merkle_tree/src/types/internal.rs
@@ -4,10 +4,9 @@
 
 use std::{fmt, num::NonZeroU64};
 
-use zksync_types::{H256, U256};
-
 use crate::{
     hasher::{HashTree, InternalNodeCache},
+    types::{Key, TreeEntry, ValueHash},
     utils::SmallMap,
 };
 
@@ -323,11 +322,6 @@ impl fmt::Display for NodeKey {
     }
 }
 
-/// Key stored in the tree.
-pub type Key = U256;
-/// Hashed value stored in the tree.
-pub type ValueHash = H256;
-
 /// Leaf node of the tree.
 #[derive(Debug, Clone, Copy)]
 #[cfg_attr(test, derive(PartialEq, Eq))]
@@ -338,13 +332,18 @@ pub struct LeafNode {
 }
 
 impl LeafNode {
-    pub(crate) fn new(full_key: Key, value_hash: ValueHash, leaf_index: u64) -> Self {
+    pub(crate) fn new(entry: TreeEntry) -> Self {
         Self {
-            full_key,
-            value_hash,
-            leaf_index,
+            full_key: entry.key,
+            value_hash: entry.value,
+            leaf_index: entry.leaf_index,
         }
     }
+
+    pub(crate) fn update_from(&mut self, entry: TreeEntry) {
+        self.value_hash = entry.value;
+        self.leaf_index = entry.leaf_index;
+    }
 }
 
 /// Reference to a child in an [`InternalNode`].
@@ -556,6 +555,7 @@ impl StaleNodeKey {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use zksync_types::U256;
 
     // `U256` uses little-endian `u64` ordering; i.e., this is
     // 0x_dead_beef_0000_0000_.._0000.
diff --git a/core/lib/merkle_tree/src/types/mod.rs b/core/lib/merkle_tree/src/types/mod.rs
index de35d9024b7b..15ab72b6911d 100644
--- a/core/lib/merkle_tree/src/types/mod.rs
+++ b/core/lib/merkle_tree/src/types/mod.rs
@@ -5,22 +5,53 @@ mod internal;
 pub(crate) use self::internal::{
     ChildRef, Nibbles, NibblesBytes, StaleNodeKey, TreeTags, HASH_SIZE, KEY_SIZE, TREE_DEPTH,
 };
-pub use self::internal::{InternalNode, Key, LeafNode, Manifest, Node, NodeKey, Root, ValueHash};
+pub use self::internal::{InternalNode, LeafNode, Manifest, Node, NodeKey, Root};
+
+use zksync_types::{H256, U256};
+
+/// Key stored in the tree.
+pub type Key = U256;
+/// Hash type of values and intermediate nodes in the tree.
+pub type ValueHash = H256;
 
 /// Instruction to read or write a tree value at a certain key.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum TreeInstruction {
-    /// Read the current tree value.
-    Read,
-    /// Write the specified value.
-    Write(ValueHash),
+pub enum TreeInstruction<K = Key> {
+    /// Read the current tree value at the specified key.
+    Read(K),
+    /// Write the specified entry.
+    Write(TreeEntry<K>),
+}
+
+impl<K: Copy> TreeInstruction<K> {
+    /// Creates a write instruction.
+    pub fn write(key: K, leaf_index: u64, value: ValueHash) -> Self {
+        Self::Write(TreeEntry::new(key, leaf_index, value))
+    }
+
+    /// Returns the tree key this instruction is related to.
+    pub fn key(&self) -> K {
+        match self {
+            Self::Read(key) => *key,
+            Self::Write(entry) => entry.key,
+        }
+    }
+
+    pub(crate) fn map_key<U>(&self, map_fn: impl FnOnce(&K) -> U) -> TreeInstruction<U> {
+        match self {
+            Self::Read(key) => TreeInstruction::Read(map_fn(key)),
+            Self::Write(entry) => TreeInstruction::Write(entry.map_key(map_fn)),
+        }
+    }
 }
 
 /// Entry in a Merkle tree associated with a key.
-#[derive(Debug, Clone, Copy)]
-pub struct TreeEntry {
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct TreeEntry<K = Key> {
+    /// Tree key.
+    pub key: K,
     /// Value associated with the key.
-    pub value_hash: ValueHash,
+    pub value: ValueHash,
     /// Enumeration index of the key.
     pub leaf_index: u64,
 }
@@ -28,23 +59,40 @@ pub struct TreeEntry {
 impl From<LeafNode> for TreeEntry {
     fn from(leaf: LeafNode) -> Self {
         Self {
-            value_hash: leaf.value_hash,
+            key: leaf.full_key,
+            value: leaf.value_hash,
             leaf_index: leaf.leaf_index,
         }
     }
 }
 
+impl<K> TreeEntry<K> {
+    /// Creates a new entry with the specified fields.
+    pub fn new(key: K, leaf_index: u64, value: ValueHash) -> Self {
+        Self {
+            key,
+            value,
+            leaf_index,
+        }
+    }
+
+    pub(crate) fn map_key<U>(&self, map_fn: impl FnOnce(&K) -> U) -> TreeEntry<U> {
+        TreeEntry::new(map_fn(&self.key), self.leaf_index, self.value)
+    }
+}
+
 impl TreeEntry {
-    pub(crate) fn empty() -> Self {
+    pub(crate) fn empty(key: Key) -> Self {
         Self {
-            value_hash: ValueHash::zero(),
+            key,
+            value: ValueHash::zero(),
             leaf_index: 0,
         }
     }
 
     /// Returns `true` if and only if this entry encodes lack of a value.
     pub fn is_empty(&self) -> bool {
-        self.leaf_index == 0 && self.value_hash.is_zero()
+        self.leaf_index == 0 && self.value.is_zero()
     }
 
     pub(crate) fn with_merkle_path(self, merkle_path: Vec<ValueHash>) -> TreeEntryWithProof {
@@ -53,6 +101,12 @@ impl TreeEntry {
             merkle_path,
         }
     }
+
+    /// Replaces the value in this entry and returns the modified entry.
+    #[must_use]
+    pub fn with_value(self, value: H256) -> Self {
+        Self { value, ..self }
+    }
 }
 
 /// Entry in a Merkle tree together with a proof of authenticity.
@@ -86,10 +140,7 @@ pub struct BlockOutput {
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum TreeLogEntry {
     /// A node was inserted into the tree.
-    Inserted {
-        /// Index of the inserted node.
-        leaf_index: u64,
-    },
+    Inserted,
     /// A node with the specified index was updated.
     Updated {
         /// Index of the updated node.
@@ -109,18 +160,14 @@ pub enum TreeLogEntry {
 }
 
 impl TreeLogEntry {
-    pub(crate) fn insert(leaf_index: u64) -> Self {
-        Self::Inserted { leaf_index }
-    }
-
-    pub(crate) fn update(previous_value: ValueHash, leaf_index: u64) -> Self {
+    pub(crate) fn update(leaf_index: u64, previous_value: ValueHash) -> Self {
         Self::Updated {
             leaf_index,
             previous_value,
         }
     }
 
-    pub(crate) fn read(value: ValueHash, leaf_index: u64) -> Self {
+    pub(crate) fn read(leaf_index: u64, value: ValueHash) -> Self {
         Self::Read { leaf_index, value }
     }
 
diff --git a/core/lib/merkle_tree/src/utils.rs b/core/lib/merkle_tree/src/utils.rs
index 9542b24bbd3c..4771a940f2c8 100644
--- a/core/lib/merkle_tree/src/utils.rs
+++ b/core/lib/merkle_tree/src/utils.rs
@@ -114,11 +114,6 @@ impl<V> SmallMap<V> {
     }
 }
 
-pub(crate) fn increment_counter(counter: &mut u64) -> u64 {
-    *counter += 1;
-    *counter
-}
-
 pub(crate) fn find_diverging_bit(lhs: Key, rhs: Key) -> usize {
     let diff = lhs ^ rhs;
     diff.leading_zeros() as usize
diff --git a/core/lib/merkle_tree/tests/integration/common.rs b/core/lib/merkle_tree/tests/integration/common.rs
index fd9e00855c20..096a54ce7111 100644
--- a/core/lib/merkle_tree/tests/integration/common.rs
+++ b/core/lib/merkle_tree/tests/integration/common.rs
@@ -5,23 +5,22 @@ use once_cell::sync::Lazy;
 use std::collections::HashMap;
 
 use zksync_crypto::hasher::{blake2::Blake2Hasher, Hasher};
-use zksync_merkle_tree::{HashTree, TreeInstruction};
+use zksync_merkle_tree::{HashTree, TreeEntry, TreeInstruction};
 use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256};
 
-pub fn generate_key_value_pairs(indexes: impl Iterator<Item = u64>) -> Vec<(U256, H256)> {
+pub fn generate_key_value_pairs(indexes: impl Iterator<Item = u64>) -> Vec<TreeEntry> {
     let address: Address = "4b3af74f66ab1f0da3f2e4ec7a3cb99baf1af7b2".parse().unwrap();
     let kvs = indexes.map(|idx| {
         let key = H256::from_low_u64_be(idx);
         let key = StorageKey::new(AccountTreeId::new(address), key);
-        (key.hashed_key_u256(), H256::from_low_u64_be(idx + 1))
+        let value = H256::from_low_u64_be(idx + 1);
+        TreeEntry::new(key.hashed_key_u256(), idx + 1, value)
     });
     kvs.collect()
 }
 
-pub fn compute_tree_hash(kvs: impl Iterator<Item = (U256, H256)>) -> H256 {
-    let kvs_with_indices = kvs
-        .enumerate()
-        .map(|(i, (key, value))| (key, value, i as u64 + 1));
+pub fn compute_tree_hash(kvs: impl Iterator<Item = TreeEntry>) -> H256 {
+    let kvs_with_indices = kvs.map(|entry| (entry.key, entry.value, entry.leaf_index));
     compute_tree_hash_with_indices(kvs_with_indices)
 }
 
@@ -70,17 +69,18 @@ fn compute_tree_hash_with_indices(kvs: impl Iterator<Item = (U256, H256, u64)>)
 }
 
 // Computing the expected hash takes some time in the debug mode, so we memoize it.
-pub static KVS_AND_HASH: Lazy<(Vec<(U256, H256)>, H256)> = Lazy::new(|| {
-    let kvs = generate_key_value_pairs(0..100);
-    let expected_hash = compute_tree_hash(kvs.iter().copied());
-    (kvs, expected_hash)
+pub static ENTRIES_AND_HASH: Lazy<(Vec<TreeEntry>, H256)> = Lazy::new(|| {
+    let entries = generate_key_value_pairs(0..100);
+    let expected_hash = compute_tree_hash(entries.iter().copied());
+    (entries, expected_hash)
 });
 
-pub fn convert_to_writes(kvs: &[(U256, H256)]) -> Vec<(U256, TreeInstruction)> {
-    let kvs = kvs
+pub fn convert_to_writes(entries: &[TreeEntry]) -> Vec<TreeInstruction> {
+    entries
         .iter()
-        .map(|&(key, hash)| (key, TreeInstruction::Write(hash)));
-    kvs.collect()
+        .copied()
+        .map(TreeInstruction::Write)
+        .collect()
 }
 
 /// Emulates leaf index assignment in a real Merkle tree.
@@ -88,22 +88,22 @@ pub fn convert_to_writes(kvs: &[(U256, H256)]) -> Vec<(U256, TreeInstruction)> {
 pub struct TreeMap(HashMap<U256, (H256, u64)>);
 
 impl TreeMap {
-    pub fn new(initial_entries: &[(U256, H256)]) -> Self {
+    pub fn new(initial_entries: &[TreeEntry]) -> Self {
         let map = initial_entries
             .iter()
-            .enumerate()
-            .map(|(i, (key, value))| (*key, (*value, i as u64 + 1)))
+            .map(|entry| (entry.key, (entry.value, entry.leaf_index)))
             .collect();
         Self(map)
     }
 
-    pub fn extend(&mut self, kvs: &[(U256, H256)]) {
-        for &(key, new_value) in kvs {
-            if let Some((value, _)) = self.0.get_mut(&key) {
-                *value = new_value;
+    pub fn extend(&mut self, kvs: &[TreeEntry]) {
+        for &new_entry in kvs {
+            if let Some((value, leaf_index)) = self.0.get_mut(&new_entry.key) {
+                assert_eq!(*leaf_index, new_entry.leaf_index); // sanity check
+                *value = new_entry.value;
             } else {
-                let leaf_index = self.0.len() as u64 + 1;
-                self.0.insert(key, (new_value, leaf_index));
+                self.0
+                    .insert(new_entry.key, (new_entry.value, new_entry.leaf_index));
             }
         }
     }
@@ -112,7 +112,7 @@ impl TreeMap {
         let entries = self
             .0
             .iter()
-            .map(|(key, (value, idx))| (*key, *value, *idx));
+            .map(|(key, (value, leaf_index))| (*key, *value, *leaf_index));
         compute_tree_hash_with_indices(entries)
     }
 }
diff --git a/core/lib/merkle_tree/tests/integration/consistency.rs b/core/lib/merkle_tree/tests/integration/consistency.rs
index 7c1d69657bff..da3312d2002d 100644
--- a/core/lib/merkle_tree/tests/integration/consistency.rs
+++ b/core/lib/merkle_tree/tests/integration/consistency.rs
@@ -26,7 +26,7 @@ fn five_thousand_angry_monkeys_vs_merkle_tree() {
 
     let kvs = generate_key_value_pairs(0..100);
     tree.extend(kvs);
-    tree.verify_consistency(0).unwrap();
+    tree.verify_consistency(0, true).unwrap();
 
     let mut raw_db = db.into_inner();
     let cf = MerkleTreeColumnFamily::Tree;
@@ -53,7 +53,9 @@ fn five_thousand_angry_monkeys_vs_merkle_tree() {
         raw_db.write(batch).unwrap();
 
         let mut db = RocksDBWrapper::from(raw_db);
-        let err = MerkleTree::new(&mut db).verify_consistency(0).unwrap_err();
+        let err = MerkleTree::new(&mut db)
+            .verify_consistency(0, true)
+            .unwrap_err();
         println!("{err}");
 
         // Restore the value back so that it doesn't influence the following cases.
diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs
index d3b666c88492..f3febda5f06a 100644
--- a/core/lib/merkle_tree/tests/integration/domain.rs
+++ b/core/lib/merkle_tree/tests/integration/domain.rs
@@ -7,14 +7,14 @@ use tempfile::TempDir;
 use std::slice;
 
 use zksync_crypto::hasher::blake2::Blake2Hasher;
-use zksync_merkle_tree::{domain::ZkSyncTree, HashTree};
+use zksync_merkle_tree::{domain::ZkSyncTree, HashTree, TreeEntry, TreeInstruction};
 use zksync_storage::RocksDB;
 use zksync_system_constants::ACCOUNT_CODE_STORAGE_ADDRESS;
 use zksync_types::{
-    proofs::StorageLogMetadata, AccountTreeId, Address, L1BatchNumber, StorageKey, StorageLog, H256,
+    proofs::StorageLogMetadata, AccountTreeId, Address, L1BatchNumber, StorageKey, H256,
 };
 
-fn gen_storage_logs() -> Vec<StorageLog> {
+fn gen_storage_logs() -> Vec<TreeInstruction<StorageKey>> {
     let addrs = vec![
         "4b3af74f66ab1f0da3f2e4ec7a3cb99baf1af7b2",
         "ef4bb7b21c5fe7432a7d63876cc59ecc23b46636",
@@ -32,7 +32,11 @@ fn gen_storage_logs() -> Vec<StorageLog> {
 
     proof_keys
         .zip(proof_values)
-        .map(|(proof_key, proof_value)| StorageLog::new_write_log(proof_key, proof_value))
+        .enumerate()
+        .map(|(i, (proof_key, proof_value))| {
+            let entry = TreeEntry::new(proof_key, i as u64 + 1, proof_value);
+            TreeInstruction::Write(entry)
+        })
         .collect()
 }
 
@@ -54,7 +58,11 @@ fn basic_workflow() {
     assert_eq!(metadata.rollup_last_leaf_index, 101);
     assert_eq!(metadata.initial_writes.len(), logs.len());
     for (write, log) in metadata.initial_writes.iter().zip(&logs) {
-        assert_eq!(write.value, log.value);
+        let expected_value = match log {
+            TreeInstruction::Write(entry) => entry.value,
+            TreeInstruction::Read(_) => unreachable!(),
+        };
+        assert_eq!(write.value, expected_value);
     }
     assert!(metadata.repeated_writes.is_empty());
 
@@ -124,7 +132,10 @@ fn filtering_out_no_op_writes() {
     // Add some actual repeated writes.
     let mut expected_writes_count = 0;
     for log in logs.iter_mut().step_by(3) {
-        log.value = H256::repeat_byte(0xff);
+        let TreeInstruction::Write(entry) = log else {
+            unreachable!("Unexpected instruction: {log:?}");
+        };
+        entry.value = H256::repeat_byte(0xff);
         expected_writes_count += 1;
     }
     let new_metadata = tree.process_l1_batch(&logs);
@@ -155,14 +166,16 @@ fn revert_blocks() {
     // Add couple of blocks of distinct keys/values
     let mut logs: Vec<_> = proof_keys
         .zip(proof_values)
-        .map(|(proof_key, proof_value)| StorageLog::new_write_log(proof_key, proof_value))
+        .map(|(proof_key, proof_value)| {
+            let entry = TreeEntry::new(proof_key, proof_value.to_low_u64_be() + 1, proof_value);
+            TreeInstruction::Write(entry)
+        })
         .collect();
     // Add a block with repeated keys
     let extra_logs = (0..block_size).map(move |i| {
-        StorageLog::new_write_log(
-            StorageKey::new(AccountTreeId::new(address), H256::from_low_u64_be(i as u64)),
-            H256::from_low_u64_be((i + 1) as u64),
-        )
+        let key = StorageKey::new(AccountTreeId::new(address), H256::from_low_u64_be(i as u64));
+        let entry = TreeEntry::new(key, i as u64 + 1, H256::from_low_u64_be(i as u64 + 1));
+        TreeInstruction::Write(entry)
     });
     logs.extend(extra_logs);
 
@@ -277,7 +290,7 @@ fn read_logs() {
     let mut tree = ZkSyncTree::new_lightweight(db);
     let read_logs: Vec<_> = logs
         .into_iter()
-        .map(|log| StorageLog::new_read_log(log.key, log.value))
+        .map(|instr| TreeInstruction::Read(instr.key()))
         .collect();
     let read_metadata = tree.process_l1_batch(&read_logs);
 
@@ -285,14 +298,13 @@ fn read_logs() {
 }
 
 fn create_write_log(
+    leaf_index: u64,
     address: Address,
     address_storage_key: [u8; 32],
     value: [u8; 32],
-) -> StorageLog {
-    StorageLog::new_write_log(
-        StorageKey::new(AccountTreeId::new(address), H256(address_storage_key)),
-        H256(value),
-    )
+) -> TreeInstruction<StorageKey> {
+    let key = StorageKey::new(AccountTreeId::new(address), H256(address_storage_key));
+    TreeInstruction::Write(TreeEntry::new(key, leaf_index, H256(value)))
 }
 
 fn subtract_from_max_value(diff: u8) -> [u8; 32] {
@@ -315,28 +327,33 @@ fn root_hash_compatibility() {
     );
 
     let storage_logs = vec![
-        create_write_log(ACCOUNT_CODE_STORAGE_ADDRESS, [0; 32], [1; 32]),
+        create_write_log(1, ACCOUNT_CODE_STORAGE_ADDRESS, [0; 32], [1; 32]),
         create_write_log(
+            2,
             Address::from_low_u64_be(9223372036854775808),
             [254; 32],
             subtract_from_max_value(1),
         ),
         create_write_log(
+            3,
             Address::from_low_u64_be(9223372036854775809),
             [253; 32],
             subtract_from_max_value(2),
         ),
         create_write_log(
+            4,
             Address::from_low_u64_be(9223372036854775810),
             [252; 32],
             subtract_from_max_value(3),
         ),
         create_write_log(
+            5,
             Address::from_low_u64_be(9223372036854775811),
             [251; 32],
             subtract_from_max_value(4),
         ),
         create_write_log(
+            6,
             Address::from_low_u64_be(9223372036854775812),
             [250; 32],
             subtract_from_max_value(5),
diff --git a/core/lib/merkle_tree/tests/integration/merkle_tree.rs b/core/lib/merkle_tree/tests/integration/merkle_tree.rs
index 9f3eb970cd38..e4f052bb03c4 100644
--- a/core/lib/merkle_tree/tests/integration/merkle_tree.rs
+++ b/core/lib/merkle_tree/tests/integration/merkle_tree.rs
@@ -7,12 +7,14 @@ use std::{cmp, mem};
 
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 use zksync_merkle_tree::{
-    Database, HashTree, MerkleTree, PatchSet, Patched, TreeInstruction, TreeLogEntry,
+    Database, HashTree, MerkleTree, PatchSet, Patched, TreeEntry, TreeInstruction, TreeLogEntry,
     TreeRangeDigest,
 };
 use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256};
 
-use crate::common::{compute_tree_hash, convert_to_writes, generate_key_value_pairs, KVS_AND_HASH};
+use crate::common::{
+    compute_tree_hash, convert_to_writes, generate_key_value_pairs, ENTRIES_AND_HASH,
+};
 
 #[test]
 fn compute_tree_hash_works_correctly() {
@@ -25,7 +27,7 @@ fn compute_tree_hash_works_correctly() {
     let address: Address = "4b3af74f66ab1f0da3f2e4ec7a3cb99baf1af7b2".parse().unwrap();
     let key = StorageKey::new(AccountTreeId::new(address), H256::zero());
     let key = key.hashed_key_u256();
-    let hash = compute_tree_hash([(key, H256([1; 32]))].into_iter());
+    let hash = compute_tree_hash([TreeEntry::new(key, 1, H256([1; 32]))].into_iter());
     assert_eq!(hash, EXPECTED_HASH);
 }
 
@@ -59,7 +61,7 @@ fn output_proofs_are_computed_correctly_on_empty_tree(kv_count: u64) {
 
     let reads = instructions
         .iter()
-        .map(|(key, _)| (*key, TreeInstruction::Read));
+        .map(|instr| TreeInstruction::Read(instr.key()));
     let mut reads: Vec<_> = reads.collect();
     reads.shuffle(&mut rng);
     let output = tree.extend_with_proofs(reads.clone());
@@ -77,25 +79,26 @@ fn entry_proofs_are_computed_correctly_on_empty_tree(kv_count: u64) {
     let expected_hash = compute_tree_hash(kvs.iter().copied());
     tree.extend(kvs.clone());
 
-    let existing_keys: Vec<_> = kvs.iter().map(|(key, _)| *key).collect();
+    let existing_keys: Vec<_> = kvs.iter().map(|entry| entry.key).collect();
     let entries = tree.entries_with_proofs(0, &existing_keys).unwrap();
     assert_eq!(entries.len(), existing_keys.len());
-    for ((key, value), entry) in kvs.iter().zip(entries) {
-        entry.verify(&Blake2Hasher, *key, expected_hash);
-        assert_eq!(entry.base.value_hash, *value);
+    for (input_entry, entry) in kvs.iter().zip(entries) {
+        entry.verify(&Blake2Hasher, expected_hash);
+        assert_eq!(entry.base, *input_entry);
     }
 
     // Test some keys adjacent to existing ones.
-    let adjacent_keys = kvs.iter().flat_map(|(key, _)| {
+    let adjacent_keys = kvs.iter().flat_map(|entry| {
+        let key = entry.key;
         [
-            *key ^ (U256::one() << rng.gen_range(0..256)),
-            *key ^ (U256::one() << rng.gen_range(0..256)),
-            *key ^ (U256::one() << rng.gen_range(0..256)),
+            key ^ (U256::one() << rng.gen_range(0..256)),
+            key ^ (U256::one() << rng.gen_range(0..256)),
+            key ^ (U256::one() << rng.gen_range(0..256)),
         ]
     });
     let random_keys = generate_key_value_pairs(kv_count..(kv_count * 2))
         .into_iter()
-        .map(|(key, _)| key);
+        .map(|entry| entry.key);
     let mut missing_keys: Vec<_> = adjacent_keys.chain(random_keys).collect();
     missing_keys.shuffle(&mut rng);
 
@@ -103,7 +106,8 @@ fn entry_proofs_are_computed_correctly_on_empty_tree(kv_count: u64) {
     assert_eq!(entries.len(), missing_keys.len());
     for (key, entry) in missing_keys.iter().zip(entries) {
         assert!(entry.base.is_empty());
-        entry.verify(&Blake2Hasher, *key, expected_hash);
+        assert_eq!(entry.base.key, *key);
+        entry.verify(&Blake2Hasher, expected_hash);
     }
 }
 
@@ -117,10 +121,13 @@ fn proofs_are_computed_correctly_for_mixed_instructions() {
     let output = tree.extend(kvs.clone());
     let old_root_hash = output.root_hash;
 
-    let reads = kvs.iter().map(|(key, _)| (*key, TreeInstruction::Read));
+    let reads = kvs.iter().map(|entry| TreeInstruction::Read(entry.key));
     let mut instructions: Vec<_> = reads.collect();
     // Overwrite all keys in the tree.
-    let writes: Vec<_> = kvs.iter().map(|(key, _)| (*key, H256::zero())).collect();
+    let writes: Vec<_> = kvs
+        .iter()
+        .map(|entry| entry.with_value(H256::zero()))
+        .collect();
     let expected_hash = compute_tree_hash(writes.iter().copied());
     instructions.extend(convert_to_writes(&writes));
     instructions.shuffle(&mut rng);
@@ -145,7 +152,7 @@ fn proofs_are_computed_correctly_for_missing_keys() {
     let mut instructions = convert_to_writes(&kvs);
     let missing_reads = generate_key_value_pairs(20..50)
         .into_iter()
-        .map(|(key, _)| (key, TreeInstruction::Read));
+        .map(|entry| TreeInstruction::Read(entry.key));
     instructions.extend(missing_reads);
     instructions.shuffle(&mut rng);
 
@@ -161,7 +168,7 @@ fn proofs_are_computed_correctly_for_missing_keys() {
 }
 
 fn test_intermediate_commits(db: &mut impl Database, chunk_size: usize) {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     let mut final_hash = H256::zero();
     let mut tree = MerkleTree::new(db);
     for chunk in kvs.chunks(chunk_size) {
@@ -172,7 +179,7 @@ fn test_intermediate_commits(db: &mut impl Database, chunk_size: usize) {
 
     let latest_version = tree.latest_version().unwrap();
     for version in 0..=latest_version {
-        tree.verify_consistency(version).unwrap();
+        tree.verify_consistency(version, true).unwrap();
     }
 }
 
@@ -183,7 +190,7 @@ fn root_hash_is_computed_correctly_with_intermediate_commits(chunk_size: usize)
 
 #[test_casing(6, [3, 5, 10, 17, 28, 42])]
 fn output_proofs_are_computed_correctly_with_intermediate_commits(chunk_size: usize) {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
 
     let mut tree = MerkleTree::new(PatchSet::default());
     let mut root_hash = Blake2Hasher.empty_subtree_hash(256);
@@ -198,8 +205,8 @@ fn output_proofs_are_computed_correctly_with_intermediate_commits(chunk_size: us
 
 #[test_casing(4, [10, 17, 28, 42])]
 fn entry_proofs_are_computed_correctly_with_intermediate_commits(chunk_size: usize) {
-    let (kvs, _) = &*KVS_AND_HASH;
-    let all_keys: Vec<_> = kvs.iter().map(|(key, _)| *key).collect();
+    let (kvs, _) = &*ENTRIES_AND_HASH;
+    let all_keys: Vec<_> = kvs.iter().map(|entry| entry.key).collect();
     let mut tree = MerkleTree::new(PatchSet::default());
     let mut root_hashes = vec![];
     for chunk in kvs.chunks(chunk_size) {
@@ -210,8 +217,9 @@ fn entry_proofs_are_computed_correctly_with_intermediate_commits(chunk_size: usi
         let entries = tree.entries_with_proofs(version as u64, &all_keys).unwrap();
         assert_eq!(entries.len(), all_keys.len());
         for (i, (key, entry)) in all_keys.iter().zip(entries).enumerate() {
+            assert_eq!(entry.base.key, *key);
             assert_eq!(entry.base.is_empty(), i >= (version + 1) * chunk_size);
-            entry.verify(&Blake2Hasher, *key, output.root_hash);
+            entry.verify(&Blake2Hasher, output.root_hash);
         }
     }
 
@@ -220,14 +228,15 @@ fn entry_proofs_are_computed_correctly_with_intermediate_commits(chunk_size: usi
         let entries = tree.entries_with_proofs(version as u64, &all_keys).unwrap();
         assert_eq!(entries.len(), all_keys.len());
         for (i, (key, entry)) in all_keys.iter().zip(entries).enumerate() {
+            assert_eq!(entry.base.key, *key);
             assert_eq!(entry.base.is_empty(), i >= (version + 1) * chunk_size);
-            entry.verify(&Blake2Hasher, *key, root_hash);
+            entry.verify(&Blake2Hasher, root_hash);
         }
     }
 }
 
 fn test_accumulated_commits<DB: Database>(db: DB, chunk_size: usize) -> DB {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     let mut db = Patched::new(db);
     let mut final_hash = H256::zero();
     for chunk in kvs.chunks(chunk_size) {
@@ -242,7 +251,7 @@ fn test_accumulated_commits<DB: Database>(db: DB, chunk_size: usize) -> DB {
     let tree = MerkleTree::new(&mut db);
     let latest_version = tree.latest_version().unwrap();
     for version in 0..=latest_version {
-        tree.verify_consistency(version).unwrap();
+        tree.verify_consistency(version, true).unwrap();
     }
     db
 }
@@ -253,9 +262,12 @@ fn accumulating_commits(chunk_size: usize) {
 }
 
 fn test_root_hash_computing_with_reverts(db: &mut impl Database) {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     let (initial_update, final_update) = kvs.split_at(75);
-    let key_updates: Vec<_> = kvs.iter().map(|(key, _)| (*key, H256([255; 32]))).collect();
+    let key_updates: Vec<_> = kvs
+        .iter()
+        .map(|entry| entry.with_value(H256([255; 32])))
+        .collect();
     let key_inserts = generate_key_value_pairs(100..200);
 
     let mut tree = MerkleTree::new(db);
@@ -300,7 +312,7 @@ fn test_root_hash_computing_with_key_updates(db: impl Database) {
     // Overwrite some `kvs` entries and add some new ones.
     let changed_kvs = kvs.iter_mut().enumerate().filter_map(|(i, kv)| {
         if i % 3 == 1 {
-            kv.1 = H256::from_low_u64_be((i + 100) as u64);
+            *kv = kv.with_value(H256::from_low_u64_be((i + 100) as u64));
             return Some(*kv);
         }
         None
@@ -361,12 +373,12 @@ fn root_hash_is_computed_correctly_with_key_updates() {
 fn proofs_are_computed_correctly_with_key_updates(updated_keys: usize) {
     const RNG_SEED: u64 = 1_234;
 
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     let mut rng = StdRng::seed_from_u64(RNG_SEED);
 
     let old_instructions: Vec<_> = kvs[..updated_keys]
         .iter()
-        .map(|(key, _)| (*key, TreeInstruction::Write(H256([255; 32]))))
+        .map(|entry| TreeInstruction::Write(entry.with_value(H256([255; 32]))))
         .collect();
     // Move the updated keys to the random places in the `kvs` vector.
     let mut writes = convert_to_writes(kvs);
@@ -386,11 +398,11 @@ fn proofs_are_computed_correctly_with_key_updates(updated_keys: usize) {
     assert_eq!(output.root_hash(), Some(*expected_hash));
     output.verify_proofs(&Blake2Hasher, root_hash, &instructions);
 
-    let keys: Vec<_> = kvs.iter().map(|(key, _)| *key).collect();
+    let keys: Vec<_> = kvs.iter().map(|entry| entry.key).collect();
     let proofs = tree.entries_with_proofs(1, &keys).unwrap();
-    for ((key, value), proof) in kvs.iter().zip(proofs) {
-        assert_eq!(proof.base.value_hash, *value);
-        proof.verify(&Blake2Hasher, *key, *expected_hash);
+    for (entry, proof) in kvs.iter().zip(proofs) {
+        assert_eq!(proof.base, *entry);
+        proof.verify(&Blake2Hasher, *expected_hash);
     }
 }
 
@@ -417,7 +429,11 @@ fn test_root_hash_equals_to_previous_implementation(db: &mut impl Database) {
         })
     });
     let values = (0..100).map(H256::from_low_u64_be);
-    let kvs: Vec<_> = keys.zip(values).collect();
+    let kvs: Vec<_> = keys
+        .zip(values)
+        .enumerate()
+        .map(|(idx, (key, value))| TreeEntry::new(key, idx as u64 + 1, value))
+        .collect();
 
     let expected_hash = compute_tree_hash(kvs.iter().copied());
     assert_eq!(expected_hash, PREV_IMPL_HASH);
@@ -437,13 +453,13 @@ fn root_hash_equals_to_previous_implementation() {
 
 #[test_casing(7, [2, 3, 5, 10, 17, 28, 42])]
 fn range_proofs_with_multiple_existing_items(range_size: usize) {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     assert!(range_size >= 2 && range_size <= kvs.len());
 
     let mut tree = MerkleTree::new(PatchSet::default());
     tree.extend(kvs.clone());
 
-    let mut sorted_keys: Vec<_> = kvs.iter().map(|(key, _)| *key).collect();
+    let mut sorted_keys: Vec<_> = kvs.iter().map(|entry| entry.key).collect();
     sorted_keys.sort_unstable();
 
     for start_idx in 0..(sorted_keys.len() - range_size) {
@@ -460,10 +476,10 @@ fn range_proofs_with_multiple_existing_items(range_size: usize) {
         let other_entries = tree.entries(0, other_keys).unwrap();
 
         let mut range = TreeRangeDigest::new(&Blake2Hasher, *first_key, &first_entry);
-        for (key, entry) in other_keys.iter().zip(other_entries) {
-            range.update(*key, entry);
+        for entry in other_entries {
+            range.update(entry);
         }
-        let range_hash = range.finalize(*last_key, &last_entry);
+        let range_hash = range.finalize(&last_entry);
         assert_eq!(range_hash, *expected_hash);
     }
 }
@@ -479,7 +495,7 @@ fn range_proofs_with_random_ranges() {
     const RNG_SEED: u64 = 321;
 
     let mut rng = StdRng::seed_from_u64(RNG_SEED);
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
     let mut tree = MerkleTree::new(PatchSet::default());
     tree.extend(kvs.clone());
 
@@ -493,9 +509,9 @@ fn range_proofs_with_random_ranges() {
         }
 
         // Find out keys falling into the range.
-        let keys_in_range = kvs
-            .iter()
-            .filter_map(|&(key, _)| (key > start_key && key < end_key).then_some(key));
+        let keys_in_range = kvs.iter().filter_map(|entry| {
+            (entry.key > start_key && entry.key < end_key).then_some(entry.key)
+        });
         let mut keys_in_range: Vec<_> = keys_in_range.collect();
         keys_in_range.sort_unstable();
         println!("Proving range with {} keys", keys_in_range.len());
@@ -506,10 +522,10 @@ fn range_proofs_with_random_ranges() {
         let other_entries = tree.entries(0, &keys_in_range).unwrap();
 
         let mut range = TreeRangeDigest::new(&Blake2Hasher, start_key, &first_entry);
-        for (key, entry) in keys_in_range.iter().zip(other_entries) {
-            range.update(*key, entry);
+        for entry in other_entries {
+            range.update(entry);
         }
-        let range_hash = range.finalize(end_key, &last_entry);
+        let range_hash = range.finalize(&last_entry);
         assert_eq!(range_hash, *expected_hash);
     }
 }
@@ -633,7 +649,7 @@ mod rocksdb {
     fn tree_tags_mismatch() {
         let Harness { mut db, dir: _dir } = Harness::new();
         let mut tree = MerkleTree::new(&mut db);
-        tree.extend(vec![(U256::zero(), H256::zero())]);
+        tree.extend(vec![TreeEntry::new(U256::zero(), 1, H256::zero())]);
 
         MerkleTree::with_hasher(&mut db, ());
     }
@@ -643,7 +659,7 @@ mod rocksdb {
     fn tree_tags_mismatch_with_cold_restart() {
         let Harness { db, dir } = Harness::new();
         let mut tree = MerkleTree::new(db);
-        tree.extend(vec![(U256::zero(), H256::zero())]);
+        tree.extend(vec![TreeEntry::new(U256::zero(), 1, H256::zero())]);
         drop(tree);
 
         let db = RocksDBWrapper::new(dir.path());
diff --git a/core/lib/merkle_tree/tests/integration/recovery.rs b/core/lib/merkle_tree/tests/integration/recovery.rs
index fda57f788514..6739e4ffe023 100644
--- a/core/lib/merkle_tree/tests/integration/recovery.rs
+++ b/core/lib/merkle_tree/tests/integration/recovery.rs
@@ -5,11 +5,10 @@ use test_casing::test_casing;
 
 use zksync_crypto::hasher::blake2::Blake2Hasher;
 use zksync_merkle_tree::{
-    recovery::{MerkleTreeRecovery, RecoveryEntry},
-    Database, MerkleTree, PatchSet, PruneDatabase, ValueHash,
+    recovery::MerkleTreeRecovery, Database, MerkleTree, PatchSet, PruneDatabase, ValueHash,
 };
 
-use crate::common::{convert_to_writes, generate_key_value_pairs, TreeMap, KVS_AND_HASH};
+use crate::common::{convert_to_writes, generate_key_value_pairs, TreeMap, ENTRIES_AND_HASH};
 
 #[derive(Debug, Clone, Copy)]
 enum RecoveryKind {
@@ -23,16 +22,8 @@ impl RecoveryKind {
 
 #[test]
 fn recovery_basics() {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
-    let recovery_entries = kvs
-        .iter()
-        .enumerate()
-        .map(|(i, &(key, value))| RecoveryEntry {
-            key,
-            value,
-            leaf_index: i as u64 + 1,
-        });
-    let mut recovery_entries: Vec<_> = recovery_entries.collect();
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
+    let mut recovery_entries: Vec<_> = kvs.clone();
     recovery_entries.sort_unstable_by_key(|entry| entry.key);
     let greatest_key = recovery_entries[99].key;
 
@@ -44,20 +35,12 @@ fn recovery_basics() {
     assert_eq!(recovery.root_hash(), *expected_hash);
 
     let tree = recovery.finalize();
-    tree.verify_consistency(recovered_version).unwrap();
+    tree.verify_consistency(recovered_version, true).unwrap();
 }
 
 fn test_recovery_in_chunks(mut db: impl PruneDatabase, kind: RecoveryKind, chunk_size: usize) {
-    let (kvs, expected_hash) = &*KVS_AND_HASH;
-    let recovery_entries = kvs
-        .iter()
-        .enumerate()
-        .map(|(i, &(key, value))| RecoveryEntry {
-            key,
-            value,
-            leaf_index: i as u64 + 1,
-        });
-    let mut recovery_entries: Vec<_> = recovery_entries.collect();
+    let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
+    let mut recovery_entries = kvs.clone();
     if matches!(kind, RecoveryKind::Linear) {
         recovery_entries.sort_unstable_by_key(|entry| entry.key);
     }
@@ -84,7 +67,7 @@ fn test_recovery_in_chunks(mut db: impl PruneDatabase, kind: RecoveryKind, chunk
     assert_eq!(recovery.root_hash(), *expected_hash);
 
     let mut tree = recovery.finalize();
-    tree.verify_consistency(recovered_version).unwrap();
+    tree.verify_consistency(recovered_version, true).unwrap();
     // Check that new tree versions can be built and function as expected.
     test_tree_after_recovery(&mut tree, recovered_version, *expected_hash);
 }
@@ -107,13 +90,13 @@ fn test_tree_after_recovery<DB: Database>(
     let mut rng = StdRng::seed_from_u64(RNG_SEED);
     let mut kvs = generate_key_value_pairs(100..=150);
     let mut modified_kvs = generate_key_value_pairs(50..=100);
-    for (_, value) in &mut modified_kvs {
-        *value = ValueHash::repeat_byte(1);
+    for entry in &mut modified_kvs {
+        entry.value = ValueHash::repeat_byte(1);
     }
+    modified_kvs.shuffle(&mut rng);
     kvs.extend(modified_kvs);
-    kvs.shuffle(&mut rng);
 
-    let mut tree_map = TreeMap::new(&KVS_AND_HASH.0);
+    let mut tree_map = TreeMap::new(&ENTRIES_AND_HASH.0);
     let mut prev_root_hash = root_hash;
     for (i, chunk) in kvs.chunks(CHUNK_SIZE).enumerate() {
         tree_map.extend(chunk);
@@ -129,7 +112,7 @@ fn test_tree_after_recovery<DB: Database>(
         };
 
         assert_eq!(new_root_hash, tree_map.root_hash());
-        tree.verify_consistency(recovered_version + i as u64)
+        tree.verify_consistency(recovered_version + i as u64, true)
             .unwrap();
         prev_root_hash = new_root_hash;
     }
diff --git a/core/lib/zksync_core/src/api_server/tree/mod.rs b/core/lib/zksync_core/src/api_server/tree/mod.rs
index 74dd3e5b70c1..7b4c9086ac68 100644
--- a/core/lib/zksync_core/src/api_server/tree/mod.rs
+++ b/core/lib/zksync_core/src/api_server/tree/mod.rs
@@ -54,7 +54,7 @@ impl TreeEntryWithProof {
         let mut merkle_path = src.merkle_path;
         merkle_path.reverse(); // Use root-to-leaf enumeration direction as in Ethereum
         Self {
-            value: src.base.value_hash,
+            value: src.base.value,
             index: src.base.leaf_index,
             merkle_path,
         }
diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
index 32f39276a1e5..9ae936febfe6 100644
--- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
@@ -16,10 +16,10 @@ use zksync_dal::StorageProcessor;
 use zksync_health_check::{Health, HealthStatus};
 use zksync_merkle_tree::{
     domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader},
-    Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof,
+    Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof, TreeInstruction,
 };
 use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries};
-use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256};
+use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageKey, H256};
 
 use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS};
 
@@ -147,7 +147,10 @@ impl AsyncTree {
         self.as_ref().root_hash()
     }
 
-    pub async fn process_l1_batch(&mut self, storage_logs: Vec<StorageLog>) -> TreeMetadata {
+    pub async fn process_l1_batch(
+        &mut self,
+        storage_logs: Vec<TreeInstruction<StorageKey>>,
+    ) -> TreeMetadata {
         let mut tree = self.inner.take().expect(Self::INCONSISTENT_MSG);
         let (tree, metadata) = tokio::task::spawn_blocking(move || {
             let metadata = tree.process_l1_batch(&storage_logs);
@@ -242,7 +245,7 @@ impl Delayer {
 #[cfg_attr(test, derive(PartialEq))]
 pub(crate) struct L1BatchWithLogs {
     pub header: L1BatchHeader,
-    pub storage_logs: Vec<StorageLog>,
+    pub storage_logs: Vec<TreeInstruction<StorageKey>>,
 }
 
 impl L1BatchWithLogs {
@@ -276,15 +279,22 @@ impl L1BatchWithLogs {
             .await;
         touched_slots_latency.observe_with_count(touched_slots.len());
 
+        let leaf_indices_latency = METRICS.start_load_stage(LoadChangesStage::LoadLeafIndices);
+        let hashed_keys_for_writes: Vec<_> =
+            touched_slots.keys().map(StorageKey::hashed_key).collect();
+        let l1_batches_for_initial_writes = storage
+            .storage_logs_dal()
+            .get_l1_batches_and_indices_for_initial_writes(&hashed_keys_for_writes)
+            .await;
+        leaf_indices_latency.observe_with_count(hashed_keys_for_writes.len());
+
         let mut storage_logs = BTreeMap::new();
         for storage_key in protective_reads {
             touched_slots.remove(&storage_key);
             // ^ As per deduplication rules, all keys in `protective_reads` haven't *really* changed
             // in the considered L1 batch. Thus, we can remove them from `touched_slots` in order to simplify
             // their further processing.
-
-            let log = StorageLog::new_read_log(storage_key, H256::zero());
-            // ^ The tree doesn't use the read value, so we set it to zero.
+            let log = TreeInstruction::Read(storage_key);
             storage_logs.insert(storage_key, log);
         }
         tracing::debug!(
@@ -292,45 +302,17 @@ impl L1BatchWithLogs {
             touched_slots.len()
         );
 
-        // We don't want to update the tree with zero values which were never written to per storage log
-        // deduplication rules. If we write such values to the tree, it'd result in bogus tree hashes because
-        // new (bogus) leaf indices would be allocated for them. To filter out those values, it's sufficient
-        // to check when a `storage_key` was first written per `initial_writes` table. If this never occurred
-        // or occurred after the considered `l1_batch_number`, this means that the write must be ignored.
-        //
-        // Note that this approach doesn't filter out no-op writes of the same value, but this is fine;
-        // since no new leaf indices are allocated in the tree for them, such writes are no-op on the tree side as well.
-        let hashed_keys_for_zero_values: Vec<_> = touched_slots
-            .iter()
-            .filter(|(_, value)| {
-                // Only zero values are worth checking for initial writes; non-zero values are always
-                // written per deduplication rules.
-                value.is_zero()
-            })
-            .map(|(key, _)| key.hashed_key())
-            .collect();
-        METRICS
-            .load_changes_zero_values
-            .observe(hashed_keys_for_zero_values.len());
-
-        let latency = METRICS.start_load_stage(LoadChangesStage::LoadInitialWritesForZeroValues);
-        let l1_batches_for_initial_writes = storage
-            .storage_logs_dal()
-            .get_l1_batches_and_indices_for_initial_writes(&hashed_keys_for_zero_values)
-            .await;
-        latency.observe_with_count(hashed_keys_for_zero_values.len());
-
         for (storage_key, value) in touched_slots {
-            let write_matters = if value.is_zero() {
-                let initial_write_batch_for_key =
-                    l1_batches_for_initial_writes.get(&storage_key.hashed_key());
-                initial_write_batch_for_key.map_or(false, |&(number, _)| number <= l1_batch_number)
-            } else {
-                true
-            };
-
-            if write_matters {
-                storage_logs.insert(storage_key, StorageLog::new_write_log(storage_key, value));
+            if let Some(&(initial_write_batch_for_key, leaf_index)) =
+                l1_batches_for_initial_writes.get(&storage_key.hashed_key())
+            {
+                // Filter out logs that correspond to deduplicated writes.
+                if initial_write_batch_for_key <= l1_batch_number {
+                    storage_logs.insert(
+                        storage_key,
+                        TreeInstruction::write(storage_key, leaf_index, value),
+                    );
+                }
             }
         }
 
@@ -347,7 +329,7 @@ mod tests {
     use tempfile::TempDir;
 
     use zksync_dal::ConnectionPool;
-    use zksync_types::{proofs::PrepareBasicCircuitsJob, L2ChainId, StorageKey, StorageLogKind};
+    use zksync_types::{proofs::PrepareBasicCircuitsJob, L2ChainId, StorageKey, StorageLog};
 
     use super::*;
     use crate::{
@@ -386,6 +368,10 @@ mod tests {
                 .storage_logs_dal()
                 .get_previous_storage_values(&hashed_keys, l1_batch_number)
                 .await;
+            let l1_batches_for_initial_writes = storage
+                .storage_logs_dal()
+                .get_l1_batches_and_indices_for_initial_writes(&hashed_keys)
+                .await;
 
             for storage_key in protective_reads {
                 let previous_value = previous_values[&storage_key.hashed_key()].unwrap_or_default();
@@ -397,16 +383,17 @@ mod tests {
                     );
                 }
 
-                storage_logs.insert(
-                    storage_key,
-                    StorageLog::new_read_log(storage_key, previous_value),
-                );
+                storage_logs.insert(storage_key, TreeInstruction::Read(storage_key));
             }
 
             for (storage_key, value) in touched_slots {
                 let previous_value = previous_values[&storage_key.hashed_key()].unwrap_or_default();
                 if previous_value != value {
-                    storage_logs.insert(storage_key, StorageLog::new_write_log(storage_key, value));
+                    let (_, leaf_index) = l1_batches_for_initial_writes[&storage_key.hashed_key()];
+                    storage_logs.insert(
+                        storage_key,
+                        TreeInstruction::write(storage_key, leaf_index, value),
+                    );
                 }
             }
 
@@ -608,7 +595,7 @@ mod tests {
         let read_logs_count = l1_batch_with_logs
             .storage_logs
             .iter()
-            .filter(|log| log.kind == StorageLogKind::Read)
+            .filter(|log| matches!(log, TreeInstruction::Read(_)))
             .count();
         assert_eq!(read_logs_count, 7);
 
diff --git a/core/lib/zksync_core/src/metadata_calculator/metrics.rs b/core/lib/zksync_core/src/metadata_calculator/metrics.rs
index f2bedf47229d..f8ef8f85b641 100644
--- a/core/lib/zksync_core/src/metadata_calculator/metrics.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/metrics.rs
@@ -35,7 +35,7 @@ pub(super) enum LoadChangesStage {
     LoadL1BatchHeader,
     LoadProtectiveReads,
     LoadTouchedSlots,
-    LoadInitialWritesForZeroValues,
+    LoadLeafIndices,
 }
 
 /// Latency metric for a certain stage of the tree update.