Skip to content

Commit

Permalink
feat(en): Improve tree snapshot recovery (#1938)
Browse files Browse the repository at this point in the history
## What ❔

- Adds more logs that make it possible to track tree recovery progress
more clearly.
- Uses tagged DB connections.
- Makes chunk size configurable.

## Why ❔

- Logs and tagged connections improve observability.
- Making the chunk size configurable during tree recovery allows fine-tuning its
performance (theoretically; this needs to be verified in E2E tests).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored May 29, 2024

Unverified

This user has not yet uploaded their public signing key.
1 parent 5e5628f commit 5bc8234
Showing 11 changed files with 230 additions and 33 deletions.
15 changes: 15 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ use zksync_config::{
use zksync_core_leftovers::temp_config_store::decode_yaml_repr;
#[cfg(test)]
use zksync_dal::{ConnectionPool, Core};
use zksync_metadata_calculator::MetadataCalculatorRecoveryConfig;
use zksync_node_api_server::{
tx_sender::TxSenderConfig,
web3::{state::InternalApiConfig, Namespace},
@@ -746,6 +747,15 @@ pub(crate) struct ExperimentalENConfig {
/// as a rudimentary way to control RAM usage of the cache.
pub state_keeper_db_max_open_files: Option<NonZeroU32>,

// Snapshot recovery
/// Approximate chunk size (measured in the number of entries) to recover in a single iteration.
/// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
///
/// **Important.** This value cannot be changed in the middle of tree recovery (i.e., if a node is stopped in the middle
/// of recovery and then restarted with a different config).
#[serde(default = "ExperimentalENConfig::default_snapshots_recovery_tree_chunk_size")]
pub snapshots_recovery_tree_chunk_size: u64,

// Commitment generator
/// Maximum degree of parallelism during commitment generation, i.e., the maximum number of L1 batches being processed in parallel.
/// If not specified, commitment generator will use a value roughly equal to the number of CPU cores with some clamping applied.
@@ -757,12 +767,17 @@ impl ExperimentalENConfig {
128
}

fn default_snapshots_recovery_tree_chunk_size() -> u64 {
    // Mirror the library-level default so that the two values stay in sync automatically.
    let recovery_defaults = MetadataCalculatorRecoveryConfig::default();
    recovery_defaults.desired_chunk_size
}

#[cfg(test)]
fn mock() -> Self {
    // Mock config for tests: optional knobs are disabled, everything else uses defaults.
    Self {
        commitment_generator_max_parallelism: None,
        state_keeper_db_max_open_files: None,
        state_keeper_db_block_cache_capacity_mb:
            Self::default_state_keeper_db_block_cache_capacity_mb(),
        snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
    }
}
5 changes: 4 additions & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ use zksync_db_connection::{
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_metadata_calculator::{
api_server::{TreeApiClient, TreeApiHttpClient},
MetadataCalculator, MetadataCalculatorConfig,
MetadataCalculator, MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig,
};
use zksync_node_api_server::{
execution_sandbox::VmConcurrencyLimiter,
@@ -139,6 +139,9 @@ async fn run_tree(
.merkle_tree_include_indices_and_filters_in_block_cache,
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
recovery: MetadataCalculatorRecoveryConfig {
desired_chunk_size: config.experimental.snapshots_recovery_tree_chunk_size,
},
};

let max_concurrency = config
5 changes: 5 additions & 0 deletions core/lib/merkle_tree/src/lib.rs
Original file line number Diff line number Diff line change
@@ -256,6 +256,8 @@ impl<DB: PruneDatabase> MerkleTree<DB> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::types::TreeTags;

@@ -268,6 +270,7 @@ mod tests {
depth: 256,
hasher: "blake2s256".to_string(),
is_recovering: false,
custom: HashMap::new(),
});

MerkleTree::new(db);
@@ -282,6 +285,7 @@ mod tests {
depth: 128,
hasher: "blake2s256".to_string(),
is_recovering: false,
custom: HashMap::new(),
});

MerkleTree::new(db);
@@ -296,6 +300,7 @@ mod tests {
depth: 256,
hasher: "sha256".to_string(),
is_recovering: false,
custom: HashMap::new(),
});

MerkleTree::new(db);
20 changes: 19 additions & 1 deletion core/lib/merkle_tree/src/recovery.rs
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@
//! before extending the tree; these nodes are guaranteed to be the *only* DB reads necessary
//! to insert new entries.
use std::time::Instant;
use std::{collections::HashMap, time::Instant};

use zksync_crypto::hasher::blake2::Blake2Hasher;

@@ -111,6 +111,24 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
}
}

/// Updates custom tags for the tree using the provided closure. The update is atomic and unconditional.
#[allow(clippy::missing_panics_doc)] // should never be triggered; the manifest is added in the constructor
pub fn update_custom_tags<R>(
    &mut self,
    update: impl FnOnce(&mut HashMap<String, String>) -> R,
) -> R {
    let mut manifest = self
        .db
        .manifest()
        .expect("Merkle tree manifest disappeared");
    // Create empty tags on the fly if the manifest doesn't have them yet.
    let custom_tags = &mut manifest
        .tags
        .get_or_insert_with(|| TreeTags::new(&self.hasher))
        .custom;
    let output = update(custom_tags);
    // Persist the updated manifest in a single atomic patch.
    self.db.apply_patch(PatchSet::from_manifest(manifest));
    output
}

/// Returns the version of the tree being recovered.
pub fn recovered_version(&self) -> u64 {
self.recovered_version
55 changes: 52 additions & 3 deletions core/lib/merkle_tree/src/storage/serialization.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Serialization of node types in the database.
use std::str;
use std::{collections::HashMap, str};

use crate::{
errors::{DeserializeError, DeserializeErrorKind, ErrorContext},
@@ -206,12 +206,14 @@ impl Node {
impl TreeTags {
/// Tags are serialized as a length-prefixed list of `(&str, &str)` tuples, where each
/// `&str` is length-prefixed as well. All lengths are encoded using LEB128.
/// Custom tag keys are prefixed with `custom.` to ensure they don't intersect with standard tags.
fn deserialize(bytes: &mut &[u8]) -> Result<Self, DeserializeError> {
let tag_count = leb128::read::unsigned(bytes).map_err(DeserializeErrorKind::Leb128)?;
let mut architecture = None;
let mut hasher = None;
let mut depth = None;
let mut is_recovering = false;
let mut custom = HashMap::new();

for _ in 0..tag_count {
let key = Self::deserialize_str(bytes)?;
@@ -237,14 +239,21 @@ impl TreeTags {
})?;
is_recovering = parsed;
}
_ => return Err(DeserializeErrorKind::UnknownTag(key.to_owned()).into()),
key => {
if let Some(custom_key) = key.strip_prefix("custom.") {
custom.insert(custom_key.to_owned(), value.to_owned());
} else {
return Err(DeserializeErrorKind::UnknownTag(key.to_owned()).into());
}
}
}
}
Ok(Self {
architecture: architecture.ok_or(DeserializeErrorKind::MissingTag("architecture"))?,
hasher: hasher.ok_or(DeserializeErrorKind::MissingTag("hasher"))?,
depth: depth.ok_or(DeserializeErrorKind::MissingTag("depth"))?,
is_recovering,
custom,
})
}

@@ -266,8 +275,9 @@ impl TreeTags {
}

fn serialize(&self, buffer: &mut Vec<u8>) {
let entry_count = 3 + u64::from(self.is_recovering);
let entry_count = 3 + u64::from(self.is_recovering) + self.custom.len() as u64;
leb128::write::unsigned(buffer, entry_count).unwrap();

Self::serialize_str(buffer, "architecture");
Self::serialize_str(buffer, &self.architecture);
Self::serialize_str(buffer, "depth");
@@ -278,6 +288,11 @@ impl TreeTags {
Self::serialize_str(buffer, "is_recovering");
Self::serialize_str(buffer, "true");
}

for (custom_key, value) in &self.custom {
Self::serialize_str(buffer, &format!("custom.{custom_key}"));
Self::serialize_str(buffer, value);
}
}
}

@@ -347,6 +362,40 @@ mod tests {
assert_eq!(manifest_copy, manifest);
}

#[test]
fn serializing_manifest_with_custom_tags() {
    let mut manifest = Manifest::new(42, &());
    // Start with a single custom tag so that tag enumeration order is deterministic.
    manifest.tags.as_mut().unwrap().custom =
        HashMap::from([("test".to_owned(), "1".to_owned())]);

    let mut serialized = vec![];
    manifest.serialize(&mut serialized);
    assert_eq!(serialized[0], 42); // version count
    assert_eq!(serialized[1], 4); // number of tags (3 standard + 1 custom)
    assert_eq!(
        serialized[2..],
        *b"\x0Carchitecture\x06AR16MT\x05depth\x03256\x06hasher\x08no_op256\x0Bcustom.test\x011"
    );

    assert_eq!(Manifest::deserialize(&serialized).unwrap(), manifest);

    // Now check multiple custom tags together with the `is_recovering` flag.
    let tags = manifest.tags.as_mut().unwrap();
    tags.is_recovering = true;
    tags.custom = HashMap::from([
        ("test".to_owned(), "1".to_owned()),
        ("other.long.tag".to_owned(), "123456!!!".to_owned()),
    ]);

    let mut serialized = vec![];
    manifest.serialize(&mut serialized);
    assert_eq!(serialized[0], 42); // version count
    assert_eq!(serialized[1], 6); // number of tags (4 standard + 2 custom)

    assert_eq!(Manifest::deserialize(&serialized).unwrap(), manifest);
}

#[test]
fn manifest_serialization_errors() {
let manifest = Manifest::new(42, &());
5 changes: 4 additions & 1 deletion core/lib/merkle_tree/src/types/internal.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
//! some of these types are declared as public and can be even exported using the `unstable` module.
//! Still, logically these types are private, so adding them to new public APIs etc. is a logical error.
use std::{fmt, num::NonZeroU64};
use std::{collections::HashMap, fmt, num::NonZeroU64};

use crate::{
hasher::{HashTree, InternalNodeCache},
@@ -25,6 +25,8 @@ pub(crate) struct TreeTags {
pub depth: usize,
pub hasher: String,
pub is_recovering: bool,
/// Custom / user-defined tags.
pub custom: HashMap<String, String>,
}

impl TreeTags {
@@ -36,6 +38,7 @@ impl TreeTags {
hasher: hasher.name().to_owned(),
depth: TREE_DEPTH,
is_recovering: false,
custom: HashMap::new(),
}
}

33 changes: 33 additions & 0 deletions core/node/metadata_calculator/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -395,6 +395,39 @@ impl AsyncTreeRecovery {
.recovered_version()
}

/// Checks that the desired recovery chunk size persisted in the tree tags matches the configured
/// `desired_chunk_size`, persisting the configured value on the first call.
///
/// # Errors
///
/// Returns an error if a different chunk size was persisted by a previous run (the chunk size must
/// not change in the middle of recovery), or if the persisted value cannot be parsed as `u64`.
pub async fn ensure_desired_chunk_size(
    &mut self,
    desired_chunk_size: u64,
) -> anyhow::Result<()> {
    const CHUNK_SIZE_KEY: &str = "recovery.desired_chunk_size";

    // Move the tree out of `self` so that it can be sent into the blocking task below;
    // it is restored at the end of the method.
    let mut tree = self.inner.take().expect(Self::INCONSISTENT_MSG);
    let tree = tokio::task::spawn_blocking(move || {
        // **Important.** Tags should not be mutated on error (i.e., it would be an error to unconditionally call `tags.insert()`
        // and then check the previous value).
        tree.update_custom_tags(|tags| {
            if let Some(chunk_size_in_tree) = tags.get(CHUNK_SIZE_KEY) {
                // A chunk size was persisted by a previous run; it must match the configured value.
                let chunk_size_in_tree: u64 = chunk_size_in_tree
                    .parse()
                    .with_context(|| format!("error parsing desired_chunk_size `{chunk_size_in_tree}` in Merkle tree tags"))?;
                anyhow::ensure!(
                    chunk_size_in_tree == desired_chunk_size,
                    "Mismatch between the configured desired chunk size ({desired_chunk_size}) and one that was used previously ({chunk_size_in_tree}). \
                    Either change the desired chunk size in configuration, or reset Merkle tree recovery by clearing its RocksDB directory"
                );
            } else {
                // First run: persist the configured chunk size for future consistency checks.
                tags.insert(CHUNK_SIZE_KEY.to_owned(), desired_chunk_size.to_string());
            }
            Ok(())
        })?;
        anyhow::Ok(tree)
    })
    .await??;

    self.inner = Some(tree);
    Ok(())
}

/// Returns an entry for the specified keys.
pub async fn entries(&mut self, keys: Vec<Key>) -> Vec<TreeEntry> {
let tree = self.inner.take().expect(Self::INCONSISTENT_MSG);
25 changes: 24 additions & 1 deletion core/node/metadata_calculator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -37,6 +37,24 @@ mod recovery;
pub(crate) mod tests;
mod updater;

/// Configuration of the Merkle tree recovery process performed by [`MetadataCalculator`].
#[derive(Debug, Clone)]
pub struct MetadataCalculatorRecoveryConfig {
    /// Approximate chunk size (measured in the number of entries) to recover on a single iteration.
    /// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
    ///
    /// **Important.** This value cannot be changed in the middle of tree recovery (i.e., if a node is stopped in the middle
    /// of recovery and then restarted with a different config).
    pub desired_chunk_size: u64,
}

impl Default for MetadataCalculatorRecoveryConfig {
    fn default() -> Self {
        // ~200k entries per chunk means that a single recovery iteration takes several seconds.
        let desired_chunk_size = 200_000;
        Self { desired_chunk_size }
    }
}

/// Configuration of [`MetadataCalculator`].
#[derive(Debug, Clone)]
pub struct MetadataCalculatorConfig {
@@ -65,6 +83,8 @@ pub struct MetadataCalculatorConfig {
pub memtable_capacity: usize,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub stalled_writes_timeout: Duration,
/// Configuration specific to the Merkle tree recovery.
pub recovery: MetadataCalculatorRecoveryConfig,
}

impl MetadataCalculatorConfig {
@@ -83,6 +103,8 @@ impl MetadataCalculatorConfig {
include_indices_and_filters_in_block_cache: false,
memtable_capacity: merkle_tree_config.memtable_capacity(),
stalled_writes_timeout: merkle_tree_config.stalled_writes_timeout(),
// The main node isn't supposed to be recovered yet, so this value doesn't matter much
recovery: MetadataCalculatorRecoveryConfig::default(),
}
}
}
@@ -193,10 +215,11 @@ impl MetadataCalculator {
let tree = self.create_tree().await?;
let tree = tree
.ensure_ready(
&self.config.recovery,
&self.pool,
self.recovery_pool,
&stop_receiver,
&self.health_updater,
&stop_receiver,
)
.await?;
let Some(mut tree) = tree else {
Loading

0 comments on commit 5bc8234

Please sign in to comment.