Skip to content

Commit

Permalink
fix(en): Remove duplicate reorg detector (#1783)
Browse files Browse the repository at this point in the history
## What ❔

- Removes a duplicate reorg detector initialization that sneaked in with
#1627 when the PR was updated. Changes the health check API so that it
detects duplicate components.
- Fixes a race condition in the snapshot recovery test.

## Why ❔

Running multiple reorg detectors (or multiple component instances in
general) doesn't make sense.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli authored Apr 24, 2024
1 parent df54516 commit 3417941
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub(crate) async fn ensure_storage_initialized(
Box::new(main_node_client.for_component("snapshot_recovery")),
blob_store,
);
app_health.insert_component(snapshots_applier_task.health_check());
app_health.insert_component(snapshots_applier_task.health_check())?;

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
Expand Down
34 changes: 11 additions & 23 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn run_tree(
.with_recovery_pool(recovery_pool);

let tree_reader = Arc::new(metadata_calculator.tree_reader());
app_health.insert_component(metadata_calculator.tree_health_check());
app_health.insert_component(metadata_calculator.tree_health_check())?;

if let Some(api_config) = api_config {
let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into();
Expand Down Expand Up @@ -201,7 +201,7 @@ async fn run_core(
) -> anyhow::Result<SyncState> {
// Create components.
let sync_state = SyncState::default();
app_health.insert_custom_component(Arc::new(sync_state.clone()));
app_health.insert_custom_component(Arc::new(sync_state.clone()))?;
let (action_queue_sender, action_queue) = ActionQueue::new();

let (persistence, miniblock_sealer) = StateKeeperPersistence::new(
Expand Down Expand Up @@ -299,18 +299,6 @@ async fn run_core(
task_handles.push(tokio::spawn(db_pruner.run(stop_receiver.clone())));
}

let reorg_detector = ReorgDetector::new(main_node_client.clone(), connection_pool.clone());
app_health.insert_component(reorg_detector.health_check().clone());
task_handles.push(tokio::spawn({
let stop = stop_receiver.clone();
async move {
reorg_detector
.run(stop)
.await
.context("reorg_detector.run()")
}
}));

let sk_handle = task::spawn(state_keeper.run());
let fee_params_fetcher_handle =
tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));
Expand Down Expand Up @@ -359,7 +347,7 @@ async fn run_core(
.context("cannot initialize consistency checker")?
.with_diamond_proxy_addr(diamond_proxy_addr);

app_health.insert_component(consistency_checker.health_check().clone());
app_health.insert_component(consistency_checker.health_check().clone())?;
let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone()));

let batch_status_updater = BatchStatusUpdater::new(
Expand All @@ -369,14 +357,14 @@ async fn run_core(
.await
.context("failed to build a connection pool for BatchStatusUpdater")?,
);
app_health.insert_component(batch_status_updater.health_check());
app_health.insert_component(batch_status_updater.health_check())?;

let commitment_generator_pool = singleton_pool_builder
.build()
.await
.context("failed to build a commitment_generator_pool")?;
let commitment_generator = CommitmentGenerator::new(commitment_generator_pool);
app_health.insert_component(commitment_generator.health_check());
app_health.insert_component(commitment_generator.health_check())?;
let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone()));

let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone()));
Expand Down Expand Up @@ -533,7 +521,7 @@ async fn run_api(
.run(stop_receiver.clone())
.await
.context("Failed initializing HTTP JSON-RPC server")?;
app_health.insert_component(http_server_handles.health_check);
app_health.insert_component(http_server_handles.health_check)?;
task_handles.extend(http_server_handles.tasks);
}

Expand Down Expand Up @@ -562,7 +550,7 @@ async fn run_api(
.run(stop_receiver.clone())
.await
.context("Failed initializing WS JSON-RPC server")?;
app_health.insert_component(ws_server_handles.health_check);
app_health.insert_component(ws_server_handles.health_check)?;
task_handles.extend(ws_server_handles.tasks);
}

Expand Down Expand Up @@ -674,7 +662,7 @@ async fn init_tasks(
if let Some(port) = config.optional.prometheus_port {
let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
app_health.insert_component(prometheus_health_check);
app_health.insert_component(prometheus_health_check)?;
task_handles.push(tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
let result = PrometheusExporterConfig::pull(port)
Expand Down Expand Up @@ -887,10 +875,10 @@ async fn run_node(
));
app_health.insert_custom_component(Arc::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)));
)))?;
app_health.insert_custom_component(Arc::new(ConnectionPoolHealthCheck::new(
connection_pool.clone(),
)));
)))?;

// Start the health check server early into the node lifecycle so that its health can be monitored from the very start.
let healthcheck_handle = HealthCheckHandle::spawn_server(
Expand Down Expand Up @@ -983,7 +971,7 @@ async fn run_node(
tracing::info!("Rollback successfully completed");
}

app_health.insert_component(reorg_detector.health_check().clone());
app_health.insert_component(reorg_detector.health_check().clone())?;
task_handles.push(tokio::spawn({
let stop = stop_receiver.clone();
async move {
Expand Down
1 change: 1 addition & 0 deletions core/lib/health_check/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait.workspace = true
futures.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["sync", "time"] }
tracing.workspace = true

Expand Down
34 changes: 27 additions & 7 deletions core/lib/health_check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ impl From<HealthStatus> for Health {
}
}

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum AppHealthCheckError {
/// Component is redefined.
#[error("cannot insert health check for component `{0}`: it is redefined")]
RedefinedComponent(&'static str),
}

/// Application health check aggregating health from multiple components.
#[derive(Debug)]
pub struct AppHealthCheck {
Expand Down Expand Up @@ -132,24 +140,36 @@ impl AppHealthCheck {
}

/// Inserts health check for a component.
pub fn insert_component(&self, health_check: ReactiveHealthCheck) {
self.insert_custom_component(Arc::new(health_check));
///
/// # Errors
///
/// Returns an error if the component with the same name is already defined.
pub fn insert_component(
&self,
health_check: ReactiveHealthCheck,
) -> Result<(), AppHealthCheckError> {
self.insert_custom_component(Arc::new(health_check))
}

/// Inserts a custom health check for a component.
pub fn insert_custom_component(&self, health_check: Arc<dyn CheckHealth>) {
///
/// # Errors
///
/// Returns an error if the component with the same name is already defined.
pub fn insert_custom_component(
&self,
health_check: Arc<dyn CheckHealth>,
) -> Result<(), AppHealthCheckError> {
let health_check_name = health_check.name();
let mut guard = self
.components
.lock()
.expect("`AppHealthCheck` is poisoned");
if guard.iter().any(|check| check.name() == health_check_name) {
tracing::warn!(
"Health check with name `{health_check_name}` is redefined; only the last mention \
will be present in `/health` endpoint output"
);
return Err(AppHealthCheckError::RedefinedComponent(health_check_name));
}
guard.push(health_check);
Ok(())
}

/// Checks the overall application health. This will query all component checks concurrently.
Expand Down
14 changes: 14 additions & 0 deletions core/lib/health_check/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,17 @@ async fn aggregating_health_checks() {
HealthStatus::Affected
);
}

#[test]
fn adding_duplicate_component() {
let checks = AppHealthCheck::default();
let (health_check, _health_updater) = ReactiveHealthCheck::new("test");
checks.insert_component(health_check.clone()).unwrap();

let err = checks.insert_component(health_check.clone()).unwrap_err();
assert_matches!(err, AppHealthCheckError::RedefinedComponent("test"));
let err = checks
.insert_custom_component(Arc::new(health_check))
.unwrap_err();
assert_matches!(err, AppHealthCheckError::RedefinedComponent("test"));
}
16 changes: 8 additions & 8 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pub async fn initialize_components(

let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
app_health.insert_component(prometheus_health_check);
app_health.insert_component(prometheus_health_check)?;
let prometheus_task = prom_config.run(stop_receiver.clone());
let prometheus_task = tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
Expand Down Expand Up @@ -802,15 +802,15 @@ pub async fn initialize_components(
.await
.context("failed to build commitment_generator_pool")?;
let commitment_generator = CommitmentGenerator::new(commitment_generator_pool);
app_health.insert_component(commitment_generator.health_check());
app_health.insert_component(commitment_generator.health_check())?;
task_futures.push(tokio::spawn(
commitment_generator.run(stop_receiver.clone()),
));
}

// Run healthcheck server for all components.
let db_health_check = ConnectionPoolHealthCheck::new(replica_connection_pool);
app_health.insert_custom_component(Arc::new(db_health_check));
app_health.insert_custom_component(Arc::new(db_health_check))?;
let health_check_handle =
HealthCheckHandle::spawn_server(health_check_config.bind_addr(), app_health);

Expand Down Expand Up @@ -1035,7 +1035,7 @@ async fn run_tree(
}

let tree_health_check = metadata_calculator.tree_health_check();
app_health.insert_component(tree_health_check);
app_health.insert_component(tree_health_check)?;
let tree_task = tokio::spawn(metadata_calculator.run(stop_receiver));
task_futures.push(tree_task);

Expand Down Expand Up @@ -1331,7 +1331,7 @@ async fn run_http_api(
if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() {
let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url));
api_builder = api_builder.with_tree_api(tree_api.clone());
app_health.insert_custom_component(tree_api);
app_health.insert_custom_component(tree_api)?;
}

let server_handles = api_builder
Expand All @@ -1340,7 +1340,7 @@ async fn run_http_api(
.run(stop_receiver)
.await?;
task_futures.extend(server_handles.tasks);
app_health.insert_component(server_handles.health_check);
app_health.insert_component(server_handles.health_check)?;
Ok(())
}

Expand Down Expand Up @@ -1399,7 +1399,7 @@ async fn run_ws_api(
if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() {
let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url));
api_builder = api_builder.with_tree_api(tree_api.clone());
app_health.insert_custom_component(tree_api);
app_health.insert_custom_component(tree_api)?;
}

let server_handles = api_builder
Expand All @@ -1408,7 +1408,7 @@ async fn run_ws_api(
.run(stop_receiver)
.await?;
task_futures.extend(server_handles.tasks);
app_health.insert_component(server_handles.health_check);
app_health.insert_component(server_handles.health_check)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ impl WiringLayer for CommitmentGeneratorLayer {
let commitment_generator = CommitmentGenerator::new(main_pool);

let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health.insert_component(commitment_generator.health_check());
app_health
.insert_component(commitment_generator.health_check())
.map_err(WiringError::internal)?;

context.add_task(Box::new(CommitmentGeneratorTask {
commitment_generator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ impl WiringLayer for ConsistencyCheckerLayer {
.with_diamond_proxy_addr(self.diamond_proxy_addr);

let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health.insert_component(consistency_checker.health_check().clone());
app_health
.insert_component(consistency_checker.health_check().clone())
.map_err(WiringError::internal)?;

// Create and add tasks.
context.add_task(Box::new(ConsistencyCheckerTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ impl WiringLayer for MetadataCalculatorLayer {
.with_recovery_pool(recovery_pool);

let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health.insert_component(metadata_calculator.tree_health_check());
app_health
.insert_component(metadata_calculator.tree_health_check())
.map_err(WiringError::internal)?;

let task = Box::new(MetadataCalculatorTask {
metadata_calculator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ impl WiringLayer for PrometheusExporterLayer {
ReactiveHealthCheck::new("prometheus_exporter");

let AppHealthCheckResource(app_health) = node.get_resource_or_default().await;
app_health.insert_component(prometheus_health_check);
app_health
.insert_component(prometheus_health_check)
.map_err(WiringError::internal)?;

let task = Box::new(PrometheusExporterTask {
config: self.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ impl WiringLayer for Web3ServerLayer {
// Insert healthcheck.
let api_health_check = server.health_check();
let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health.insert_component(api_health_check);
app_health
.insert_component(api_health_check)
.map_err(WiringError::internal)?;

// Insert circuit breaker.
let circuit_breaker_resource = context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ impl WiringLayer for TreeApiClientLayer {
if let Some(url) = &self.url {
let client = Arc::new(TreeApiHttpClient::new(url));
let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health.insert_custom_component(client.clone());
app_health
.insert_custom_component(client.clone())
.map_err(WiringError::internal)?;
context.insert_resource(TreeApiClientResource(client))?;
}
Ok(())
Expand Down
7 changes: 7 additions & 0 deletions core/node/node_framework/src/wiring_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ pub enum WiringError {
#[error(transparent)]
Internal(#[from] anyhow::Error),
}

impl WiringError {
/// Wraps the specified internal error.
pub fn internal(err: impl Into<anyhow::Error>) -> Self {
Self::Internal(err.into())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ describe('snapshot recovery', () => {

if (!consistencyCheckerSucceeded) {
const status = health.components.consistency_checker?.status;
expect(status).to.be.oneOf([undefined, 'ready']);
expect(status).to.be.oneOf([undefined, 'not_ready', 'ready']);
const details = health.components.consistency_checker?.details;
if (details !== undefined) {
if (status === 'ready' && details !== undefined) {
console.log('Received consistency checker health details', details);
if (details.first_checked_batch !== undefined && details.last_checked_batch !== undefined) {
expect(details.first_checked_batch).to.equal(snapshotMetadata.l1BatchNumber + 1);
Expand All @@ -281,9 +281,9 @@ describe('snapshot recovery', () => {

if (!reorgDetectorSucceeded) {
const status = health.components.reorg_detector?.status;
expect(status).to.be.oneOf([undefined, 'ready']);
expect(status).to.be.oneOf([undefined, 'not_ready', 'ready']);
const details = health.components.reorg_detector?.details;
if (details !== undefined) {
if (status === 'ready' && details !== undefined) {
console.log('Received reorg detector health details', details);
if (details.last_correct_l1_batch !== undefined && details.last_correct_miniblock !== undefined) {
expect(details.last_correct_l1_batch).to.be.greaterThan(snapshotMetadata.l1BatchNumber);
Expand Down
1 change: 1 addition & 0 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3417941

Please sign in to comment.