feat(en): Restore state keeper storage from snapshot (#885)
## What ❔

Allows restoring the secondary RocksDB storage used by the state keeper
from a snapshot.
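
For context, a recovery flow like this boils down to replaying snapshot storage logs into the secondary storage and recording the snapshot's L1 batch as the starting point. Below is a minimal, self-contained sketch of that idea; all types and names (`SnapshotLog`, `SecondaryStorage`, `apply_chunk`) are invented for illustration and are not the actual EN API:

```rust
use std::collections::HashMap;

/// Stand-in for one storage log taken from a snapshot chunk.
struct SnapshotLog {
    hashed_key: [u8; 32],
    value: [u8; 32],
    enumeration_index: u64,
}

/// Stand-in for the secondary (RocksDB-backed) storage.
#[derive(Default)]
struct SecondaryStorage {
    kv: HashMap<[u8; 32], ([u8; 32], u64)>,
    recovered_l1_batch: Option<u64>,
}

impl SecondaryStorage {
    /// Replays one chunk of snapshot logs; chunks can be applied in any
    /// order because each log carries its own key and enumeration index.
    fn apply_chunk(&mut self, logs: &[SnapshotLog]) {
        for log in logs {
            self.kv
                .insert(log.hashed_key, (log.value, log.enumeration_index));
        }
    }

    /// Marks recovery complete so the state keeper resumes from the next batch.
    fn finalize(&mut self, snapshot_l1_batch: u64) {
        self.recovered_l1_batch = Some(snapshot_l1_batch);
    }
}

fn main() {
    let mut storage = SecondaryStorage::default();
    // In a real flow, chunks would be streamed from Postgres / object storage.
    let chunk = vec![SnapshotLog {
        hashed_key: [1; 32],
        value: [42; 32],
        enumeration_index: 1,
    }];
    storage.apply_chunk(&chunk);
    storage.finalize(1_000);
    assert_eq!(storage.recovered_l1_batch, Some(1_000));
}
```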

## Why ❔

Part of preparing the EN code to support snapshot recovery.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored Jan 23, 2024
1 parent c2fe45a commit 872913a
Showing 45 changed files with 1,570 additions and 824 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

4 changes: 3 additions & 1 deletion core/bin/external_node/src/main.rs
@@ -196,7 +196,9 @@ async fn init_tasks(
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
};
-let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None).await;
+let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
+    .await
+    .context("failed initializing metadata calculator")?;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));

let consistency_checker = ConsistencyChecker::new(
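The change above swaps a panicking call for `?` with attached context; the same pattern recurs at many call sites in this commit. A minimal sketch of the `anyhow::Context` idiom (assumes the `anyhow` crate; `load_config` is a made-up stand-in, not EN code):

```rust
use anyhow::Context as _;

/// Made-up stand-in for a fallible initialization step.
fn load_config(path: &str) -> anyhow::Result<String> {
    std::fs::read_to_string(path)
        .with_context(|| format!("failed reading config from `{path}`"))
}

fn main() {
    // The reported error now says *what* failed, not just the raw io::Error.
    let err = load_config("/definitely/missing").unwrap_err();
    println!("{err:#}");
}
```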
2 changes: 1 addition & 1 deletion core/bin/merkle_tree_consistency_checker/src/main.rs
@@ -27,7 +27,7 @@ impl Cli {
let db_path = &config.merkle_tree.path;
tracing::info!("Verifying consistency of Merkle tree at {db_path}");
let start = Instant::now();
-let db = RocksDB::new(Path::new(db_path));
+let db = RocksDB::new(Path::new(db_path)).unwrap();
let tree = ZkSyncTree::new_lightweight(db.into());

let l1_batch_number = if let Some(number) = self.l1_batch {
1 change: 0 additions & 1 deletion core/bin/snapshots_creator/Cargo.toml
@@ -16,7 +16,6 @@ prometheus_exporter = { path = "../../lib/prometheus_exporter" }
zksync_config = { path = "../../lib/config" }
zksync_dal = { path = "../../lib/dal" }
zksync_env_config = { path = "../../lib/env_config" }
-zksync_utils = { path = "../../lib/utils" }
zksync_types = { path = "../../lib/types" }
zksync_object_store = { path = "../../lib/object_store" }
vlog = { path = "../../lib/vlog" }
69 changes: 0 additions & 69 deletions core/bin/snapshots_creator/src/chunking.rs

This file was deleted.

23 changes: 13 additions & 10 deletions core/bin/snapshots_creator/src/creator.rs
@@ -9,19 +9,15 @@ use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_object_store::ObjectStore;
use zksync_types::{
snapshots::{
-    SnapshotFactoryDependencies, SnapshotMetadata, SnapshotStorageLogsChunk,
-    SnapshotStorageLogsStorageKey,
+    uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency,
+    SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
},
L1BatchNumber, MiniblockNumber,
};
-use zksync_utils::ceil_div;

+use crate::metrics::{FactoryDepsStage, StorageChunkStage, METRICS};
#[cfg(test)]
use crate::tests::HandleEvent;
-use crate::{
-    chunking::get_chunk_hashed_keys_range,
-    metrics::{FactoryDepsStage, StorageChunkStage, METRICS},
-};

/// Encapsulates progress of creating a particular storage snapshot.
#[derive(Debug)]
@@ -91,7 +87,7 @@ impl SnapshotCreator {
return Ok(());
}

-let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunk_count);
+let hashed_keys_range = uniform_hashed_keys_chunk(chunk_id, chunk_count);
let mut conn = self.connect_to_replica().await?;

let latency =
@@ -166,6 +162,12 @@ impl SnapshotCreator {
tracing::info!("Saving factory deps to GCS...");
let latency =
METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start();
+let factory_deps = factory_deps
+    .into_iter()
+    .map(|(_, bytecode)| SnapshotFactoryDependency {
+        bytecode: bytecode.into(),
+    })
+    .collect();
let factory_deps = SnapshotFactoryDependencies { factory_deps };
let filename = self
.blob_store
@@ -216,8 +218,9 @@ impl SnapshotCreator {
.await?;
let chunk_size = config.storage_logs_chunk_size;
// We force the minimum number of chunks to avoid situations where only one chunk is created in tests.
-let chunk_count =
-    ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count);
+let chunk_count = distinct_storage_logs_keys_count
+    .div_ceil(chunk_size)
+    .max(min_chunk_count);

tracing::info!(
"Selected storage logs chunking for L1 batch {l1_batch_number}: \
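The deleted `chunking` module is superseded by `uniform_hashed_keys_chunk` from `zksync_types`, and the `ceil_div` helper by the standard library's `u64::div_ceil`. A sketch of the underlying chunking idea, scaled down from the 2^256 hashed-key space to `u64` keys so it runs without a `U256` dependency (`uniform_chunk` and `chunk_count` are illustrative names):

```rust
use std::ops::RangeInclusive;

/// Splits the full u64 key space into `chunk_count` near-equal ranges and
/// returns the `chunk_id`-th one. The real code does this over 2^256 keys.
fn uniform_chunk(chunk_id: u64, chunk_count: u64) -> RangeInclusive<u64> {
    assert!(chunk_count > 0 && chunk_id < chunk_count);
    let total = 1u128 << 64; // number of possible u64 keys
    let start = (chunk_id as u128 * total / chunk_count as u128) as u64;
    let end = ((chunk_id as u128 + 1) * total / chunk_count as u128 - 1) as u64;
    start..=end
}

/// Mirrors the `div_ceil`-based chunk count selection from the diff.
fn chunk_count(distinct_keys: u64, chunk_size: u64, min_chunk_count: u64) -> u64 {
    distinct_keys.div_ceil(chunk_size).max(min_chunk_count)
}

fn main() {
    // Chunks tile the key space: the first starts at 0, the last ends at MAX.
    assert_eq!(uniform_chunk(0, 1), 0..=u64::MAX);
    assert_eq!(*uniform_chunk(0, 4).start(), 0);
    assert_eq!(*uniform_chunk(3, 4).end(), u64::MAX);
    assert_eq!(chunk_count(10, 3, 2), 4); // ceil(10 / 3) = 4 > min of 2
}
```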
1 change: 0 additions & 1 deletion core/bin/snapshots_creator/src/main.rs
@@ -19,7 +19,6 @@ use zksync_object_store::ObjectStoreFactory;

use crate::creator::SnapshotCreator;

-mod chunking;
mod creator;
mod metrics;
#[cfg(test)]
3 changes: 2 additions & 1 deletion core/bin/snapshots_creator/src/tests.rs
@@ -223,7 +223,8 @@ async fn prepare_postgres(
let expected_l1_batches_and_indices = conn
.storage_logs_dal()
.get_l1_batches_and_indices_for_initial_writes(&hashed_keys)
-    .await;
+    .await
+    .unwrap();

let logs = logs.into_iter().map(|log| {
let (l1_batch_number_of_initial_write, enumeration_index) =
3 changes: 2 additions & 1 deletion core/bin/storage_logs_dedup_migration/src/main.rs
@@ -108,7 +108,8 @@ async fn main() {
let values_for_missing_keys: HashMap<_, _> = connection
.storage_logs_dal()
.get_storage_values(&missing_keys, miniblock_number - 1)
-    .await;
+    .await
+    .expect("failed getting storage values for missing keys");

in_memory_prev_values_iter
.chain(

This file was deleted.

(Generated file; diff not rendered.)

11 changes: 9 additions & 2 deletions core/lib/dal/src/models/storage_log.rs
@@ -30,8 +30,15 @@ impl From<DBStorageLog> for StorageLog {

// We don't want to rely on the Merkle tree crate to import a single type, so we duplicate `TreeEntry` here.
#[derive(Debug, Clone, Copy)]
-pub struct StorageTreeEntry {
-    pub key: U256,
+pub struct StorageRecoveryLogEntry {
+    pub key: H256,
pub value: H256,
pub leaf_index: u64,
}

+impl StorageRecoveryLogEntry {
+    /// Converts `key` to the format used by the Merkle tree (little-endian [`U256`]).
+    pub fn tree_key(&self) -> U256 {
+        U256::from_little_endian(&self.key.0)
+    }
+}
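
The new `tree_key` helper reinterprets the 32 key bytes as a little-endian integer, the format the Merkle tree uses for keys. A standalone illustration of the same conversion (assumes the `primitive-types` crate for `H256`/`U256`):

```rust
use primitive_types::{H256, U256};

/// Same conversion as `StorageRecoveryLogEntry::tree_key` in the diff.
fn tree_key(key: &H256) -> U256 {
    U256::from_little_endian(&key.0)
}

fn main() {
    let mut bytes = [0u8; 32];
    bytes[0] = 1; // least-significant byte in little-endian order
    assert_eq!(tree_key(&H256(bytes)), U256::from(1u64));

    // Note this differs from the big-endian reading of the same bytes.
    assert_ne!(tree_key(&H256(bytes)), U256::from_big_endian(&bytes));
}
```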
12 changes: 6 additions & 6 deletions core/lib/dal/src/snapshots_creator_dal.rs
@@ -1,6 +1,6 @@
use zksync_types::{
-    snapshots::{SnapshotFactoryDependency, SnapshotStorageLog},
-    AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, H256,
+    snapshots::SnapshotStorageLog, AccountTreeId, Address, L1BatchNumber, MiniblockNumber,
+    StorageKey, H256,
};

use crate::{instrument::InstrumentExt, StorageProcessor};
@@ -99,13 +99,15 @@ impl SnapshotsCreatorDal<'_, '_> {
Ok(storage_logs)
}

+/// Returns all factory dependencies up to and including the specified `miniblock_number`.
pub async fn get_all_factory_deps(
&mut self,
miniblock_number: MiniblockNumber,
-) -> sqlx::Result<Vec<SnapshotFactoryDependency>> {
+) -> sqlx::Result<Vec<(H256, Vec<u8>)>> {
let rows = sqlx::query!(
r#"
SELECT
+    bytecode_hash,
bytecode
FROM
factory_deps
@@ -121,9 +123,7 @@ impl SnapshotsCreatorDal<'_, '_> {

Ok(rows
.into_iter()
-    .map(|row| SnapshotFactoryDependency {
-        bytecode: row.bytecode.into(),
-    })
+    .map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode))
.collect())
}
}
9 changes: 4 additions & 5 deletions core/lib/dal/src/storage_dal.rs
@@ -134,8 +134,8 @@ impl StorageDal<'_, '_> {
pub async fn get_factory_deps_for_revert(
&mut self,
block_number: MiniblockNumber,
-) -> Vec<H256> {
-    sqlx::query!(
+) -> sqlx::Result<Vec<H256>> {
+    Ok(sqlx::query!(
r#"
SELECT
bytecode_hash
@@ -147,11 +147,10 @@ impl StorageDal<'_, '_> {
block_number.0 as i64
)
.fetch_all(self.storage.conn())
-    .await
-    .unwrap()
+    .await?
.into_iter()
.map(|row| H256::from_slice(&row.bytecode_hash))
-    .collect()
+    .collect())
}

/// Applies the specified storage logs for a miniblock. Returns the map of unique storage updates.
(Diffs for the remaining changed files are not shown.)
