Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unify and port node storage initialization #2363

Merged
merged 26 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
876c488
Snapshot
popzxc Jul 1, 2024
3362cd2
Implement node storage initializer
popzxc Jul 1, 2024
d55ee1c
Make snapshot applier aware of stop signals
popzxc Jul 1, 2024
5356453
Implement unified initializer
popzxc Jul 1, 2024
ce0d27a
Rework node role to be a trait
popzxc Jul 1, 2024
6739f80
Implement node storage init layer
popzxc Jul 1, 2024
3e9941f
Rework reorg detector layer
popzxc Jul 1, 2024
f960c57
Improve framework integration
popzxc Jul 1, 2024
3ed1ab9
Port main node genesis to the framework
popzxc Jul 2, 2024
2373118
Port external node storage init to the framework
popzxc Jul 2, 2024
c2b88e3
zk fmt
popzxc Jul 2, 2024
a5ffc3e
Merge branch 'main' into popzxc-init-en-storage
popzxc Jul 2, 2024
2d869cd
Polishing
popzxc Jul 2, 2024
68c4e0d
Merge branch 'main' into popzxc-init-en-storage
popzxc Jul 2, 2024
376bed7
Rework initialization
popzxc Jul 3, 2024
d6274cc
Change box to arc
popzxc Jul 4, 2024
3a91b25
Rename revert to reverter
popzxc Jul 4, 2024
0be4309
Use connection_tagged
popzxc Jul 4, 2024
e5f4ed7
Explicitly use unit type
popzxc Jul 4, 2024
dba8892
is_revert_needed -> last_correct_batch_for_reorg
popzxc Jul 4, 2024
6c2faae
Pass stop receiver to revert check
popzxc Jul 4, 2024
e76a601
Any way to initialize db is fine
popzxc Jul 4, 2024
a40fb89
Merge branch 'main' into popzxc-init-en-storage
popzxc Jul 4, 2024
f13f317
Merge branch 'main' into popzxc-init-en-storage
popzxc Jul 9, 2024
ef7dca2
Adapt to new wiring layer interface
popzxc Jul 9, 2024
3224251
Remove commented code
popzxc Jul 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ members = [
"core/node/consistency_checker",
"core/node/metadata_calculator",
"core/node/node_sync",
"core/node/node_storage_init",
"core/node/consensus",
"core/node/contract_verification_server",
"core/node/api_server",
Expand Down Expand Up @@ -274,6 +275,7 @@ zksync_reorg_detector = { path = "core/node/reorg_detector" }
zksync_consistency_checker = { path = "core/node/consistency_checker" }
zksync_metadata_calculator = { path = "core/node/metadata_calculator" }
zksync_node_sync = { path = "core/node/node_sync" }
zksync_node_storage_init = { path = "core/node/node_storage_init" }
zksync_node_consensus = { path = "core/node/consensus" }
zksync_contract_verification_server = { path = "core/node/contract_verification_server" }
zksync_node_api_server = { path = "core/node/api_server" }
Expand Down
8 changes: 7 additions & 1 deletion core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::time::Instant;

use anyhow::Context as _;
use tokio::sync::watch;
use zksync_config::ObjectStoreConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
Expand Down Expand Up @@ -30,6 +31,7 @@ enum InitDecision {
}

pub(crate) async fn ensure_storage_initialized(
stop_receiver: watch::Receiver<bool>,
pool: ConnectionPool<Core>,
main_node_client: Box<DynClient<L2>>,
app_health: &AppHealthCheck,
Expand Down Expand Up @@ -120,7 +122,7 @@ pub(crate) async fn ensure_storage_initialized(

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run()
.run(stop_receiver)
.await
.context("snapshot recovery failed")?;
if stats.done_work {
Expand All @@ -129,6 +131,10 @@ pub(crate) async fn ensure_storage_initialized(
.set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
assert!(
!stats.canceled,
"Snapshot recovery task cannot be canceled in the current implementation"
);
}
}
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,10 @@ async fn run_node(
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
// Note: while stop receiver is passed there, it won't be respected, since we wait this task
// to complete. Will be fixed after migration to the node framework.
ensure_storage_initialized(
stop_receiver.clone(),
connection_pool.clone(),
main_node_client.clone(),
&app_health,
Expand Down
65 changes: 63 additions & 2 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ use zksync_node_framework::{
main_node_client::MainNodeClientLayer,
main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
NodeStorageInitializerLayer,
},
pools_layer::PoolsLayerBuilder,
postgres_metrics::PostgresMetricsLayer,
prometheus_exporter::PrometheusExporterLayer,
pruning::PruningLayer,
query_eth_client::QueryEthClientLayer,
reorg_detector::ReorgDetectorLayer,
sigint::SigintHandlerLayer,
state_keeper::{
external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer,
Expand Down Expand Up @@ -421,6 +426,49 @@ impl ExternalNodeBuilder {
Ok(self)
}

fn add_reorg_detector_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(ReorgDetectorLayer);
Ok(self)
}

/// This layer will make sure that the database is initialized correctly,
/// e.g.:
/// - genesis or snapshot recovery will be performed if it's required.
/// - we perform the storage rollback if required (e.g. if reorg is detected).
///
/// Depending on the `kind` provided, either a task or a precondition will be added.
///
/// *Important*: the task should be added by at most one component, because
/// it assumes unique control over the database. Multiple components adding this
/// layer in a distributed mode may result in the database corruption.
///
/// This task works in pair with precondition, which must be present in every component:
/// the precondition will prevent node from starting until the database is initialized.
fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result<Self> {
let config = &self.config;
let snapshot_recovery_config =
config
.optional
.snapshots_recovery_enabled
.then_some(SnapshotRecoveryConfig {
snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
drop_storage_key_preimages: config
.experimental
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
self.node.add_layer(ExternalNodeInitStrategyLayer {
l2_chain_id: self.config.required.l2_chain_id,
snapshot_recovery_config,
});
let mut layer = NodeStorageInitializerLayer::new();
if matches!(kind, LayerKind::Precondition) {
layer = layer.as_precondition();
}
self.node.add_layer(layer);
Ok(self)
}

pub fn build(mut self, mut components: Vec<Component>) -> anyhow::Result<ZkStackService> {
// Add "base" layers
self = self
Expand All @@ -429,12 +477,14 @@ impl ExternalNodeBuilder {
.add_prometheus_exporter_layer()?
.add_pools_layer()?
.add_main_node_client_layer()?
.add_query_eth_client_layer()?;
.add_query_eth_client_layer()?
.add_reorg_detector_layer()?;

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
.add_validate_chain_ids_layer()?;
.add_validate_chain_ids_layer()?
.add_storage_initialization_layer(LayerKind::Precondition)?;

// Sort the components, so that the components they may depend on each other are added in the correct order.
components.sort_unstable_by_key(|component| match component {
Expand Down Expand Up @@ -499,10 +549,21 @@ impl ExternalNodeBuilder {
.add_consistency_checker_layer()?
.add_commitment_generator_layer()?
.add_batch_status_updater_layer()?;

// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self.add_storage_initialization_layer(LayerKind::Task)?;
}
}
}

Ok(self.node.build()?)
}
}

/// Marker for layers that can add either a task or a precondition.
#[derive(Debug)]
enum LayerKind {
Task,
Precondition,
}
63 changes: 8 additions & 55 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ use zksync_config::{
SnapshotsCreatorConfig,
};
use zksync_core_leftovers::{
genesis_init, is_genesis_needed,
temp_config_store::{decode_yaml_repr, TempConfigStore},
Component, Components,
};
use zksync_env_config::FromEnv;
use zksync_eth_client::clients::Client;

use crate::node_builder::MainNodeBuilder;

Expand All @@ -42,9 +40,6 @@ struct Cli {
/// Generate genesis block for the first contract deployment using temporary DB.
#[arg(long)]
genesis: bool,
/// Rebuild tree.
#[arg(long)]
rebuild_tree: bool,
/// Comma-separated list of components to launch.
#[arg(
long,
Expand Down Expand Up @@ -180,65 +175,23 @@ fn main() -> anyhow::Result<()> {
}
};

run_genesis_if_needed(opt.genesis, &genesis, &contracts_config, &secrets)?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
return Ok(());
}

let components = if opt.rebuild_tree {
vec![Component::Tree]
} else {
opt.components.0
};

let node = MainNodeBuilder::new(
configs,
wallets,
genesis,
contracts_config,
secrets,
consensus,
)
.build(components)?;
node.run()?;
Ok(())
}
);

fn run_genesis_if_needed(
force_genesis: bool,
genesis: &GenesisConfig,
contracts_config: &ContractsConfig,
secrets: &Secrets,
) -> anyhow::Result<()> {
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
tokio_runtime.block_on(async move {
let database_secrets = secrets.database.clone().context("DatabaseSecrets")?;
if force_genesis || is_genesis_needed(&database_secrets).await {
genesis_init(genesis.clone(), &database_secrets)
.await
.context("genesis_init")?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
node.only_genesis()?.run()?;
return Ok(());
}

if let Some(ecosystem_contracts) = &contracts_config.ecosystem_contracts {
let l1_secrets = secrets.l1.as_ref().context("l1_screts")?;
let query_client = Client::http(l1_secrets.l1_rpc_url.clone())
.context("Ethereum client")?
.for_network(genesis.l1_chain_id.into())
.build();
zksync_node_genesis::save_set_chain_id_tx(
&query_client,
contracts_config.diamond_proxy_addr,
ecosystem_contracts.state_transition_proxy_addr,
&database_secrets,
)
.await
.context("Failed to save SetChainId upgrade transaction")?;
}
}
Ok(())
})
node.build(opt.components.0)?.run()?;
Ok(())
}

fn load_env_config() -> anyhow::Result<TempConfigStore> {
Expand Down
Loading
Loading