Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories.

base repository: matter-labs/zksync-era
base: ba0d8b2a454d18b10f7439c602c95758f91e12da
head repository: matter-labs/zksync-era
compare: 512a7899390457f9c427d3d55c429de6d15f6e8b
Showing with 1,210 additions and 136 deletions.
  1. +23 −0 Cargo.lock
  2. +2 −0 Cargo.toml
  3. +7 −1 core/bin/external_node/src/init.rs
  4. +3 −0 core/bin/external_node/src/main.rs
  5. +63 −2 core/bin/external_node/src/node_builder.rs
  6. +8 −55 core/bin/zksync_server/src/main.rs
  7. +58 −5 core/bin/zksync_server/src/node_builder.rs
  8. +45 −7 core/lib/snapshots_applier/src/lib.rs
  9. +72 −18 core/lib/snapshots_applier/src/tests/mod.rs
  10. +0 −32 core/lib/zksync_core_leftovers/src/lib.rs
  11. +7 −7 core/node/genesis/src/lib.rs
  12. +1 −0 core/node/node_framework/Cargo.toml
  13. +2 −0 core/node/node_framework/src/implementations/layers/mod.rs
  14. +101 −0 core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs
  15. +64 −0 core/node/node_framework/src/implementations/layers/node_storage_init/main_node_strategy.rs
  16. +160 −0 core/node/node_framework/src/implementations/layers/node_storage_init/mod.rs
  17. +72 −0 core/node/node_framework/src/implementations/layers/reorg_detector.rs
  18. +2 −4 core/node/node_framework/src/implementations/resources/reverter.rs
  19. +29 −0 core/node/node_storage_init/Cargo.toml
  20. +5 −0 core/node/node_storage_init/README.md
  21. +39 −0 core/node/node_storage_init/src/external_node/genesis.rs
  22. +8 −0 core/node/node_storage_init/src/external_node/mod.rs
  23. +50 −0 core/node/node_storage_init/src/external_node/revert.rs
  24. +82 −0 core/node/node_storage_init/src/external_node/snapshot_recovery.rs
  25. +213 −0 core/node/node_storage_init/src/lib.rs
  26. +54 −0 core/node/node_storage_init/src/main_node/genesis.rs
  27. +3 −0 core/node/node_storage_init/src/main_node/mod.rs
  28. +33 −0 core/node/node_storage_init/src/traits.rs
  29. +4 −0 core/node/node_sync/src/genesis.rs
  30. +0 −5 infrastructure/zk/src/server.ts
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -30,6 +30,7 @@ members = [
"core/node/consistency_checker",
"core/node/metadata_calculator",
"core/node/node_sync",
"core/node/node_storage_init",
"core/node/consensus",
"core/node/contract_verification_server",
"core/node/api_server",
@@ -277,6 +278,7 @@ zksync_reorg_detector = { path = "core/node/reorg_detector" }
zksync_consistency_checker = { path = "core/node/consistency_checker" }
zksync_metadata_calculator = { path = "core/node/metadata_calculator" }
zksync_node_sync = { path = "core/node/node_sync" }
zksync_node_storage_init = { path = "core/node/node_storage_init" }
zksync_node_consensus = { path = "core/node/consensus" }
zksync_contract_verification_server = { path = "core/node/contract_verification_server" }
zksync_node_api_server = { path = "core/node/api_server" }
8 changes: 7 additions & 1 deletion core/bin/external_node/src/init.rs
@@ -3,6 +3,7 @@
use std::time::Instant;

use anyhow::Context as _;
use tokio::sync::watch;
use zksync_config::ObjectStoreConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
@@ -30,6 +31,7 @@ enum InitDecision {
}

pub(crate) async fn ensure_storage_initialized(
stop_receiver: watch::Receiver<bool>,
pool: ConnectionPool<Core>,
main_node_client: Box<DynClient<L2>>,
app_health: &AppHealthCheck,
@@ -120,7 +122,7 @@ pub(crate) async fn ensure_storage_initialized(

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run()
.run(stop_receiver)
.await
.context("snapshot recovery failed")?;
if stats.done_work {
@@ -129,6 +131,10 @@ pub(crate) async fn ensure_storage_initialized(
.set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
assert!(
!stats.canceled,
"Snapshot recovery task cannot be canceled in the current implementation"
);
}
}
Ok(())
3 changes: 3 additions & 0 deletions core/bin/external_node/src/main.rs
@@ -976,7 +976,10 @@ async fn run_node(
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
// Note: while the stop receiver is passed here, it won't be respected, since we wait for this task
// to complete. This will be fixed after the migration to the node framework.
ensure_storage_initialized(
stop_receiver.clone(),
connection_pool.clone(),
main_node_client.clone(),
&app_health,
65 changes: 63 additions & 2 deletions core/bin/external_node/src/node_builder.rs
@@ -23,11 +23,16 @@ use zksync_node_framework::{
main_node_client::MainNodeClientLayer,
main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
NodeStorageInitializerLayer,
},
pools_layer::PoolsLayerBuilder,
postgres_metrics::PostgresMetricsLayer,
prometheus_exporter::PrometheusExporterLayer,
pruning::PruningLayer,
query_eth_client::QueryEthClientLayer,
reorg_detector::ReorgDetectorLayer,
sigint::SigintHandlerLayer,
state_keeper::{
external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer,
@@ -421,6 +426,49 @@ impl ExternalNodeBuilder {
Ok(self)
}

fn add_reorg_detector_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(ReorgDetectorLayer);
Ok(self)
}

/// This layer will make sure that the database is initialized correctly,
/// e.g.:
/// - genesis or snapshot recovery is performed if required.
/// - the storage is rolled back if required (e.g. if a reorg is detected).
///
/// Depending on the `kind` provided, either a task or a precondition will be added.
///
/// *Important*: the task should be added by at most one component, because
/// it assumes unique control over the database. Multiple components adding this
/// layer in a distributed mode may result in database corruption.
///
/// This task works in a pair with the precondition, which must be present in every component:
/// the precondition prevents the node from starting until the database is initialized.
fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result<Self> {
let config = &self.config;
let snapshot_recovery_config =
config
.optional
.snapshots_recovery_enabled
.then_some(SnapshotRecoveryConfig {
snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
drop_storage_key_preimages: config
.experimental
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
self.node.add_layer(ExternalNodeInitStrategyLayer {
l2_chain_id: self.config.required.l2_chain_id,
snapshot_recovery_config,
});
let mut layer = NodeStorageInitializerLayer::new();
if matches!(kind, LayerKind::Precondition) {
layer = layer.as_precondition();
}
self.node.add_layer(layer);
Ok(self)
}

pub fn build(mut self, mut components: Vec<Component>) -> anyhow::Result<ZkStackService> {
// Add "base" layers
self = self
@@ -429,12 +477,14 @@ impl ExternalNodeBuilder {
.add_prometheus_exporter_layer()?
.add_pools_layer()?
.add_main_node_client_layer()?
.add_query_eth_client_layer()?;
.add_query_eth_client_layer()?
.add_reorg_detector_layer()?;

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
.add_validate_chain_ids_layer()?;
.add_validate_chain_ids_layer()?
.add_storage_initialization_layer(LayerKind::Precondition)?;

// Sort the components so that components that may depend on each other are added in the correct order.
components.sort_unstable_by_key(|component| match component {
@@ -499,10 +549,21 @@ impl ExternalNodeBuilder {
.add_consistency_checker_layer()?
.add_commitment_generator_layer()?
.add_batch_status_updater_layer()?;

// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self.add_storage_initialization_layer(LayerKind::Task)?;
}
}
}

Ok(self.node.build()?)
}
}

/// Marker for layers that can add either a task or a precondition.
#[derive(Debug)]
enum LayerKind {
Task,
Precondition,
}
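The task/precondition split documented above generalizes to any multi-component deployment: every component registers the precondition, and exactly one registers the task. A minimal, hypothetical sketch of that rule, using the builder method and `LayerKind` enum shown in this diff (the `is_core_component` flag is illustrative, not part of the actual API):

// Hypothetical helper: every component waits for initialized storage,
// but only the designated "core" component actually performs the initialization.
fn wire_storage_init(
    mut builder: ExternalNodeBuilder,
    is_core_component: bool,
) -> anyhow::Result<ExternalNodeBuilder> {
    builder = builder.add_storage_initialization_layer(LayerKind::Precondition)?;
    if is_core_component {
        builder = builder.add_storage_initialization_layer(LayerKind::Task)?;
    }
    Ok(builder)
}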
63 changes: 8 additions & 55 deletions core/bin/zksync_server/src/main.rs
@@ -21,12 +21,10 @@ use zksync_config::{
SnapshotsCreatorConfig,
};
use zksync_core_leftovers::{
genesis_init, is_genesis_needed,
temp_config_store::{decode_yaml_repr, TempConfigStore},
Component, Components,
};
use zksync_env_config::FromEnv;
use zksync_eth_client::clients::Client;

use crate::node_builder::MainNodeBuilder;

@@ -42,9 +40,6 @@ struct Cli {
/// Generate genesis block for the first contract deployment using temporary DB.
#[arg(long)]
genesis: bool,
/// Rebuild tree.
#[arg(long)]
rebuild_tree: bool,
/// Comma-separated list of components to launch.
#[arg(
long,
@@ -180,65 +175,23 @@ fn main() -> anyhow::Result<()> {
}
};

run_genesis_if_needed(opt.genesis, &genesis, &contracts_config, &secrets)?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
return Ok(());
}

let components = if opt.rebuild_tree {
vec![Component::Tree]
} else {
opt.components.0
};

let node = MainNodeBuilder::new(
configs,
wallets,
genesis,
contracts_config,
secrets,
consensus,
)
.build(components)?;
node.run()?;
Ok(())
}
);

fn run_genesis_if_needed(
force_genesis: bool,
genesis: &GenesisConfig,
contracts_config: &ContractsConfig,
secrets: &Secrets,
) -> anyhow::Result<()> {
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
tokio_runtime.block_on(async move {
let database_secrets = secrets.database.clone().context("DatabaseSecrets")?;
if force_genesis || is_genesis_needed(&database_secrets).await {
genesis_init(genesis.clone(), &database_secrets)
.await
.context("genesis_init")?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
node.only_genesis()?.run()?;
return Ok(());
}

if let Some(ecosystem_contracts) = &contracts_config.ecosystem_contracts {
let l1_secrets = secrets.l1.as_ref().context("l1_screts")?;
let query_client = Client::http(l1_secrets.l1_rpc_url.clone())
.context("Ethereum client")?
.for_network(genesis.l1_chain_id.into())
.build();
zksync_node_genesis::save_set_chain_id_tx(
&query_client,
contracts_config.diamond_proxy_addr,
ecosystem_contracts.state_transition_proxy_addr,
&database_secrets,
)
.await
.context("Failed to save SetChainId upgrade transaction")?;
}
}
Ok(())
})
node.build(opt.components.0)?.run()?;
Ok(())
}

fn load_env_config() -> anyhow::Result<TempConfigStore> {
63 changes: 58 additions & 5 deletions core/bin/zksync_server/src/node_builder.rs
@@ -35,6 +35,9 @@ use zksync_node_framework::{
l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer,
l1_gas::SequencerL1GasLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
main_node_strategy::MainNodeInitStrategyLayer, NodeStorageInitializerLayer,
},
object_store::ObjectStoreLayer,
pk_signing_eth_client::PKSigningEthClientLayer,
pools_layer::PoolsLayerBuilder,
@@ -532,6 +535,41 @@ impl MainNodeBuilder {
Ok(self)
}

/// This layer will make sure that the database is initialized correctly,
/// e.g. genesis is performed if required.
///
/// Depending on the `kind` provided, either a task or a precondition will be added.
///
/// *Important*: the task should be added by at most one component, because
/// it assumes unique control over the database. Multiple components adding this
/// layer in a distributed mode may result in database corruption.
///
/// This task works in a pair with the precondition, which must be present in every component:
/// the precondition prevents the node from starting until the database is initialized.
fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result<Self> {
self.node.add_layer(MainNodeInitStrategyLayer {
genesis: self.genesis_config.clone(),
contracts: self.contracts_config.clone(),
});
let mut layer = NodeStorageInitializerLayer::new();
if matches!(kind, LayerKind::Precondition) {
layer = layer.as_precondition();
}
self.node.add_layer(layer);
Ok(self)
}

/// Builds the node with the genesis initialization task only.
pub fn only_genesis(mut self) -> anyhow::Result<ZkStackService> {
self = self
.add_pools_layer()?
.add_query_eth_client_layer()?
.add_storage_initialization_layer(LayerKind::Task)?;

Ok(self.node.build()?)
}

/// Builds the node with the specified components.
pub fn build(mut self, mut components: Vec<Component>) -> anyhow::Result<ZkStackService> {
// Add "base" layers (resources and helper tasks).
self = self
@@ -542,8 +580,12 @@ impl MainNodeBuilder {
.add_healthcheck_layer()?
.add_prometheus_exporter_layer()?
.add_query_eth_client_layer()?
.add_sequencer_l1_gas_layer()?
.add_l1_batch_commitment_mode_validation_layer()?;
.add_sequencer_l1_gas_layer()?;

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
.add_storage_initialization_layer(LayerKind::Precondition)?;

// Sort the components so that components that may depend on each other are added in the correct order.
components.sort_unstable_by_key(|component| match component {
@@ -557,6 +599,13 @@ impl MainNodeBuilder {
// Note that the layers are added only once, so it's fine to add the same layer multiple times.
for component in &components {
match component {
Component::StateKeeper => {
// State keeper is the core component of the sequencer,
// which is why we consider it to be responsible for the storage initialization.
self = self
.add_storage_initialization_layer(LayerKind::Task)?
.add_state_keeper_layer()?;
}
Component::HttpApi => {
self = self
.add_tx_sender_layer()?
@@ -596,9 +645,6 @@ impl MainNodeBuilder {
Component::EthTxManager => {
self = self.add_eth_tx_manager_layer()?;
}
Component::StateKeeper => {
self = self.add_state_keeper_layer()?;
}
Component::TeeVerifierInputProducer => {
self = self.add_tee_verifier_input_producer_layer()?;
}
@@ -633,3 +679,10 @@ impl MainNodeBuilder {
Ok(self.node.build()?)
}
}

/// Marker for layers that can add either a task or a precondition.
#[derive(Debug)]
enum LayerKind {
Task,
Precondition,
}
52 changes: 45 additions & 7 deletions core/lib/snapshots_applier/src/lib.rs
@@ -7,7 +7,7 @@ use std::{
use anyhow::Context as _;
use async_trait::async_trait;
use serde::Serialize;
use tokio::sync::Semaphore;
use tokio::sync::{watch, Semaphore};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError, SqlxError};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_object_store::{ObjectStore, ObjectStoreError};
@@ -76,6 +76,8 @@ enum SnapshotsApplierError {
Fatal(#[from] anyhow::Error),
#[error(transparent)]
Retryable(anyhow::Error),
#[error("Snapshot recovery has been canceled")]
Canceled,
}

impl SnapshotsApplierError {
@@ -245,6 +247,8 @@ impl SnapshotsApplierConfig {
pub struct SnapshotApplierTaskStats {
/// Did the task do any work?
pub done_work: bool,
/// Was the task canceled?
pub canceled: bool,
}

#[derive(Debug)]
@@ -339,13 +343,23 @@ impl SnapshotsApplierTask {
/// or under any of the following conditions:
///
/// - There are no snapshots on the main node
pub async fn run(self) -> anyhow::Result<SnapshotApplierTaskStats> {
pub async fn run(
self,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<SnapshotApplierTaskStats> {
tracing::info!("Starting snapshot recovery with config: {:?}", self.config);

let mut backoff = self.config.initial_retry_backoff;
let mut last_error = None;
for retry_id in 0..self.config.retry_count {
let result = SnapshotsApplier::load_snapshot(&self).await;
if *stop_receiver.borrow() {
return Ok(SnapshotApplierTaskStats {
done_work: false, // Not really relevant, since the node will be shut down.
canceled: true,
});
}

let result = SnapshotsApplier::load_snapshot(&self, &mut stop_receiver).await;

match result {
Ok((strategy, final_status)) => {
@@ -357,6 +371,7 @@ impl SnapshotsApplierTask {
self.health_updater.freeze();
return Ok(SnapshotApplierTaskStats {
done_work: !matches!(strategy, SnapshotRecoveryStrategy::Completed),
canceled: false,
});
}
Err(SnapshotsApplierError::Fatal(err)) => {
@@ -370,9 +385,19 @@ impl SnapshotsApplierTask {
"Recovering from error; attempt {retry_id} / {}, retrying in {backoff:?}",
self.config.retry_count
);
tokio::time::sleep(backoff).await;
tokio::time::timeout(backoff, stop_receiver.changed())
.await
.ok();
// Stop receiver will be checked on the next iteration.
backoff = backoff.mul_f32(self.config.retry_backoff_multiplier);
}
Err(SnapshotsApplierError::Canceled) => {
tracing::info!("Snapshot recovery has been canceled");
return Ok(SnapshotApplierTaskStats {
done_work: false,
canceled: true,
});
}
}
}

@@ -637,6 +662,7 @@ impl<'a> SnapshotsApplier<'a> {
/// Returns final snapshot recovery status.
async fn load_snapshot(
task: &'a SnapshotsApplierTask,
stop_receiver: &mut watch::Receiver<bool>,
) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
let health_updater = &task.health_updater;
let connection_pool = &task.connection_pool;
@@ -717,7 +743,7 @@ impl<'a> SnapshotsApplier<'a> {
this.factory_deps_recovered = true;
this.update_health();

this.recover_storage_logs().await?;
this.recover_storage_logs(stop_receiver).await?;
for is_chunk_processed in &mut this.applied_snapshot_status.storage_logs_chunks_processed {
*is_chunk_processed = true;
}
@@ -900,7 +926,10 @@ impl<'a> SnapshotsApplier<'a> {
Ok(())
}

async fn recover_storage_logs(&self) -> Result<(), SnapshotsApplierError> {
async fn recover_storage_logs(
&self,
stop_receiver: &mut watch::Receiver<bool>,
) -> Result<(), SnapshotsApplierError> {
let effective_concurrency =
(self.connection_pool.max_size() as usize).min(self.max_concurrency);
tracing::info!(
@@ -917,7 +946,16 @@ impl<'a> SnapshotsApplier<'a> {
.map(|(chunk_id, _)| {
self.recover_storage_logs_single_chunk(&semaphore, chunk_id as u64)
});
futures::future::try_join_all(tasks).await?;
let job_completion = futures::future::try_join_all(tasks);

tokio::select! {
res = job_completion => {
res?;
},
_ = stop_receiver.changed() => {
return Err(SnapshotsApplierError::Canceled);
}
}

let mut storage = self
.connection_pool
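The changes above follow one pattern throughout: instead of a plain `tokio::time::sleep`, the backoff waits on `stop_receiver.changed()` under a timeout, so a stop signal interrupts the sleep and is acted on at the next check. A self-contained sketch of the same pattern, with illustrative names that are not taken from the crate:

use std::{future::Future, time::Duration};

use tokio::sync::watch;

/// Illustrative retry helper: retries `op` with exponential backoff and
/// returns `None` if a stop signal arrives between attempts (mirroring
/// the `SnapshotsApplierError::Canceled` path above).
async fn retry_with_stop<T, F, Fut>(
    mut op: F,
    mut stop_receiver: watch::Receiver<bool>,
    max_attempts: usize,
    mut backoff: Duration,
) -> Option<anyhow::Result<T>>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<T>>,
{
    for _ in 0..max_attempts {
        if *stop_receiver.borrow() {
            return None; // Stop signal received between attempts.
        }
        match op().await {
            Ok(value) => return Some(Ok(value)),
            Err(err) => tracing::warn!("Attempt failed: {err:#}"),
        }
        // Sleep for `backoff`, but wake up early if the stop flag changes;
        // the flag itself is re-checked at the top of the next iteration.
        tokio::time::timeout(backoff, stop_receiver.changed()).await.ok();
        backoff = backoff.mul_f32(2.0);
    }
    Some(Err(anyhow::anyhow!("all {max_attempts} attempts failed")))
}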
90 changes: 72 additions & 18 deletions core/lib/snapshots_applier/src/tests/mod.rs
@@ -84,7 +84,8 @@ async fn snapshots_creator_can_successfully_recover_db(
object_store.clone(),
);
let task_health = task.health_check();
let stats = task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
let stats = task.run(stop_receiver).await.unwrap();
assert!(stats.done_work);
assert_matches!(
task_health.check_health().await.status(),
@@ -138,7 +139,9 @@ async fn snapshots_creator_can_successfully_recover_db(
Box::new(client.clone()),
object_store.clone(),
);
task.run().await.unwrap();

let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();
// Here, stats would unfortunately have `done_work: true` because work detection isn't smart enough.

// Emulate a node processing data after recovery.
@@ -161,7 +164,8 @@ async fn snapshots_creator_can_successfully_recover_db(
Box::new(client),
object_store,
);
let stats = task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
let stats = task.run(stop_receiver).await.unwrap();
assert!(!stats.done_work);
}

@@ -182,7 +186,8 @@ async fn applier_recovers_v0_snapshot(drop_storage_key_preimages: bool) {
if drop_storage_key_preimages {
task.drop_storage_key_preimages();
}
let stats = task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
let stats = task.run(stop_receiver).await.unwrap();
assert!(stats.done_work);

let mut storage = pool.connection().await.unwrap();
@@ -226,7 +231,8 @@ async fn applier_recovers_explicitly_specified_snapshot() {
object_store,
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number);
let stats = task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
let stats = task.run(stop_receiver).await.unwrap();
assert!(stats.done_work);

let mut storage = pool.connection().await.unwrap();
@@ -252,7 +258,8 @@ async fn applier_error_for_missing_explicitly_specified_snapshot() {
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);

let err = task.run().await.unwrap_err();
let (_stop_sender, stop_receiver) = watch::channel(false);
let err = task.run(stop_receiver).await.unwrap_err();
assert!(
format!("{err:#}").contains("not present on main node"),
"{err:#}"
@@ -277,7 +284,8 @@ async fn snapshot_applier_recovers_after_stopping() {
Box::new(client.clone()),
Arc::new(stopping_object_store),
);
let task_handle = tokio::spawn(task.run());
let (_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));

// Wait until the first storage logs chunk is requested (the object store hangs up at this point)
stop_receiver.wait_for(|&count| count > 1).await.unwrap();
@@ -313,7 +321,8 @@ async fn snapshot_applier_recovers_after_stopping() {
Box::new(client.clone()),
Arc::new(stopping_object_store),
);
let task_handle = tokio::spawn(task.run());
let (_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));

stop_receiver.wait_for(|&count| count > 3).await.unwrap();
assert!(!task_handle.is_finished());
@@ -340,7 +349,8 @@ async fn snapshot_applier_recovers_after_stopping() {
Arc::new(stopping_object_store),
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();

assert_eq!(
is_recovery_completed(&pool, &client).await,
@@ -411,7 +421,8 @@ async fn health_status_immediately_after_task_start() {
object_store,
);
let task_health = task.health_check();
let task_handle = tokio::spawn(task.run());
let (_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));

client.0.wait().await; // Wait for the first L2 client call (at which point, the task is certainly initialized)
assert_matches!(
@@ -465,7 +476,8 @@ async fn applier_errors_after_genesis() {
Box::new(client),
object_store,
);
task.run().await.unwrap_err();
let (_stop_sender, task_stop_receiver) = watch::channel(false);
task.run(task_stop_receiver).await.unwrap_err();
}

#[tokio::test]
@@ -480,7 +492,8 @@ async fn applier_errors_without_snapshots() {
Box::new(client),
object_store,
);
task.run().await.unwrap_err();
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap_err();
}

#[tokio::test]
@@ -499,7 +512,8 @@ async fn applier_errors_with_unrecognized_snapshot_version() {
Box::new(client),
object_store,
);
task.run().await.unwrap_err();
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap_err();
}

#[tokio::test]
@@ -518,7 +532,8 @@ async fn applier_returns_error_on_fatal_object_store_error() {
Box::new(client),
Arc::new(object_store),
);
let err = task.run().await.unwrap_err();
let (_stop_sender, stop_receiver) = watch::channel(false);
let err = task.run(stop_receiver).await.unwrap_err();
assert!(err.chain().any(|cause| {
matches!(
cause.downcast_ref::<ObjectStoreError>(),
@@ -546,7 +561,8 @@ async fn applier_returns_error_after_too_many_object_store_retries() {
Box::new(client),
Arc::new(object_store),
);
let err = task.run().await.unwrap_err();
let (_stop_sender, stop_receiver) = watch::channel(false);
let err = task.run(stop_receiver).await.unwrap_err();
assert!(err.chain().any(|cause| {
matches!(
cause.downcast_ref::<ObjectStoreError>(),
@@ -585,7 +601,8 @@ async fn recovering_tokens() {
Box::new(client.clone()),
object_store.clone(),
);
let task_result = task.run().await;
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_result = task.run(stop_receiver).await;
assert!(task_result.is_err());

assert_eq!(
@@ -601,7 +618,8 @@ async fn recovering_tokens() {
Box::new(client.clone()),
object_store.clone(),
);
task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();

assert_eq!(
is_recovery_completed(&pool, &client).await,
@@ -635,5 +653,41 @@ async fn recovering_tokens() {
Box::new(client),
object_store,
);
task.run().await.unwrap();
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();
}

#[tokio::test]
async fn snapshot_applier_can_be_canceled() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut expected_status = mock_recovery_status();
expected_status.storage_logs_chunks_processed = vec![true; 10];
let storage_logs = random_storage_logs::<H256>(expected_status.l1_batch_number, 200);
let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
let (stopping_object_store, mut stop_receiver) =
HangingObjectStore::new(object_store.clone(), 1);

let mut config = SnapshotsApplierConfig::for_tests();
config.max_concurrency = NonZeroUsize::new(1).unwrap();
let task = SnapshotsApplierTask::new(
config.clone(),
pool.clone(),
Box::new(client.clone()),
Arc::new(stopping_object_store),
);
let (task_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));

// Wait until the first storage logs chunk is requested (the object store hangs up at this point)
stop_receiver.wait_for(|&count| count > 1).await.unwrap();
assert!(!task_handle.is_finished());

task_stop_sender.send(true).unwrap();
let result = tokio::time::timeout(Duration::from_secs(5), task_handle)
.await
.expect("Task wasn't canceled")
.unwrap()
.expect("Task erred");
assert!(result.canceled);
assert!(!result.done_work);
}
32 changes: 0 additions & 32 deletions core/lib/zksync_core_leftovers/src/lib.rs
@@ -2,42 +2,10 @@

use std::str::FromStr;

use anyhow::Context as _;
use tokio::sync::oneshot;
use zksync_config::{configs::DatabaseSecrets, GenesisConfig};
use zksync_dal::{ConnectionPool, Core, CoreDal as _};
use zksync_node_genesis::{ensure_genesis_state, GenesisParams};

pub mod temp_config_store;

/// Inserts the initial information about ZKsync tokens into the database.
pub async fn genesis_init(
genesis_config: GenesisConfig,
database_secrets: &DatabaseSecrets,
) -> anyhow::Result<()> {
let db_url = database_secrets.master_url()?;
let pool = ConnectionPool::<Core>::singleton(db_url)
.build()
.await
.context("failed to build connection_pool")?;
let mut storage = pool.connection().await.context("connection()")?;

let params = GenesisParams::load_genesis_params(genesis_config)?;
ensure_genesis_state(&mut storage, &params).await?;

Ok(())
}

pub async fn is_genesis_needed(database_secrets: &DatabaseSecrets) -> bool {
let db_url = database_secrets.master_url().unwrap();
let pool = ConnectionPool::<Core>::singleton(db_url)
.build()
.await
.expect("failed to build connection_pool");
let mut storage = pool.connection().await.expect("connection()");
storage.blocks_dal().is_genesis_needed().await.unwrap()
}

/// Sets up an interrupt handler and returns a future that resolves once an interrupt signal
/// is received.
pub fn setup_sigint_handler() -> oneshot::Receiver<()> {
14 changes: 7 additions & 7 deletions core/node/genesis/src/lib.rs
@@ -5,9 +5,9 @@
use std::fmt::Formatter;

use anyhow::Context as _;
use zksync_config::{configs::DatabaseSecrets, GenesisConfig};
use zksync_config::GenesisConfig;
use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes, SET_CHAIN_ID_EVENT};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError};
use zksync_dal::{Connection, Core, CoreDal, DalError};
use zksync_eth_client::EthInterface;
use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction};
use zksync_multivm::utils::get_max_gas_per_pubdata_byte;
@@ -270,6 +270,10 @@ pub async fn insert_genesis_batch(
})
}

pub async fn is_genesis_needed(storage: &mut Connection<'_, Core>) -> Result<bool, GenesisError> {
Ok(storage.blocks_dal().is_genesis_needed().await?)
}

pub async fn ensure_genesis_state(
storage: &mut Connection<'_, Core>,
genesis_params: &GenesisParams,
@@ -411,15 +415,11 @@ pub async fn create_genesis_l1_batch(
// Save chain id transaction into the database
// We keep returning anyhow and will refactor it later
pub async fn save_set_chain_id_tx(
storage: &mut Connection<'_, Core>,
query_client: &dyn EthInterface,
diamond_proxy_address: Address,
state_transition_manager_address: Address,
database_secrets: &DatabaseSecrets,
) -> anyhow::Result<()> {
let db_url = database_secrets.master_url()?;
let pool = ConnectionPool::<Core>::singleton(db_url).build().await?;
let mut storage = pool.connection().await?;

let to = query_client.block_number().await?.as_u64();
let from = to.saturating_sub(PRIORITY_EXPIRATION);
let filter = FilterBuilder::default()
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
@@ -50,6 +50,7 @@ zksync_reorg_detector.workspace = true
zksync_vm_runner.workspace = true
zksync_node_db_pruner.workspace = true
zksync_base_token_adjuster.workspace = true
zksync_node_storage_init.workspace = true

pin-project-lite.workspace = true
tracing.workspace = true
2 changes: 2 additions & 0 deletions core/node/node_framework/src/implementations/layers/mod.rs
@@ -16,6 +16,7 @@ pub mod l1_gas;
pub mod main_node_client;
pub mod main_node_fee_params_fetcher;
pub mod metadata_calculator;
pub mod node_storage_init;
pub mod object_store;
pub mod pk_signing_eth_client;
pub mod pools_layer;
@@ -24,6 +25,7 @@ pub mod prometheus_exporter;
pub mod proof_data_handler;
pub mod pruning;
pub mod query_eth_client;
pub mod reorg_detector;
pub mod sigint;
pub mod state_keeper;
pub mod sync_state_updater;
101 changes: 101 additions & 0 deletions core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs
@@ -0,0 +1,101 @@
use std::sync::Arc;

// Re-export to initialize the layer without having to depend on the crate directly.
pub use zksync_node_storage_init::SnapshotRecoveryConfig;
use zksync_node_storage_init::{
external_node::{ExternalNodeGenesis, ExternalNodeReverter, ExternalNodeSnapshotRecovery},
InitializeStorage, NodeInitializationStrategy, RevertStorage,
};
use zksync_types::L2ChainId;

use super::NodeInitializationStrategyResource;
use crate::{
implementations::resources::{
healthcheck::AppHealthCheckResource,
main_node_client::MainNodeClientResource,
pools::{MasterPool, PoolResource},
reverter::BlockReverterResource,
},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for external node initialization strategy.
#[derive(Debug)]
pub struct ExternalNodeInitStrategyLayer {
pub l2_chain_id: L2ChainId,
pub snapshot_recovery_config: Option<SnapshotRecoveryConfig>,
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub main_node_client: MainNodeClientResource,
pub block_reverter: Option<BlockReverterResource>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
pub strategy: NodeInitializationStrategyResource,
}

#[async_trait::async_trait]
impl WiringLayer for ExternalNodeInitStrategyLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"external_node_role_layer"
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let MainNodeClientResource(client) = input.main_node_client;
let AppHealthCheckResource(app_health) = input.app_health;
let block_reverter = match input.block_reverter {
Some(reverter) => {
// If reverter was provided, we intend to be its sole consumer.
// We don't want multiple components to attempt reverting blocks.
let reverter = reverter.0.take().ok_or(WiringError::Configuration(
"BlockReverterResource is taken".into(),
))?;
Some(reverter)
}
None => None,
};

let genesis = Arc::new(ExternalNodeGenesis {
l2_chain_id: self.l2_chain_id,
client: client.clone(),
pool: pool.clone(),
});
let snapshot_recovery = self.snapshot_recovery_config.map(|recovery_config| {
Arc::new(ExternalNodeSnapshotRecovery {
client: client.clone(),
pool: pool.clone(),
recovery_config,
app_health,
}) as Arc<dyn InitializeStorage>
});
let block_reverter = block_reverter.map(|block_reverter| {
Arc::new(ExternalNodeReverter {
client,
pool: pool.clone(),
reverter: block_reverter,
}) as Arc<dyn RevertStorage>
});
let strategy = NodeInitializationStrategy {
genesis,
snapshot_recovery,
block_reverter,
};

Ok(Output {
strategy: strategy.into(),
})
}
}
64 changes: 64 additions & 0 deletions core/node/node_framework/src/implementations/layers/node_storage_init/main_node_strategy.rs
@@ -0,0 +1,64 @@
use std::sync::Arc;

use zksync_config::{ContractsConfig, GenesisConfig};
use zksync_node_storage_init::{main_node::MainNodeGenesis, NodeInitializationStrategy};

use super::NodeInitializationStrategyResource;
use crate::{
implementations::resources::{
eth_interface::EthInterfaceResource,
pools::{MasterPool, PoolResource},
},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for main node initialization strategy.
#[derive(Debug)]
pub struct MainNodeInitStrategyLayer {
pub genesis: GenesisConfig,
pub contracts: ContractsConfig,
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub eth_interface: EthInterfaceResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
pub strategy: NodeInitializationStrategyResource,
}

#[async_trait::async_trait]
impl WiringLayer for MainNodeInitStrategyLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"main_node_role_layer"
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let EthInterfaceResource(l1_client) = input.eth_interface;
let genesis = Arc::new(MainNodeGenesis {
contracts: self.contracts,
genesis: self.genesis,
l1_client,
pool,
});
let strategy = NodeInitializationStrategy {
genesis,
snapshot_recovery: None,
block_reverter: None,
};

Ok(Output {
strategy: strategy.into(),
})
}
}
160 changes: 160 additions & 0 deletions core/node/node_framework/src/implementations/layers/node_storage_init/mod.rs
@@ -0,0 +1,160 @@
use zksync_node_storage_init::{NodeInitializationStrategy, NodeStorageInitializer};

use crate::{
implementations::resources::pools::{MasterPool, PoolResource},
resource::Resource,
service::StopReceiver,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

pub mod external_node_strategy;
pub mod main_node_strategy;

/// Wiring layer for `NodeStorageInitializer`.
///
/// ## Requests resources
///
/// - `PoolResource<MasterPool>`
/// - `NodeInitializationStrategyResource`
///
/// ## Adds tasks
///
/// Depending on the mode, either `NodeStorageInitializer` or `NodeStorageInitializerPrecondition` is added.
#[derive(Debug, Default)]
pub struct NodeStorageInitializerLayer {
as_precondition: bool,
}

impl NodeStorageInitializerLayer {
pub fn new() -> Self {
Self::default()
}

/// Changes the wiring logic to treat the initializer as a precondition.
pub fn as_precondition(mut self) -> Self {
self.as_precondition = true;
self
}
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub strategy: NodeInitializationStrategyResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub initializer: Option<NodeStorageInitializer>,
#[context(task)]
pub precondition: Option<NodeStorageInitializerPrecondition>,
}

impl Output {
fn initializer(initializer: NodeStorageInitializer) -> Self {
Self {
initializer: Some(initializer),
precondition: None,
}
}

fn precondition(precondition: NodeStorageInitializer) -> Self {
Self {
initializer: None,
precondition: Some(NodeStorageInitializerPrecondition(precondition)),
}
}
}

#[async_trait::async_trait]
impl WiringLayer for NodeStorageInitializerLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
if self.as_precondition {
return "node_storage_initializer_precondition_layer";
}
"node_storage_initializer_layer"
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let NodeInitializationStrategyResource(strategy) = input.strategy;

let initializer = NodeStorageInitializer::new(strategy, pool);

// Insert either task or precondition.
let output = if self.as_precondition {
Output::precondition(initializer)
} else {
Output::initializer(initializer)
};

Ok(output)
}
}

#[async_trait::async_trait]
impl Task for NodeStorageInitializer {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedOneshotTask
}

fn id(&self) -> TaskId {
"node_storage_initializer".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Starting the node storage initialization task");
(*self).run(stop_receiver.0).await?;
tracing::info!("Node storage initialization task completed");
Ok(())
}
}

/// Runs [`NodeStorageInitializer`] as a precondition, blocking
/// tasks from starting until the storage is initialized.
#[derive(Debug)]
pub struct NodeStorageInitializerPrecondition(NodeStorageInitializer);

#[async_trait::async_trait]
impl Task for NodeStorageInitializerPrecondition {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"node_storage_initializer_precondition".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Waiting for node storage to be initialized");
let result = self.0.wait_for_initialized_storage(stop_receiver.0).await;
tracing::info!("Node storage initialization precondition completed");
result
}
}

// Note: unlike other modules, this one is kept within the same file to simplify
// moving the implementations out of the framework soon.
/// Resource representing the node initialization strategy.
#[derive(Debug, Clone)]
pub struct NodeInitializationStrategyResource(NodeInitializationStrategy);

impl Resource for NodeInitializationStrategyResource {
fn name() -> String {
"node_initialization_strategy".into()
}
}

impl From<NodeInitializationStrategy> for NodeInitializationStrategyResource {
fn from(strategy: NodeInitializationStrategy) -> Self {
Self(strategy)
}
}
72 changes: 72 additions & 0 deletions core/node/node_framework/src/implementations/layers/reorg_detector.rs
@@ -0,0 +1,72 @@
use zksync_reorg_detector::{self, ReorgDetector};

use crate::{
implementations::resources::{
healthcheck::AppHealthCheckResource,
main_node_client::MainNodeClientResource,
pools::{MasterPool, PoolResource},
},
service::StopReceiver,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for the [`ReorgDetector`] checker.
/// This layer is responsible for detecting reorgs and shutting down the node if one is detected.
///
/// This layer assumes that the node starts with an already initialized state.
#[derive(Debug)]
pub struct ReorgDetectorLayer;

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub main_node_client: MainNodeClientResource,
pub master_pool: PoolResource<MasterPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub reorg_detector: ReorgDetector,
}

#[async_trait::async_trait]
impl WiringLayer for ReorgDetectorLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"reorg_detector_layer"
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let MainNodeClientResource(main_node_client) = input.main_node_client;
let pool = input.master_pool.get().await?;

let reorg_detector = ReorgDetector::new(main_node_client, pool);

let AppHealthCheckResource(app_health) = input.app_health;
app_health
.insert_component(reorg_detector.health_check().clone())
.map_err(WiringError::internal)?;

Ok(Output { reorg_detector })
}
}

#[async_trait::async_trait]
impl Task for ReorgDetector {
fn id(&self) -> TaskId {
"reorg_detector".into()
}

async fn run(mut self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).run(stop_receiver.0).await?;
Ok(())
}
}
6 changes: 2 additions & 4 deletions core/node/node_framework/src/implementations/resources/reverter.rs
@@ -1,12 +1,10 @@
use std::sync::Arc;

use zksync_block_reverter::BlockReverter;

use crate::resource::Resource;
use crate::resource::{Resource, Unique};

/// A resource that provides [`BlockReverter`] to the service.
#[derive(Debug, Clone)]
pub struct BlockReverterResource(pub Arc<BlockReverter>);
pub struct BlockReverterResource(pub Unique<BlockReverter>);

impl Resource for BlockReverterResource {
fn name() -> String {
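Switching the resource to `Unique<BlockReverter>` encodes in the type that only one layer may consume the reverter (the external node strategy takes it, as shown above). A hypothetical sketch of such take-once semantics, assuming an `Arc<Mutex<Option<T>>>`-style implementation rather than the framework's actual one:

use std::sync::{Arc, Mutex};

/// Hypothetical take-once wrapper: clones share the slot, but only the first
/// caller of `take` receives the value; later callers get `None`.
#[derive(Debug, Clone)]
struct TakeOnce<T>(Arc<Mutex<Option<T>>>);

impl<T> TakeOnce<T> {
    fn new(value: T) -> Self {
        Self(Arc::new(Mutex::new(Some(value))))
    }

    fn take(&self) -> Option<T> {
        self.0.lock().expect("lock poisoned").take()
    }
}

fn main() {
    let slot = TakeOnce::new("block reverter");
    let other_handle = slot.clone();
    assert!(slot.take().is_some()); // The first consumer wins...
    assert!(other_handle.take().is_none()); // ...and later consumers see `None`.
}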
29 changes: 29 additions & 0 deletions core/node/node_storage_init/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "zksync_node_storage_init"
version = "0.1.0"
edition.workspace = true
authors.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true

[dependencies]
zksync_config.workspace = true
zksync_dal.workspace = true
zksync_health_check.workspace = true
zksync_node_sync.workspace = true
zksync_node_genesis.workspace = true
zksync_object_store.workspace = true
zksync_shared_metrics.workspace = true
zksync_snapshots_applier.workspace = true
zksync_types.workspace = true
zksync_web3_decl.workspace = true
zksync_reorg_detector.workspace = true
zksync_block_reverter.workspace = true

anyhow.workspace = true
async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
5 changes: 5 additions & 0 deletions core/node/node_storage_init/README.md
@@ -0,0 +1,5 @@
# `zksync_node_storage_init`

A set of actions to ensure that any ZKsync node has initialized storage and can start running.

This includes genesis, but is not limited to it, and may involve other steps.
39 changes: 39 additions & 0 deletions core/node/node_storage_init/src/external_node/genesis.rs
@@ -0,0 +1,39 @@
use anyhow::Context as _;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_types::L2ChainId;
use zksync_web3_decl::client::{DynClient, L2};

use crate::InitializeStorage;

#[derive(Debug)]
pub struct ExternalNodeGenesis {
pub l2_chain_id: L2ChainId,
pub client: Box<DynClient<L2>>,
pub pool: ConnectionPool<Core>,
}

#[async_trait::async_trait]
impl InitializeStorage for ExternalNodeGenesis {
/// Performs genesis initialization if it's required.
/// If genesis has already been performed, this method does nothing.
async fn initialize_storage(
&self,
_stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let mut storage = self.pool.connection_tagged("en").await?;
zksync_node_sync::genesis::perform_genesis_if_needed(
&mut storage,
self.l2_chain_id,
&self.client.clone().for_component("genesis"),
)
.await
.context("performing genesis failed")
}

async fn is_initialized(&self) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("en").await?;
let needed = zksync_node_sync::genesis::is_genesis_needed(&mut storage).await?;
Ok(!needed)
}
}
8 changes: 8 additions & 0 deletions core/node/node_storage_init/src/external_node/mod.rs
@@ -0,0 +1,8 @@
pub use self::{
genesis::ExternalNodeGenesis, revert::ExternalNodeReverter,
snapshot_recovery::ExternalNodeSnapshotRecovery,
};

mod genesis;
mod revert;
mod snapshot_recovery;
50 changes: 50 additions & 0 deletions core/node/node_storage_init/src/external_node/revert.rs
@@ -0,0 +1,50 @@
use anyhow::Context as _;
use tokio::sync::watch;
use zksync_block_reverter::BlockReverter;
use zksync_dal::{ConnectionPool, Core};
use zksync_reorg_detector::ReorgDetector;
use zksync_types::L1BatchNumber;
use zksync_web3_decl::client::{DynClient, L2};

use crate::RevertStorage;

#[derive(Debug)]
pub struct ExternalNodeReverter {
pub client: Box<DynClient<L2>>,
pub pool: ConnectionPool<Core>,
pub reverter: BlockReverter,
}

#[async_trait::async_trait]
impl RevertStorage for ExternalNodeReverter {
async fn revert_storage(
&self,
to_batch: L1BatchNumber,
_stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
tracing::info!("Reverting to l1 batch number {to_batch}");
self.reverter.roll_back(to_batch).await?;
tracing::info!("Revert successfully completed");
Ok(())
}

async fn last_correct_batch_for_reorg(
&self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<Option<L1BatchNumber>> {
let mut reorg_detector = ReorgDetector::new(self.client.clone(), self.pool.clone());
let batch = match reorg_detector.run_once(stop_receiver).await {
Ok(()) => {
// Even if a stop signal was received, the node will shut down without launching any tasks.
tracing::info!("No rollback was detected");
None
}
Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => {
tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}");
Some(last_correct_l1_batch)
}
Err(err) => return Err(err).context("reorg_detector.check_consistency()"),
};
Ok(batch)
}
}
82 changes: 82 additions & 0 deletions core/node/node_storage_init/src/external_node/snapshot_recovery.rs
@@ -0,0 +1,82 @@
use std::{sync::Arc, time::Instant};

use anyhow::Context as _;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_snapshots_applier::{
RecoveryCompletionStatus, SnapshotsApplierConfig, SnapshotsApplierTask,
};
use zksync_web3_decl::client::{DynClient, L2};

use crate::{InitializeStorage, SnapshotRecoveryConfig};

#[derive(Debug)]
pub struct ExternalNodeSnapshotRecovery {
pub client: Box<DynClient<L2>>,
pub pool: ConnectionPool<Core>,
pub recovery_config: SnapshotRecoveryConfig,
pub app_health: Arc<AppHealthCheck>,
}

#[async_trait::async_trait]
impl InitializeStorage for ExternalNodeSnapshotRecovery {
async fn initialize_storage(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let pool = self.pool.clone();
tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
let object_store_config =
self.recovery_config.object_store_config.clone().context(
"Snapshot object store must be presented if snapshot recovery is activated",
)?;
let object_store = ObjectStoreFactory::new(object_store_config)
.create_store()
.await?;

let config = SnapshotsApplierConfig::default();
let mut snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
Box::new(self.client.clone().for_component("snapshot_recovery")),
object_store,
);
if let Some(snapshot_l1_batch) = self.recovery_config.snapshot_l1_batch_override {
tracing::info!(
"Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
if the snapshot is too old (order of several weeks old) or non-existent"
);
snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
}
if self.recovery_config.drop_storage_key_preimages {
tracing::info!("Dropping storage key preimages for snapshot storage logs");
snapshots_applier_task.drop_storage_key_preimages();
}
self.app_health
.insert_component(snapshots_applier_task.health_check())?;

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run(stop_receiver)
.await
.context("snapshot recovery failed")?;
if stats.done_work {
let latency = recovery_started_at.elapsed();
APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres].set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
// We don't really care if the task was canceled.
// If it was, all the other tasks are canceled as well.

Ok(())
}

async fn is_initialized(&self) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("en").await?;
let completed = matches!(
SnapshotsApplierTask::is_recovery_completed(&mut storage, &self.client).await?,
RecoveryCompletionStatus::Completed
);
Ok(completed)
}
}
213 changes: 213 additions & 0 deletions core/node/node_storage_init/src/lib.rs
@@ -0,0 +1,213 @@
use std::{future::Future, sync::Arc, time::Duration};

use tokio::sync::watch;
use zksync_config::ObjectStoreConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal as _};
use zksync_types::L1BatchNumber;

pub use crate::traits::{InitializeStorage, RevertStorage};

pub mod external_node;
pub mod main_node;
mod traits;

#[derive(Debug)]
pub struct SnapshotRecoveryConfig {
/// If not specified, the latest snapshot will be used.
pub snapshot_l1_batch_override: Option<L1BatchNumber>,
pub drop_storage_key_preimages: bool,
pub object_store_config: Option<ObjectStoreConfig>,
}

#[derive(Debug, Clone, Copy)]
enum InitDecision {
/// Perform or check genesis.
Genesis,
/// Perform or check snapshot recovery.
SnapshotRecovery,
}

#[derive(Debug, Clone)]
pub struct NodeInitializationStrategy {
pub genesis: Arc<dyn InitializeStorage>,
pub snapshot_recovery: Option<Arc<dyn InitializeStorage>>,
pub block_reverter: Option<Arc<dyn RevertStorage>>,
}

/// Node storage initializer.
/// This structure is responsible for making sure that the node storage is initialized.
///
/// This structure operates together with [`NodeInitializationStrategy`] to achieve that:
/// `NodeStorageInitializer` understands what initialized storage means, but it defers
/// any actual initialization to the strategy implementation. This makes it possible to have
/// different initialization strategies for different node types, while keeping common
/// invariants for the whole system.
#[derive(Debug)]
pub struct NodeStorageInitializer {
strategy: NodeInitializationStrategy,
pool: ConnectionPool<Core>,
}

impl NodeStorageInitializer {
pub fn new(strategy: NodeInitializationStrategy, pool: ConnectionPool<Core>) -> Self {
Self { strategy, pool }
}

/// Returns the preferred kind of storage initialization.
/// The decision is based on the current state of the storage.
/// Note that the decision does not guarantee that the initialization has not been performed
/// already, so any returned decision should be checked before performing the initialization.
async fn decision(&self) -> anyhow::Result<InitDecision> {
let mut storage = self.pool.connection_tagged("node_init").await?;
let genesis_l1_batch = storage
.blocks_dal()
.get_l1_batch_header(L1BatchNumber(0))
.await?;
let snapshot_recovery = storage
.snapshot_recovery_dal()
.get_applied_snapshot_status()
.await?;
drop(storage);

let decision = match (genesis_l1_batch, snapshot_recovery) {
(Some(batch), Some(snapshot_recovery)) => {
anyhow::bail!(
"Node has both genesis L1 batch: {batch:?} and snapshot recovery information: {snapshot_recovery:?}. \
This is not supported and can be caused by broken snapshot recovery."
);
}
(Some(batch), None) => {
tracing::info!(
"Node has a genesis L1 batch: {batch:?} and no snapshot recovery info"
);
InitDecision::Genesis
}
(None, Some(snapshot_recovery)) => {
tracing::info!("Node has no genesis L1 batch and snapshot recovery information: {snapshot_recovery:?}");
InitDecision::SnapshotRecovery
}
(None, None) => {
tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
if self.strategy.snapshot_recovery.is_some() {
InitDecision::SnapshotRecovery
} else {
InitDecision::Genesis
}
}
};
Ok(decision)
}

/// Initializes the storage for the node.
/// After the initialization, the node can safely start operating.
pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let decision = self.decision().await?;

// Make sure that we have state to work with.
match decision {
InitDecision::Genesis => {
tracing::info!("Performing genesis initialization");
self.strategy
.genesis
.initialize_storage(stop_receiver.clone())
.await?;
}
InitDecision::SnapshotRecovery => {
tracing::info!("Performing snapshot recovery initialization");
if let Some(recovery) = &self.strategy.snapshot_recovery {
recovery.initialize_storage(stop_receiver.clone()).await?;
} else {
anyhow::bail!(
"Snapshot recovery should be performed, but the strategy is not provided"
);
}
}
}

// Now we may check whether we're in the invalid state and should perform a rollback.
if let Some(reverter) = &self.strategy.block_reverter {
if let Some(to_batch) = reverter
.last_correct_batch_for_reorg(stop_receiver.clone())
.await?
{
tracing::info!(l1_batch = %to_batch, "State must be rolled back to L1 batch");
tracing::info!("Performing the rollback");
reverter.revert_storage(to_batch, stop_receiver).await?;
}
}

Ok(())
}

/// Checks if the node can safely start operating.
pub async fn wait_for_initialized_storage(
&self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
const POLLING_INTERVAL: Duration = Duration::from_secs(1);

// Wait until data is added to the database.
poll(stop_receiver.clone(), POLLING_INTERVAL, || {
self.is_database_initialized()
})
.await?;
if *stop_receiver.borrow() {
return Ok(());
}

// Wait until the rollback is no longer needed.
poll(stop_receiver.clone(), POLLING_INTERVAL, || {
self.is_chain_tip_correct(stop_receiver.clone())
})
.await?;

Ok(())
}

async fn is_database_initialized(&self) -> anyhow::Result<bool> {
// We're fine if the database is initialized in any meaningful way we can check.
if self.strategy.genesis.is_initialized().await? {
return Ok(true);
}
if let Some(snapshot_recovery) = &self.strategy.snapshot_recovery {
return snapshot_recovery.is_initialized().await;
}
Ok(false)
}

/// Checks if the head of the chain has a correct state, i.e. no rollback is needed.
async fn is_chain_tip_correct(
&self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<bool> {
// May be `true` if a stop signal is received, but the node will shut down without launching any tasks anyway.
let initialized = if let Some(reverter) = &self.strategy.block_reverter {
reverter
.last_correct_batch_for_reorg(stop_receiver)
.await?
.is_none()
} else {
true
};
Ok(initialized)
}
}

async fn poll<F, Fut>(
mut stop_receiver: watch::Receiver<bool>,
polling_interval: Duration,
mut check: F,
) -> anyhow::Result<()>
where
F: FnMut() -> Fut,
Fut: Future<Output = anyhow::Result<bool>>,
{
while !*stop_receiver.borrow() && !check().await? {
// Return value will be checked on the next iteration.
tokio::time::timeout(polling_interval, stop_receiver.changed())
.await
.ok();
}

Ok(())
}
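Putting the two entry points together: the component that owns initialization consumes the initializer via `run`, while every other component constructs an identical initializer and merely waits. A hedged usage sketch, assuming `strategy` and `pool` are produced by the wiring layers shown earlier in this diff (the `owns_initialization` flag is illustrative):

use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_node_storage_init::{NodeInitializationStrategy, NodeStorageInitializer};

/// Illustrative driver: `owns_initialization` should be true in exactly one
/// component of the deployment, mirroring the task/precondition split.
async fn drive_storage_init(
    strategy: NodeInitializationStrategy,
    pool: ConnectionPool<Core>,
    owns_initialization: bool,
    stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
    let initializer = NodeStorageInitializer::new(strategy, pool);
    if owns_initialization {
        // Unique control over the database: genesis / snapshot recovery / rollback.
        initializer.run(stop_receiver).await
    } else {
        // Precondition mode: block until the storage becomes usable.
        initializer.wait_for_initialized_storage(stop_receiver).await
    }
}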
54 changes: 54 additions & 0 deletions core/node/node_storage_init/src/main_node/genesis.rs
@@ -0,0 +1,54 @@
use anyhow::Context as _;
use tokio::sync::watch;
use zksync_config::{ContractsConfig, GenesisConfig};
use zksync_dal::{ConnectionPool, Core, CoreDal as _};
use zksync_node_genesis::GenesisParams;
use zksync_web3_decl::client::{DynClient, L1};

use crate::traits::InitializeStorage;

#[derive(Debug)]
pub struct MainNodeGenesis {
pub genesis: GenesisConfig,
pub contracts: ContractsConfig,
pub l1_client: Box<DynClient<L1>>,
pub pool: ConnectionPool<Core>,
}

#[async_trait::async_trait]
impl InitializeStorage for MainNodeGenesis {
/// Performs genesis initialization if it's required.
/// If genesis has already been performed, this method does nothing.
async fn initialize_storage(
&self,
_stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let mut storage = self.pool.connection_tagged("genesis").await?;

if !storage.blocks_dal().is_genesis_needed().await? {
return Ok(());
}

let params = GenesisParams::load_genesis_params(self.genesis.clone())?;
zksync_node_genesis::ensure_genesis_state(&mut storage, &params).await?;

if let Some(ecosystem_contracts) = &self.contracts.ecosystem_contracts {
zksync_node_genesis::save_set_chain_id_tx(
&mut storage,
&self.l1_client,
self.contracts.diamond_proxy_addr,
ecosystem_contracts.state_transition_proxy_addr,
)
.await
.context("Failed to save SetChainId upgrade transaction")?;
}

Ok(())
}

async fn is_initialized(&self) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("genesis").await?;
let needed = zksync_node_genesis::is_genesis_needed(&mut storage).await?;
Ok(!needed)
}
}
3 changes: 3 additions & 0 deletions core/node/node_storage_init/src/main_node/mod.rs
@@ -0,0 +1,3 @@
pub use self::genesis::MainNodeGenesis;

mod genesis;
33 changes: 33 additions & 0 deletions core/node/node_storage_init/src/traits.rs
@@ -0,0 +1,33 @@
use std::fmt;

use tokio::sync::watch;
use zksync_types::L1BatchNumber;

/// An abstract storage initialization strategy.
#[async_trait::async_trait]
pub trait InitializeStorage: fmt::Debug + Send + Sync + 'static {
/// Checks if the storage is already initialized.
async fn is_initialized(&self) -> anyhow::Result<bool>;

/// Initializes the storage.
/// Implementors of this method may assume that they have unique access to the storage.
async fn initialize_storage(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()>;
}

/// An abstract storage revert strategy.
/// This trait assumes that for any invalid state there exists a batch number to which the storage can be rolled back.
#[async_trait::async_trait]
pub trait RevertStorage: fmt::Debug + Send + Sync + 'static {
/// Checks if the storage is in an invalid state and has to be rolled back.
async fn last_correct_batch_for_reorg(
&self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<Option<L1BatchNumber>>;

/// Reverts the storage to the provided batch number.
async fn revert_storage(
&self,
to_batch: L1BatchNumber,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()>;
}
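Because both traits are object-safe and only require `Debug + Send + Sync`, custom strategies are straightforward to plug in. A hypothetical no-op implementation, e.g. for tests where storage is prepared out of band (not part of this diff):

use tokio::sync::watch;
use zksync_node_storage_init::InitializeStorage;

/// Hypothetical strategy that reports storage as already initialized.
#[derive(Debug)]
struct NoopInit;

#[async_trait::async_trait]
impl InitializeStorage for NoopInit {
    async fn is_initialized(&self) -> anyhow::Result<bool> {
        Ok(true)
    }

    async fn initialize_storage(
        &self,
        _stop_receiver: watch::Receiver<bool>,
    ) -> anyhow::Result<()> {
        Ok(()) // Nothing to do; storage is prepared out of band.
    }
}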
4 changes: 4 additions & 0 deletions core/node/node_sync/src/genesis.rs
@@ -8,6 +8,10 @@ use zksync_types::{

use super::client::MainNodeClient;

pub async fn is_genesis_needed(storage: &mut Connection<'_, Core>) -> anyhow::Result<bool> {
Ok(storage.blocks_dal().is_genesis_needed().await?)
}

pub async fn perform_genesis_if_needed(
storage: &mut Connection<'_, Core>,
zksync_chain_id: L2ChainId,
5 changes: 0 additions & 5 deletions infrastructure/zk/src/server.ts
@@ -14,10 +14,6 @@ export async function server(rebuildTree: boolean, uring: boolean, components?:
if (rebuildTree || components || useNodeFramework) {
options += ' --';
}
if (rebuildTree) {
clean('db');
options += ' --rebuild-tree';
}
if (components) {
options += ` --components=${components}`;
}
@@ -75,7 +71,6 @@ export async function genesisFromBinary() {
export const serverCommand = new Command('server')
.description('start zksync server')
.option('--genesis', 'generate genesis data via server')
.option('--rebuild-tree', 'rebuilds merkle tree from database logs', 'rebuild_tree')
.option('--uring', 'enables uring support for RocksDB')
.option('--components <components>', 'comma-separated list of components to run')
.option('--chain-name <chain-name>', 'environment name')