Skip to content

Commit

Permalink
Adapt to new wiring layer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc committed Jul 9, 2024
1 parent f13f317 commit ef7dca2
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 82 deletions.
2 changes: 1 addition & 1 deletion core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use zksync_node_framework::{
main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
external_node_role::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
NodeStorageInitializerLayer,
},
pools_layer::PoolsLayerBuilder,
Expand Down
2 changes: 1 addition & 1 deletion core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use zksync_node_framework::{
l1_gas::SequencerL1GasLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
main_node_role::MainNodeInitStrategyLayer, NodeStorageInitializerLayer,
main_node_strategy::MainNodeInitStrategyLayer, NodeStorageInitializerLayer,
},
object_store::ObjectStoreLayer,
pk_signing_eth_client::PKSigningEthClientLayer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,56 @@ use crate::{
pools::{MasterPool, PoolResource},
reverter::BlockReverterResource,
},
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for external node initialization strategy.
///
/// ## Requests resources
///
/// - `PoolResource<MasterPool>`
/// - `MainNodeClientResource`
/// - `BlockReverterResource` (optional)
/// - `AppHealthCheckResource` (adds a health check)
///
/// ## Adds resources
///
/// - `NodeInitializationStrategyResource`
#[derive(Debug)]
pub struct ExternalNodeInitStrategyLayer {
pub l2_chain_id: L2ChainId,
pub snapshot_recovery_config: Option<SnapshotRecoveryConfig>,
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub main_node_client: MainNodeClientResource,
pub block_reverter: Option<BlockReverterResource>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
pub strategy: NodeInitializationStrategyResource,
}

#[async_trait::async_trait]
impl WiringLayer for ExternalNodeInitStrategyLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"external_node_role_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let pool = context
.get_resource::<PoolResource<MasterPool>>()?
.get()
.await?;
let MainNodeClientResource(client) = context.get_resource()?;
let AppHealthCheckResource(app_health) = context.get_resource_or_default();
let block_reverter = match context.get_resource::<BlockReverterResource>() {
Ok(reverter) => {
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let MainNodeClientResource(client) = input.main_node_client;
let AppHealthCheckResource(app_health) = input.app_health;
let block_reverter = match input.block_reverter {
Some(reverter) => {
// If reverter was provided, we intend to be its sole consumer.
// We don't want multiple components to attempt reverting blocks.
let reverter = reverter.0.take().ok_or(WiringError::Configuration(
"BlockReverterResource is taken".into(),
))?;
Some(reverter)
}
Err(WiringError::ResourceLacking { .. }) => None,
Err(err) => return Err(err),
None => None,
};

let genesis = Arc::new(ExternalNodeGenesis {
Expand Down Expand Up @@ -90,7 +94,8 @@ impl WiringLayer for ExternalNodeInitStrategyLayer {
block_reverter,
};

context.insert_resource(NodeInitializationStrategyResource(strategy))?;
Ok(())
Ok(Output {
strategy: strategy.into(),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,42 @@ use crate::{
eth_interface::EthInterfaceResource,
pools::{MasterPool, PoolResource},
},
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for main node initialization strategy.
///
/// ## Requests resources
///
/// - `PoolResource<MasterPool>`
/// - `EthInterfaceResource`
///
/// ## Adds resources
///
/// - `NodeRoleResource`
#[derive(Debug)]
pub struct MainNodeInitStrategyLayer {
pub genesis: GenesisConfig,
pub contracts: ContractsConfig,
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub eth_interface: EthInterfaceResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
pub strategy: NodeInitializationStrategyResource,
}

#[async_trait::async_trait]
impl WiringLayer for MainNodeInitStrategyLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"main_node_role_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let pool = context
.get_resource::<PoolResource<MasterPool>>()?
.get()
.await?;
let EthInterfaceResource(l1_client) = context.get_resource()?;
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let EthInterfaceResource(l1_client) = input.eth_interface;
let genesis = Arc::new(MainNodeGenesis {
contracts: self.contracts,
genesis: self.genesis,
Expand All @@ -53,7 +57,8 @@ impl WiringLayer for MainNodeInitStrategyLayer {
block_reverter: None,
};

context.insert_resource(NodeInitializationStrategyResource(strategy))?;
Ok(())
Ok(Output {
strategy: strategy.into(),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use zksync_node_storage_init::{NodeInitializationStrategy, NodeStorageInitialize
use crate::{
implementations::resources::pools::{MasterPool, PoolResource},
resource::Resource,
service::{ServiceContext, StopReceiver},
service::StopReceiver,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

pub mod external_node_role;
pub mod main_node_role;
pub mod external_node_strategy;
pub mod main_node_strategy;

/// Wiring layer for `NodeStorageInializer`.
///
Expand Down Expand Up @@ -38,32 +39,64 @@ impl NodeStorageInitializerLayer {
}
}

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub strategy: NodeInitializationStrategyResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub initializer: Option<NodeStorageInitializer>,
#[context(task)]
pub precondition: Option<NodeStorageInitializerPrecondition>,
}

impl Output {
fn initializer(initializer: NodeStorageInitializer) -> Self {
Self {
initializer: Some(initializer),
precondition: None,
}
}

fn precondition(precondition: NodeStorageInitializer) -> Self {
Self {
initializer: None,
precondition: Some(NodeStorageInitializerPrecondition(precondition)),
}
}
}

#[async_trait::async_trait]
impl WiringLayer for NodeStorageInitializerLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
if self.as_precondition {
return "node_storage_initializer_precondition_layer";
}
"node_storage_initializer_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let pool = context
.get_resource::<PoolResource<MasterPool>>()?
.get()
.await?;
let NodeInitializationStrategyResource(role) = context.get_resource()?;
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let NodeInitializationStrategyResource(strategy) = input.strategy;

let initializer = NodeStorageInitializer::new(role, pool);
let initializer = NodeStorageInitializer::new(strategy, pool);

// Insert either task or precondition.
if self.as_precondition {
context.add_task(NodeStorageInitializerPrecondition(initializer));
let output = if self.as_precondition {
Output::precondition(initializer)
} else {
context.add_task(initializer);
}
Output::initializer(initializer)
};

Ok(())
Ok(output)
}
}

Expand All @@ -85,8 +118,10 @@ impl Task for NodeStorageInitializer {
}
}

/// Runs [`NodeStorageInitializer`] as a precondition, blocking
/// tasks from starting until the storage is initialized.
#[derive(Debug)]
struct NodeStorageInitializerPrecondition(NodeStorageInitializer);
pub struct NodeStorageInitializerPrecondition(NodeStorageInitializer);

#[async_trait::async_trait]
impl Task for NodeStorageInitializerPrecondition {
Expand Down Expand Up @@ -117,3 +152,9 @@ impl Resource for NodeInitializationStrategyResource {
"node_initialization_strategy".into()
}
}

impl From<NodeInitializationStrategy> for NodeInitializationStrategyResource {
fn from(strategy: NodeInitializationStrategy) -> Self {
Self(strategy)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,56 @@ use crate::{
main_node_client::MainNodeClientResource,
pools::{MasterPool, PoolResource},
},
service::{ServiceContext, StopReceiver},
service::StopReceiver,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};

/// Wiring layer for [`ReorgDetector`] checker.
/// This layer is responsible for detecting reorgs and shutting down the node if one is detected.
///
/// This layer assumes that the node starts with the initialized state.
///
/// ## Requests resources
///
/// - `MainNodeClientResource`
/// - `PoolResource<MasterPool>`
/// - `AppHealthCheckResource` (adds a health check)
///
/// ## Adds preconditions
///
/// - `ReorgDetector`
#[derive(Debug)]
pub struct ReorgDetectorLayer;

#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub main_node_client: MainNodeClientResource,
pub master_pool: PoolResource<MasterPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub reorg_detector: ReorgDetector,
}

#[async_trait::async_trait]
impl WiringLayer for ReorgDetectorLayer {
type Input = Input;
type Output = Output;

fn layer_name(&self) -> &'static str {
"reorg_detector_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
// Get resources.
let main_node_client = context.get_resource::<MainNodeClientResource>()?.0;

let pool_resource = context.get_resource::<PoolResource<MasterPool>>()?;
let pool = pool_resource.get().await?;
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let MainNodeClientResource(main_node_client) = input.main_node_client;
let pool = input.master_pool.get().await?;

let reorg_detector = ReorgDetector::new(main_node_client, pool);

let AppHealthCheckResource(app_health) = context.get_resource_or_default();
let AppHealthCheckResource(app_health) = input.app_health;
app_health
.insert_component(reorg_detector.health_check().clone())
.map_err(WiringError::internal)?;

context.add_task(reorg_detector);

Ok(())
Ok(Output { reorg_detector })
}
}

Expand Down

0 comments on commit ef7dca2

Please sign in to comment.