feat(merkle tree): Snapshot recovery in metadata calculator (#607)
## What ❔

Implements tree recovery in the Metadata calculator based on Postgres data.

## Why ❔

We obviously need tree recovery. Driving it from the Metadata calculator
(i.e., choreography) allows restoring multiple tree instances from the
same snapshot.
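
For illustration, here is a minimal, self-contained sketch of the chunk-based, choreography-style recovery idea: the component owning a tree drives its own recovery from the shared snapshot, so several tree instances can be restored independently from the same data. All types and functions below are simplified placeholders, not the actual zksync-era API.

```rust
// Illustrative placeholders only; the real tree and snapshot live in RocksDB / Postgres.
struct Tree {
    entries: Vec<(u64, u64)>, // (key, value) pairs standing in for Merkle tree leaves
}

impl Tree {
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
    fn extend_during_recovery(&mut self, chunk: Vec<(u64, u64)>) {
        self.entries.extend(chunk);
    }
    fn finalize_recovery(&mut self) {
        self.entries.sort_unstable();
    }
}

/// Stand-in for reading one key range of the snapshot from Postgres.
fn load_chunk(snapshot: &[(u64, u64)], range: std::ops::Range<u64>) -> Vec<(u64, u64)> {
    snapshot.iter().copied().filter(|(key, _)| range.contains(key)).collect()
}

/// Recovers an empty tree chunk by chunk; a non-empty tree is left untouched.
fn recover(tree: &mut Tree, snapshot: &[(u64, u64)], chunk_count: u64) {
    if !tree.is_empty() {
        return;
    }
    let key_space = 1_000; // illustrative key-space bound
    let chunk_size = key_space / chunk_count;
    for i in 0..chunk_count {
        let range = (i * chunk_size)..((i + 1) * chunk_size);
        tree.extend_during_recovery(load_chunk(snapshot, range));
    }
    tree.finalize_recovery();
}

fn main() {
    let snapshot = vec![(3, 30), (250, 7), (999, 1)];
    let mut full_tree = Tree { entries: vec![] };
    let mut lightweight_tree = Tree { entries: vec![] };
    // The same snapshot drives recovery of multiple tree instances.
    recover(&mut full_tree, &snapshot, 4);
    recover(&mut lightweight_tree, &snapshot, 4);
    assert_eq!(full_tree.entries, lightweight_tree.entries);
}
```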

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
slowli authored Dec 12, 2023
1 parent b89e5a4 commit f49418b
Showing 22 changed files with 1,394 additions and 286 deletions.
2 changes: 1 addition & 1 deletion core/bin/merkle_tree_consistency_checker/src/main.rs
@@ -28,7 +28,7 @@ impl Cli {
tracing::info!("Verifying consistency of Merkle tree at {db_path}");
let start = Instant::now();
let db = RocksDB::new(Path::new(db_path));
let tree = ZkSyncTree::new_lightweight(db);
let tree = ZkSyncTree::new_lightweight(db.into());

let l1_batch_number = if let Some(number) = self.l1_batch {
L1BatchNumber(number)
108 changes: 88 additions & 20 deletions core/lib/dal/sqlx-data.json
@@ -378,6 +378,26 @@
},
"query": "UPDATE l1_batches SET hash = $1, merkle_root_hash = $2, compressed_repeated_writes = $3, compressed_initial_writes = $4, l2_l1_compressed_messages = $5, l2_l1_merkle_root = $6, zkporter_is_available = $7, parent_hash = $8, rollup_last_leaf_index = $9, pass_through_data_hash = $10, meta_parameters_hash = $11, compressed_state_diffs = $12, updated_at = now() WHERE number = $13 AND hash IS NULL"
},
"0a3c928a616b5ebc0b977bd773edcde721ca1c652ae2f8db41fb75cecdecb674": {
"describe": {
"columns": [
{
"name": "count",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT COUNT(*) FROM storage_logs WHERE miniblock_number = $1"
},
"0cbbcd30fde109c4c44162f94b6ed9bab4e9db9948d03e584c2cab543449d298": {
"describe": {
"columns": [
@@ -3370,6 +3390,40 @@
},
"query": "SELECT id, contract_address, source_code, contract_name, zk_compiler_version, compiler_version, optimization_used, optimizer_mode, constructor_arguments, is_system FROM contract_verification_requests WHERE status = 'successful' ORDER BY id"
},
"34d78a04a9aa834e1846083c991138cef1e81b57c874a27ec65de5793e5681cc": {
"describe": {
"columns": [
{
"name": "hashed_key",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "value",
"ordinal": 1,
"type_info": "Bytea"
},
{
"name": "index",
"ordinal": 2,
"type_info": "Int8"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": [
"Int8",
"Bytea",
"Bytea"
]
}
},
"query": "SELECT storage_logs.hashed_key, storage_logs.value, initial_writes.index FROM storage_logs INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key WHERE storage_logs.miniblock_number = $1 AND storage_logs.hashed_key >= $2::bytea AND storage_logs.hashed_key <= $3::bytea ORDER BY storage_logs.hashed_key"
},
"357347157ed8ff19d223c54533c3a85bd7e64a37514d657f8d49bd6eb5be1806": {
"describe": {
"columns": [
@@ -5671,26 +5725,6 @@
},
"query": "UPDATE l1_batches SET hash = $1, merkle_root_hash = $2, commitment = $3, default_aa_code_hash = $4, compressed_repeated_writes = $5, compressed_initial_writes = $6, l2_l1_compressed_messages = $7, l2_l1_merkle_root = $8, zkporter_is_available = $9, bootloader_code_hash = $10, rollup_last_leaf_index = $11, aux_data_hash = $12, pass_through_data_hash = $13, meta_parameters_hash = $14, compressed_state_diffs = $15, updated_at = now() WHERE number = $16"
},
"780b30e56a3ecfb3daa5310168ac6cd9e94bd5f1d871e1eaf36fbfd463a5e7e0": {
"describe": {
"columns": [
{
"name": "address_and_key?",
"ordinal": 0,
"type_info": "ByteaArray"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"ByteaArray"
]
}
},
"query": "SELECT (SELECT ARRAY[address,key] FROM storage_logs WHERE hashed_key = u.hashed_key ORDER BY miniblock_number, operation_number LIMIT 1) as \"address_and_key?\" FROM UNNEST($1::bytea[]) AS u(hashed_key)"
},
"78ba607e97bdf8b7c0b5e3cf87e10dc3b352a8552c2e94532b0f392af7dbe9cd": {
"describe": {
"columns": [
@@ -9484,6 +9518,40 @@
},
"query": "\n UPDATE scheduler_witness_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $2\n WHERE l1_batch_number = (\n SELECT l1_batch_number\n FROM scheduler_witness_jobs_fri\n WHERE status = 'queued'\n AND protocol_version = ANY($1)\n ORDER BY l1_batch_number ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING scheduler_witness_jobs_fri.*\n "
},
"c289a50ec67dcab5a5c84ddfa17b924c1fefe151c887b3f08f54306f1bde47a2": {
"describe": {
"columns": [
{
"name": "hashed_key?",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "value?",
"ordinal": 1,
"type_info": "Bytea"
},
{
"name": "index",
"ordinal": 2,
"type_info": "Int8"
}
],
"nullable": [
null,
null,
true
],
"parameters": {
"Left": [
"Int8",
"ByteaArray",
"ByteaArray"
]
}
},
"query": "WITH sl AS ( SELECT ( SELECT ARRAY[hashed_key, value] AS kv FROM storage_logs WHERE storage_logs.miniblock_number = $1 AND storage_logs.hashed_key >= u.start_key AND storage_logs.hashed_key <= u.end_key ORDER BY storage_logs.hashed_key LIMIT 1 ) FROM UNNEST($2::bytea[], $3::bytea[]) AS u(start_key, end_key) ) SELECT sl.kv[1] AS \"hashed_key?\", sl.kv[2] AS \"value?\", initial_writes.index FROM sl LEFT OUTER JOIN initial_writes ON initial_writes.hashed_key = sl.kv[1]"
},
"c2cf96a9eb6893c5ba7d9e5418d9f24084ccd87980cb6ee05de1b3bde5c654bd": {
"describe": {
"columns": [],
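
The new prepared statements above back the snapshot reads used during recovery: counting the storage logs recorded at the snapshot miniblock and fetching `(hashed_key, value, leaf index)` tuples for a range of hashed keys. As a rough sketch of how the range query could be issued with plain `sqlx` (outside the repository's DAL; the connection string, miniblock number, and key bounds are made up):

```rust
use sqlx::{postgres::PgPoolOptions, Row};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Hypothetical connection string; the real services take it from configuration.
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect("postgres://postgres@localhost/zksync_local")
        .await?;

    let miniblock_number: i64 = 42; // snapshot miniblock (illustrative)
    let start_key = vec![0x00_u8; 32]; // lower bound of the hashed-key chunk
    let end_key = vec![0xff_u8; 32]; // upper bound of the hashed-key chunk

    let rows = sqlx::query(
        "SELECT storage_logs.hashed_key, storage_logs.value, initial_writes.index \
         FROM storage_logs \
         INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key \
         WHERE storage_logs.miniblock_number = $1 \
           AND storage_logs.hashed_key >= $2::bytea AND storage_logs.hashed_key <= $3::bytea \
         ORDER BY storage_logs.hashed_key",
    )
    .bind(miniblock_number)
    .bind(start_key)
    .bind(end_key)
    .fetch_all(&pool)
    .await?;

    for row in rows {
        let hashed_key: Vec<u8> = row.get("hashed_key");
        let leaf_index: i64 = row.get("index");
        println!("key = {hashed_key:02x?}, leaf index = {leaf_index}");
    }
    Ok(())
}
```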
41 changes: 31 additions & 10 deletions core/lib/dal/src/connection/mod.rs
@@ -26,9 +26,10 @@ pub struct ConnectionPoolBuilder<'a> {
}

impl<'a> fmt::Debug for ConnectionPoolBuilder<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
// Database URL is potentially sensitive, thus we omit it.
f.debug_struct("ConnectionPoolBuilder")
formatter
.debug_struct("ConnectionPoolBuilder")
.field("max_size", &self.max_size)
.field("statement_timeout", &self.statement_timeout)
.finish()
@@ -66,7 +67,10 @@ impl<'a> ConnectionPoolBuilder<'a> {
max_connections = self.max_size,
statement_timeout = self.statement_timeout
);
Ok(ConnectionPool(pool))
Ok(ConnectionPool {
inner: pool,
max_size: self.max_size,
})
}
}

@@ -81,7 +85,9 @@ impl<'a> ConnectionPoolBuilder<'a> {
pub(super) async fn create_test_db() -> anyhow::Result<url::Url> {
use rand::Rng as _;
use sqlx::{Connection as _, Executor as _};

const PREFIX: &str = "test-";

let db_url = get_test_database_url().unwrap();
let mut db_url = url::Url::parse(&db_url)
.with_context(|| format!("{} is not a valid database address", db_url))?;
@@ -115,11 +121,17 @@ pub(super) async fn create_test_db() -> anyhow::Result<url::Url> {
}

#[derive(Clone)]
pub struct ConnectionPool(pub(crate) PgPool);
pub struct ConnectionPool {
pub(crate) inner: PgPool,
max_size: u32,
}

impl fmt::Debug for ConnectionPool {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("ConnectionPool").finish()
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("ConnectionPool")
.field("max_size", &self.max_size)
.finish_non_exhaustive()
}
}

@@ -152,6 +164,13 @@ impl ConnectionPool {
Self::builder(database_url, 1)
}

/// Returns the maximum number of connections in this pool specified during its creation.
/// This number may be distinct from the current number of connections in the pool (including
/// idle ones).
pub fn max_size(&self) -> u32 {
self.max_size
}

/// Creates a `StorageProcessor` entity over a recoverable connection.
/// Upon a database outage connection will block the thread until
/// it will be able to recover the connection (or, if connection cannot
@@ -198,10 +217,12 @@ impl ConnectionPool {

let mut retry_count = 0;
while retry_count < DB_CONNECTION_RETRIES {
CONNECTION_METRICS.pool_size.observe(self.0.size() as usize);
CONNECTION_METRICS.pool_idle.observe(self.0.num_idle());
CONNECTION_METRICS
.pool_size
.observe(self.inner.size() as usize);
CONNECTION_METRICS.pool_idle.observe(self.inner.num_idle());

let connection = self.0.acquire().await;
let connection = self.inner.acquire().await;
let connection_err = match connection {
Ok(connection) => return Ok(connection),
Err(err) => {
@@ -218,7 +239,7 @@
}

// Attempting to get the pooled connection for the last time
match self.0.acquire().await {
match self.inner.acquire().await {
Ok(conn) => Ok(conn),
Err(err) => {
Self::report_connection_error(&err);
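
Recording `max_size` on the pool lets consumers bound their parallelism by the number of connections they can actually obtain. A hypothetical usage sketch (the `ConnectionPool` below is a stand-in for the `zksync_dal` type, and the concurrency rule is illustrative, not the crate's logic):

```rust
// Stand-in for zksync_dal::ConnectionPool, reduced to what this sketch needs.
struct ConnectionPool {
    max_size: u32,
}

impl ConnectionPool {
    fn max_size(&self) -> u32 {
        self.max_size
    }
}

/// Picks how many recovery chunks to process in parallel: never more than the pool
/// can serve, while leaving one connection free for unrelated queries.
fn chunk_concurrency(pool: &ConnectionPool, requested: u32) -> u32 {
    requested.min(pool.max_size().saturating_sub(1)).max(1)
}

fn main() {
    let pool = ConnectionPool { max_size: 10 };
    assert_eq!(chunk_concurrency(&pool, 25), 9);
    assert_eq!(chunk_concurrency(&pool, 4), 4);
}
```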
9 changes: 5 additions & 4 deletions core/lib/dal/src/healthcheck.rs
@@ -1,18 +1,19 @@
use serde::Serialize;
use sqlx::PgPool;
use zksync_health_check::{async_trait, CheckHealth, Health, HealthStatus};

use crate::ConnectionPool;

#[derive(Debug, Serialize)]
struct ConnectionPoolHealthDetails {
pool_size: u32,
max_size: u32,
}

impl ConnectionPoolHealthDetails {
async fn new(pool: &PgPool) -> Self {
fn new(pool: &ConnectionPool) -> Self {
Self {
pool_size: pool.size(),
pool_size: pool.inner.size(),
max_size: pool.max_size(),
}
}
}
@@ -41,7 +42,7 @@ impl CheckHealth for ConnectionPoolHealthCheck {
// This check is rather feeble, plan to make reliable here:
// https://linear.app/matterlabs/issue/PLA-255/revamp-db-connection-health-check
self.connection_pool.access_storage().await.unwrap();
let details = ConnectionPoolHealthDetails::new(&self.connection_pool.0).await;
let details = ConnectionPoolHealthDetails::new(&self.connection_pool);
Health::from(HealthStatus::Ready).with_details(details)
}
}
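
With these changes, the connection-pool health check reports both the current and the configured pool size. A standalone mock of what the reported details serialize to (field values are made up; this mirrors, but is not, the crate's `ConnectionPoolHealthDetails`):

```rust
use serde::Serialize;

// Mock of the health-check payload, for illustration only.
#[derive(Debug, Serialize)]
struct ConnectionPoolHealthDetails {
    pool_size: u32,
    max_size: u32,
}

fn main() {
    let details = ConnectionPoolHealthDetails { pool_size: 3, max_size: 50 };
    // Prints: {"pool_size":3,"max_size":50}
    println!("{}", serde_json::to_string(&details).unwrap());
}
```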
12 changes: 10 additions & 2 deletions core/lib/dal/src/models/storage_log.rs
@@ -1,7 +1,7 @@
use sqlx::types::chrono::NaiveDateTime;
use zksync_types::{AccountTreeId, Address, StorageKey, StorageLog, StorageLogKind, H256};
use zksync_types::{AccountTreeId, Address, StorageKey, StorageLog, StorageLogKind, H256, U256};

#[derive(sqlx::FromRow, Debug, Clone)]
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DBStorageLog {
pub id: i64,
pub hashed_key: Vec<u8>,
@@ -27,3 +27,11 @@ impl From<DBStorageLog> for StorageLog {
}
}
}

// We don't want to rely on the Merkle tree crate to import a single type, so we duplicate `TreeEntry` here.
#[derive(Debug, Clone, Copy)]
pub struct StorageTreeEntry {
pub key: U256,
pub value: H256,
pub leaf_index: u64,
}
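
As a sketch of how a raw Postgres row could be turned into such an entry (the helper below is hypothetical and assumes the `U256`/`H256` re-exports from `zksync_types`; hashed keys and values are 32-byte `bytea` columns, and leaf indices are `Int8`):

```rust
use zksync_types::{H256, U256};

/// Hypothetical helper: convert a `(hashed_key, value, index)` row fetched from
/// `storage_logs` joined with `initial_writes` into a `StorageTreeEntry`.
fn entry_from_row(hashed_key: &[u8], value: &[u8], index: i64) -> StorageTreeEntry {
    assert_eq!(hashed_key.len(), 32, "hashed_key must be a 32-byte bytea value");
    assert_eq!(value.len(), 32, "value must be a 32-byte bytea value");
    StorageTreeEntry {
        // Hashed keys are interpreted as big-endian integers, which keeps the
        // numeric ordering consistent with the bytewise ordering used in SQL.
        key: U256::from_big_endian(hashed_key),
        value: H256::from_slice(value),
        leaf_index: index as u64,
    }
}
```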