fix(en): Fix reorg detector logic for dealing with last L1 batch #1906

Merged
merged 7 commits on May 15, 2024
37 changes: 26 additions & 11 deletions core/bin/external_node/src/main.rs
@@ -714,11 +714,9 @@ async fn init_tasks(
 }
 
 async fn shutdown_components(
-    stop_sender: watch::Sender<bool>,
     tasks: ManagedTasks,
     healthcheck_handle: HealthCheckHandle,
 ) -> anyhow::Result<()> {
-    stop_sender.send(true).ok();
     task::spawn_blocking(RocksDB::await_rocksdb_termination)
         .await
         .context("error waiting for RocksDB instances to drop")?;
@@ -892,7 +890,8 @@ async fn run_node(
 ) -> anyhow::Result<()> {
     tracing::warn!("The external node is in the alpha phase, and should be used with caution.");
     tracing::info!("Started the external node");
-    let (stop_sender, stop_receiver) = watch::channel(false);
+    let (stop_sender, mut stop_receiver) = watch::channel(false);
+    let stop_sender = Arc::new(stop_sender);
 
     let app_health = Arc::new(AppHealthCheck::new(
         config.optional.healthcheck_slow_time_limit(),
@@ -959,6 +958,16 @@
     )
     .await?;
     let sigint_receiver = env.setup_sigint_handler();
+    // Spawn reacting to signals in a separate task so that the node is responsive to signals right away
+    // (e.g., during the initial reorg detection).
+    tokio::spawn({
+        let stop_sender = stop_sender.clone();
+        async move {
+            sigint_receiver.await.ok();
+            tracing::info!("Stop signal received, shutting down");
+            stop_sender.send_replace(true);
+        }
+    });
 
     // Revert the storage if needed.
     let mut reverter = BlockReverter::new(NodeRole::External, connection_pool.clone());
@@ -974,8 +983,15 @@
     // the node lifecycle, the node will exit the same way as it does with any other critical error,
     // and would restart. Then, on the 2nd launch reorg would be detected here, then processed and the node
     // will be able to operate normally afterwards.
-    match reorg_detector.check_consistency().await {
-        Ok(()) => {}
+    match reorg_detector.run_once(stop_receiver.clone()).await {
+        Ok(()) if *stop_receiver.borrow() => {
+            tracing::info!("Stop signal received during initial reorg detection; shutting down");
+            healthcheck_handle.stop().await;
+            return Ok(());
+        }
+        Ok(()) => {
+            tracing::info!("Successfully checked no reorg compared to the main node");
+        }
         Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => {
             tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}");
             reverter.roll_back(last_correct_l1_batch).await?;
@@ -1027,15 +1043,14 @@
 
     let mut tasks = ManagedTasks::new(task_handles);
     tokio::select! {
-        _ = tasks.wait_single() => {},
-        _ = sigint_receiver => {
-            tracing::info!("Stop signal received, shutting down");
-        },
+        () = tasks.wait_single() => {},
+        _ = stop_receiver.changed() => {},
     };
 
     // Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
-    // Broadcast the stop signal to all actors and exit.
-    shutdown_components(stop_sender, tasks, healthcheck_handle).await?;
+    // Broadcast the stop signal (in case it wasn't broadcast previously) to all actors and exit.
+    stop_sender.send_replace(true);
+    shutdown_components(tasks, healthcheck_handle).await?;
     tracing::info!("Stopped");
     Ok(())
 }
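For reference, the shutdown pattern adopted above can be sketched in isolation. The snippet below is a hypothetical, self-contained example rather than code from this PR: a shared tokio::sync::watch channel is flipped by a dedicated task standing in for the SIGINT handler, and the main flow observes it via stop_receiver.changed(); the sleeps and println! calls are placeholders.

// Minimal sketch (assumed example, not the node's actual code) of the
// watch-channel shutdown pattern: one task broadcasts the stop signal,
// long-running work reacts to it via `changed()` / `borrow()`.
use std::{sync::Arc, time::Duration};

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);
    let stop_sender = Arc::new(stop_sender);

    // Signal-handling task; a timer stands in for awaiting a SIGINT future.
    tokio::spawn({
        let stop_sender = stop_sender.clone();
        async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            // `send_replace` updates the value even if no receiver is waiting yet.
            stop_sender.send_replace(true);
        }
    });

    // Long-running work reacts to either its own completion or the stop signal.
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(60)) => println!("work finished"),
        _ = stop_receiver.changed() => println!("stop signal received, shutting down"),
    }

    // Broadcasting again is idempotent; components can then be torn down.
    stop_sender.send_replace(true);
}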
120 changes: 92 additions & 28 deletions core/bin/external_node/src/tests.rs
@@ -8,7 +8,7 @@ use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
 use zksync_types::{
     api, ethabi, fee_model::FeeParams, Address, L1BatchNumber, L2BlockNumber, H256, U64,
 };
-use zksync_web3_decl::client::MockClient;
+use zksync_web3_decl::{client::MockClient, jsonrpsee::core::ClientError};
 
 use super::*;
 
@@ -96,6 +96,34 @@ fn expected_health_components(components: &ComponentsToRun) -> Vec<&'static str>
     output
 }
 
+fn mock_eth_client(diamond_proxy_addr: Address) -> MockEthereum {
+    MockEthereum::default().with_call_handler(move |call, _| {
+        tracing::info!("L1 call: {call:?}");
+        if call.to == Some(diamond_proxy_addr) {
+            let call_signature = &call.data.as_ref().unwrap().0[..4];
+            let contract = zksync_contracts::hyperchain_contract();
+            let pricing_mode_sig = contract
+                .function("getPubdataPricingMode")
+                .unwrap()
+                .short_signature();
+            let protocol_version_sig = contract
+                .function("getProtocolVersion")
+                .unwrap()
+                .short_signature();
+            match call_signature {
+                sig if sig == pricing_mode_sig => {
+                    return ethabi::Token::Uint(0.into()); // "rollup" mode encoding
+                }
+                sig if sig == protocol_version_sig => {
+                    return ethabi::Token::Uint((ProtocolVersionId::latest() as u16).into())
+                }
+                _ => { /* unknown call; panic below */ }
+            }
+        }
+        panic!("Unexpected L1 call: {call:?}");
+    })
+}
+
 #[test_casing(5, ["all", "core", "api", "tree", "tree,tree_api"])]
 #[tokio::test]
 #[tracing::instrument] // Add args to the test logs
@@ -159,33 +187,7 @@ async fn external_node_basics(components_str: &'static str) {
         .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
         .build();
     let l2_client = Box::new(l2_client);
-
-    let eth_client = MockEthereum::default().with_call_handler(move |call, _| {
-        tracing::info!("L1 call: {call:?}");
-        if call.to == Some(diamond_proxy_addr) {
-            let call_signature = &call.data.as_ref().unwrap().0[..4];
-            let contract = zksync_contracts::hyperchain_contract();
-            let pricing_mode_sig = contract
-                .function("getPubdataPricingMode")
-                .unwrap()
-                .short_signature();
-            let protocol_version_sig = contract
-                .function("getProtocolVersion")
-                .unwrap()
-                .short_signature();
-            match call_signature {
-                sig if sig == pricing_mode_sig => {
-                    return ethabi::Token::Uint(0.into()); // "rollup" mode encoding
-                }
-                sig if sig == protocol_version_sig => {
-                    return ethabi::Token::Uint((ProtocolVersionId::latest() as u16).into())
-                }
-                _ => { /* unknown call; panic below */ }
-            }
-        }
-        panic!("Unexpected L1 call: {call:?}");
-    });
-    let eth_client = Box::new(eth_client);
+    let eth_client = Box::new(mock_eth_client(diamond_proxy_addr));
 
     let (env, env_handles) = TestEnvironment::new();
     let node_handle = tokio::spawn(async move {
@@ -242,3 +244,65 @@ assert_matches!(component_health.status(), HealthStatus::ShutDown);
         assert_matches!(component_health.status(), HealthStatus::ShutDown);
     }
 }
+
+#[tokio::test]
+async fn node_reacts_to_stop_signal_during_initial_reorg_detection() {
+    let _guard = vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging
+    let temp_dir = tempfile::TempDir::new().unwrap();
+
+    let connection_pool = ConnectionPool::test_pool().await;
+    let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone());
+    let mut storage = connection_pool.connection().await.unwrap();
+    insert_genesis_batch(&mut storage, &GenesisParams::mock())
+        .await
+        .unwrap();
+    drop(storage);
+
+    let opt = Cli {
+        revert_pending_l1_batch: false,
+        enable_consensus: false,
+        components: "core".parse().unwrap(),
+    };
+    let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool);
+    if opt.components.0.contains(&Component::TreeApi) {
+        config.tree_component.api_port = Some(0);
+    }
+
+    let l2_client = MockClient::builder(L2::default())
+        .method("eth_chainId", || Ok(U64::from(270)))
+        .method("zks_L1ChainId", || Ok(U64::from(9)))
+        .method("zks_L1BatchNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("eth_blockNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default()))
+        .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
+        .build();
+    let l2_client = Box::new(l2_client);
+    let diamond_proxy_addr = config.remote.diamond_proxy_addr;
+    let eth_client = Box::new(mock_eth_client(diamond_proxy_addr));
+
+    let (env, env_handles) = TestEnvironment::new();
+    let mut node_handle = tokio::spawn(async move {
+        run_node(
+            env,
+            &opt,
+            &config,
+            connection_pool,
+            singleton_pool_builder,
+            l2_client,
+            eth_client,
+        )
+        .await
+    });
+
+    // Check that the node doesn't stop on its own.
+    let timeout_result = tokio::time::timeout(Duration::from_millis(50), &mut node_handle).await;
+    assert_matches!(timeout_result, Err(tokio::time::error::Elapsed { .. }));
+
+    // Send a stop signal and check that the node reacts to it.
+    env_handles.sigint_sender.send(()).unwrap();
+    node_handle.await.unwrap().unwrap();
+}