From 99c4b6e082e78be68609a99926a71c3868353419 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 24 Jul 2024 15:19:27 +0200 Subject: [PATCH 01/11] test wip --- Cargo.lock | 1 + core/lib/dal/src/consensus_dal.rs | 2 +- core/lib/types/src/api/en.rs | 7 +++ core/lib/web3_decl/src/namespaces/en.rs | 7 +++ core/node/api_server/Cargo.toml | 1 + .../web3/backend_jsonrpsee/namespaces/en.rs | 6 +++ core/node/api_server/src/web3/mod.rs | 6 ++- .../node/api_server/src/web3/namespaces/en.rs | 46 +++++++++++++++++-- core/node/consensus/src/tests.rs | 32 +++++++++++++ core/node/node_sync/src/client.rs | 8 ++++ core/node/node_sync/src/testonly.rs | 4 ++ 11 files changed, 112 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7319999316be..60b837527aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8908,6 +8908,7 @@ dependencies = [ "tracing", "vise", "zksync_config", + "zksync_consensus_roles", "zksync_contracts", "zksync_dal", "zksync_health_check", diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 7655abbe230c..6b8ec58c2ef5 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -459,7 +459,7 @@ impl ConsensusDal<'_, '_> { Ok(row .number - .map(|number| attester::BatchNumber(number as u64))) + .map(|number| attester::BatchNumber(number.into()))) } } diff --git a/core/lib/types/src/api/en.rs b/core/lib/types/src/api/en.rs index 7232071bd44b..f7e48461a87a 100644 --- a/core/lib/types/src/api/en.rs +++ b/core/lib/types/src/api/en.rs @@ -46,3 +46,10 @@ pub struct SyncBlock { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConsensusGenesis(pub serde_json::Value); + +/// Status of L1 simulated by the main node. +/// Used for testing L1 batch signing by consensus attesters. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SimulatedL1Status { + pub next_batch_to_commit: L1BatchNumber, +} diff --git a/core/lib/web3_decl/src/namespaces/en.rs b/core/lib/web3_decl/src/namespaces/en.rs index 3bd55ecf936f..45fc26a10021 100644 --- a/core/lib/web3_decl/src/namespaces/en.rs +++ b/core/lib/web3_decl/src/namespaces/en.rs @@ -35,6 +35,13 @@ pub trait EnNamespace { #[method(name = "genesisConfig")] async fn genesis_config(&self) -> RpcResult; + /// INTERNAL RPC: + /// Gets the status of L1 simulated by the main node. + /// This is a temporary RPC used for testing L1 batch signing + /// by consensus attesters. + #[method(name = "simulated_l1_status")] + async fn simulated_l1_status(&self) -> RpcResult; + /// Get tokens that are white-listed and it can be used by paymasters. #[method(name = "whitelistedTokensForAA")] async fn whitelisted_tokens_for_aa(&self) -> RpcResult>; diff --git a/core/node/api_server/Cargo.toml b/core/node/api_server/Cargo.toml index 2a09ce5d176c..f7d40210b485 100644 --- a/core/node/api_server/Cargo.toml +++ b/core/node/api_server/Cargo.toml @@ -12,6 +12,7 @@ categories.workspace = true [dependencies] zksync_config.workspace = true +zksync_consensus_roles.workspace = true zksync_contracts.workspace = true zksync_types.workspace = true zksync_dal.workspace = true diff --git a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs index ef5c6ee40dd9..35acf48ed046 100644 --- a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs +++ b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs @@ -25,6 +25,12 @@ impl EnNamespaceServer for EnNamespace { .map_err(|err| self.current_method().map_err(err)) } + async fn simulated_l1_status(&self) -> RpcResult { + self.simulated_l1_status_impl() + .await + .map_err(|err| self.current_method().map_err(err)) + } + async fn sync_tokens(&self, block_number: Option) -> RpcResult> { self.sync_tokens_impl(block_number) .await diff --git a/core/node/api_server/src/web3/mod.rs b/core/node/api_server/src/web3/mod.rs index bad1b493a5fd..fc735bd9cb18 100644 --- a/core/node/api_server/src/web3/mod.rs +++ b/core/node/api_server/src/web3/mod.rs @@ -405,8 +405,10 @@ impl ApiServer { .context("cannot merge zks namespace")?; } if namespaces.contains(&Namespace::En) { - rpc.merge(EnNamespace::new(rpc_state.clone()).into_rpc()) - .context("cannot merge en namespace")?; + let n = EnNamespace::new(rpc_state.clone()) + .await + .context("EnNamespace:::new()")?; + rpc.merge(n.into_rpc()).context("cannot merge en namespace")?; } if namespaces.contains(&Namespace::Snapshots) { rpc.merge(SnapshotsNamespace::new(rpc_state.clone()).into_rpc()) diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index 2661d9d81bf7..4656bdb71349 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -5,6 +5,7 @@ use zksync_types::{ api::en, protocol_version::ProtocolSemanticVersion, tokens::TokenInfo, Address, L1BatchNumber, L2BlockNumber, }; +use zksync_consensus_roles::attester; use zksync_web3_decl::error::Web3Error; use crate::web3::{backend_jsonrpsee::MethodTracer, state::RpcState}; @@ -14,16 +15,37 @@ use crate::web3::{backend_jsonrpsee::MethodTracer, state::RpcState}; #[derive(Debug)] pub(crate) struct EnNamespace { state: RpcState, + /// First batch to commit to L1 simulated by the main node. + /// This is temporary and used only for testing L1 batch signing by consensus attesters. + first_batch_to_commit: L1BatchNumber, +} + +fn to_l1_batch_number(n :attester::BatchNumber) -> anyhow::Result { + Ok(L1BatchNumber(n.0.try_into().context("L1BatchNumber overflow")?)) } impl EnNamespace { - pub fn new(state: RpcState) -> Self { - Self { state } + pub async fn new(state: RpcState) -> anyhow::Result { + let first_batch_to_commit = async { + let mut conn = state.acquire_connection().await.context("connection()")?; + // Try to continue from where we left. + if let Some(last) = conn.consensus_dal().get_last_batch_certificate_number().await.context("get_last_batch_certificate_number()")? { + return to_l1_batch_number(last+1); + } + // Otherwise start with the next sealed L1 batch. + if let Some(sealed) = conn.blocks_dal().get_sealed_l1_batch_number().await.context("get_sealed_l1_batch_number()")? { + return Ok(sealed+1); + } + // Otherwise start from the first non-pruned batch. + let info = conn.pruning_dal().get_pruning_info().await.context("get_pruning_info()")?; + Ok(info.last_soft_pruned_l1_batch.map(|n|n+1).unwrap_or(L1BatchNumber(0))) + }.await?; + Ok(Self { state, first_batch_to_commit }) } pub async fn consensus_genesis_impl(&self) -> Result, Web3Error> { - let mut storage = self.state.acquire_connection().await?; - let Some(genesis) = storage + let mut conn = self.state.acquire_connection().await?; + let Some(genesis) = conn .consensus_dal() .genesis() .await @@ -36,9 +58,23 @@ impl EnNamespace { ))) } + #[tracing::instrument(skip(self))] + pub async fn simulated_l1_status_impl(&self) -> Result { + let mut conn = self.state.acquire_connection().await?; + let next_batch_to_commit = match conn.consensus_dal().get_last_batch_certificate_number().await + .map_err(DalError::generalize)? + { + Some(n) => to_l1_batch_number(n)?, + None => self.first_batch_to_commit, + }; + Ok(en::SimulatedL1Status { + next_batch_to_commit, + }) + } + pub(crate) fn current_method(&self) -> &MethodTracer { &self.state.current_method - } + } pub async fn sync_l2_block_impl( &self, diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 5506ec6ee8f4..e7dd0d212625 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -11,6 +11,7 @@ use zksync_consensus_roles::{ }; use zksync_consensus_storage::BlockStore; use zksync_types::{L1BatchNumber, ProtocolVersionId}; +use zksync_node_sync::{MainNodeClient}; use crate::{ mn::run_main_node, @@ -661,6 +662,37 @@ async fn test_centralized_fetcher(from_snapshot: bool, version: ProtocolVersionI .unwrap(); } +#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] +#[tokio::test] +async fn test_simulated_l1_status_api(from_snapshot :bool, version: ProtocolVersionId) { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + //let rng = &mut ctx.rng(); + + scope::run!(ctx, |ctx, s| async { + let validator_pool = ConnectionPool::test(from_snapshot, version).await; + let (mut validator, runner) = + testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("validator"))); + + // API server needs at least 1 L1 batch to start. + validator.seal_batch().await; + let conn = validator.connect(ctx).await?; + + // If the main node has no L1 batch certificates, + // the first one to sign should be `last_sealed_batch + 1`. + let status = conn.fetch_simulated_l1_status().await?; + assert_eq!(status.next_batch_to_commit, validator.last_sealed_batch()+1); + + // TODO: sign some batch then check again + Ok(()) + }) + .await + .unwrap(); +} + + + /// Tests that generated L1 batch witnesses can be verified successfully. /// TODO: add tests for verification failures. #[test_casing(2, VERSIONS)] diff --git a/core/node/node_sync/src/client.rs b/core/node/node_sync/src/client.rs index 3d71d86f163b..d1c4cdba7e43 100644 --- a/core/node/node_sync/src/client.rs +++ b/core/node/node_sync/src/client.rs @@ -45,6 +45,8 @@ pub trait MainNodeClient: 'static + Send + Sync + fmt::Debug { async fn fetch_consensus_genesis(&self) -> EnrichedClientResult>; async fn fetch_genesis_config(&self) -> EnrichedClientResult; + + async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult; } #[async_trait] @@ -136,6 +138,12 @@ impl MainNodeClient for Box> { .rpc_context("consensus_genesis") .await } + + async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult { + self.simulated_l1_status() + .rpc_context("simulated_l1_status") + .await + } } /// Main node health check. diff --git a/core/node/node_sync/src/testonly.rs b/core/node/node_sync/src/testonly.rs index 502da17ffd98..ad8c49eac148 100644 --- a/core/node/node_sync/src/testonly.rs +++ b/core/node/node_sync/src/testonly.rs @@ -77,6 +77,10 @@ impl MainNodeClient for MockMainNodeClient { unimplemented!() } + async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult { + unimplemented!() + } + async fn fetch_genesis_config(&self) -> EnrichedClientResult { Ok(mock_genesis_config()) } From 3953ffd838dc644371aebbf7f860809a4f33ecf1 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 24 Jul 2024 17:02:10 +0200 Subject: [PATCH 02/11] implemented --- ...26306b02e328d7b1b69c495443bd2ca7f7510.json | 15 ----- core/lib/dal/src/consensus_dal.rs | 36 ++++-------- core/node/api_server/src/web3/mod.rs | 3 +- .../node/api_server/src/web3/namespaces/en.rs | 58 ++++++++++++++----- core/node/consensus/src/storage/connection.rs | 51 +++++++++------- core/node/consensus/src/storage/mod.rs | 9 +++ core/node/consensus/src/storage/store.rs | 22 ++----- core/node/consensus/src/tests.rs | 49 ++++++++++++---- 8 files changed, 138 insertions(+), 105 deletions(-) delete mode 100644 core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json diff --git a/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json deleted file mode 100644 index a42fbe98ff2f..000000000000 --- a/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO NOTHING\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510" -} diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 6b8ec58c2ef5..3e93b9dab6dd 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,5 +1,4 @@ use anyhow::Context as _; -use bigdecimal::Zero; use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BlockStoreState, ReplicaState}; use zksync_db_connection::{ @@ -378,9 +377,7 @@ impl ConsensusDal<'_, '_> { ) -> Result<(), InsertCertificateError> { use InsertCertificateError as E; let header = &cert.message.proposal; - let mut txn = self.storage.start_transaction().await?; - let want_payload = txn - .consensus_dal() + let want_payload = self .block_payload(cert.message.proposal.number) .await? .ok_or(E::MissingPayload)?; @@ -399,9 +396,8 @@ impl ConsensusDal<'_, '_> { ) .instrument("insert_block_certificate") .report_latency() - .execute(&mut txn) + .execute(self.storage) .await?; - txn.commit().await.context("commit")?; Ok(()) } @@ -410,32 +406,23 @@ impl ConsensusDal<'_, '_> { /// Insertion is allowed even if it creates gaps in the L1 batch history. /// /// This method assumes that all payload validation has been carried out by the caller. - pub async fn insert_batch_certificate( - &mut self, - cert: &attester::BatchQC, - ) -> Result<(), InsertCertificateError> { - let l1_batch_number = cert.message.number.0 as i64; - - let res = sqlx::query!( + /// Verification cannot be performed internally, due to circular dependency on + /// `zksync_l1_contract_interface`. + pub async fn insert_batch_certificate(&mut self, cert: &attester::BatchQC) -> DalResult<()> { + sqlx::query!( r#" INSERT INTO l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at) VALUES ($1, $2, NOW(), NOW()) - ON CONFLICT (l1_batch_number) DO NOTHING "#, - l1_batch_number, + cert.message.number.0 as i64, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) .instrument("insert_batch_certificate") .report_latency() .execute(self.storage) .await?; - - if res.rows_affected().is_zero() { - tracing::debug!(l1_batch_number, "duplicate batch certificate"); - } - Ok(()) } @@ -443,7 +430,7 @@ impl ConsensusDal<'_, '_> { /// depending on the order in which votes have been collected over gossip by consensus. pub async fn get_last_batch_certificate_number( &mut self, - ) -> DalResult> { + ) -> anyhow::Result> { let row = sqlx::query!( r#" SELECT @@ -457,9 +444,10 @@ impl ConsensusDal<'_, '_> { .fetch_one(self.storage) .await?; - Ok(row - .number - .map(|number| attester::BatchNumber(number.into()))) + let Some(n) = row.number else { return Ok(None) }; + Ok(Some(attester::BatchNumber( + n.try_into().context("overflow")?, + ))) } } diff --git a/core/node/api_server/src/web3/mod.rs b/core/node/api_server/src/web3/mod.rs index fc735bd9cb18..39ad335b382b 100644 --- a/core/node/api_server/src/web3/mod.rs +++ b/core/node/api_server/src/web3/mod.rs @@ -408,7 +408,8 @@ impl ApiServer { let n = EnNamespace::new(rpc_state.clone()) .await .context("EnNamespace:::new()")?; - rpc.merge(n.into_rpc()).context("cannot merge en namespace")?; + rpc.merge(n.into_rpc()) + .context("cannot merge en namespace")?; } if namespaces.contains(&Namespace::Snapshots) { rpc.merge(SnapshotsNamespace::new(rpc_state.clone()).into_rpc()) diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index 4656bdb71349..d0dceb4490cf 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -1,11 +1,11 @@ use anyhow::Context as _; use zksync_config::{configs::EcosystemContracts, GenesisConfig}; +use zksync_consensus_roles::attester; use zksync_dal::{CoreDal, DalError}; use zksync_types::{ api::en, protocol_version::ProtocolSemanticVersion, tokens::TokenInfo, Address, L1BatchNumber, L2BlockNumber, }; -use zksync_consensus_roles::attester; use zksync_web3_decl::error::Web3Error; use crate::web3::{backend_jsonrpsee::MethodTracer, state::RpcState}; @@ -20,8 +20,10 @@ pub(crate) struct EnNamespace { first_batch_to_commit: L1BatchNumber, } -fn to_l1_batch_number(n :attester::BatchNumber) -> anyhow::Result { - Ok(L1BatchNumber(n.0.try_into().context("L1BatchNumber overflow")?)) +fn to_l1_batch_number(n: attester::BatchNumber) -> anyhow::Result { + Ok(L1BatchNumber( + n.0.try_into().context("L1BatchNumber overflow")?, + )) } impl EnNamespace { @@ -29,18 +31,39 @@ impl EnNamespace { let first_batch_to_commit = async { let mut conn = state.acquire_connection().await.context("connection()")?; // Try to continue from where we left. - if let Some(last) = conn.consensus_dal().get_last_batch_certificate_number().await.context("get_last_batch_certificate_number()")? { - return to_l1_batch_number(last+1); + if let Some(last) = conn + .consensus_dal() + .get_last_batch_certificate_number() + .await + .context("get_last_batch_certificate_number()")? + { + return to_l1_batch_number(last + 1); } // Otherwise start with the next sealed L1 batch. - if let Some(sealed) = conn.blocks_dal().get_sealed_l1_batch_number().await.context("get_sealed_l1_batch_number()")? { - return Ok(sealed+1); + if let Some(sealed) = conn + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .context("get_sealed_l1_batch_number()")? + { + return Ok(sealed + 1); } - // Otherwise start from the first non-pruned batch. - let info = conn.pruning_dal().get_pruning_info().await.context("get_pruning_info()")?; - Ok(info.last_soft_pruned_l1_batch.map(|n|n+1).unwrap_or(L1BatchNumber(0))) - }.await?; - Ok(Self { state, first_batch_to_commit }) + // Otherwise start from the first non-pruned batch. + let info = conn + .pruning_dal() + .get_pruning_info() + .await + .context("get_pruning_info()")?; + Ok(info + .last_soft_pruned_l1_batch + .map(|n| n + 1) + .unwrap_or(L1BatchNumber(0))) + } + .await?; + Ok(Self { + state, + first_batch_to_commit, + }) } pub async fn consensus_genesis_impl(&self) -> Result, Web3Error> { @@ -61,10 +84,13 @@ impl EnNamespace { #[tracing::instrument(skip(self))] pub async fn simulated_l1_status_impl(&self) -> Result { let mut conn = self.state.acquire_connection().await?; - let next_batch_to_commit = match conn.consensus_dal().get_last_batch_certificate_number().await - .map_err(DalError::generalize)? + let next_batch_to_commit = match conn + .consensus_dal() + .get_last_batch_certificate_number() + .await + .context("get_last_batch_certificate_number()")? { - Some(n) => to_l1_batch_number(n)?, + Some(n) => to_l1_batch_number(n + 1)?, None => self.first_batch_to_commit, }; Ok(en::SimulatedL1Status { @@ -74,7 +100,7 @@ impl EnNamespace { pub(crate) fn current_method(&self) -> &MethodTracer { &self.state.current_method - } + } pub async fn sync_l2_block_impl( &self, diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 5d76934d7005..6d9fb6073640 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -1,8 +1,9 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, time}; +use zksync_consensus_crypto::keccak256::Keccak256; use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{self as storage, BatchStoreState}; -use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; +use zksync_dal::{consensus_dal, consensus_dal::Payload, Core, CoreDal, DalError}; use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo; use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState}; use zksync_state_keeper::io::common::IoCursor; @@ -115,35 +116,26 @@ impl<'a> Connection<'a> { .await??) } - /// Wrapper for `consensus_dal().insert_batch_certificate()`. + /// Wrapper for `consensus_dal().insert_batch_certificate()`, + /// which additionally verifies that the batch hash matches the stored batch. pub async fn insert_batch_certificate( &mut self, ctx: &ctx::Ctx, cert: &attester::BatchQC, ) -> Result<(), InsertCertificateError> { - use crate::storage::consensus_dal::InsertCertificateError as E; - - let l1_batch_number = L1BatchNumber(cert.message.number.0 as u32); - - let Some(l1_batch) = self - .0 - .blocks_dal() - .get_l1_batch_metadata(l1_batch_number) + use consensus_dal::InsertCertificateError as E; + let want_hash = self + .batch_hash(ctx, cert.message.number) .await - .map_err(E::Dal)? - else { - return Err(E::MissingPayload.into()); - }; - - let l1_batch_info = StoredBatchInfo::from(&l1_batch); - - if l1_batch_info.hash().0 != *cert.message.hash.0.as_bytes() { + .wrap("batch_hash()")? + .ok_or(E::MissingPayload)?; + if want_hash != cert.message.hash { return Err(E::PayloadMismatch.into()); } - Ok(ctx .wait(self.0.consensus_dal().insert_batch_certificate(cert)) - .await??) + .await? + .map_err(E::Dal)?) } /// Wrapper for `consensus_dal().replica_state()`. @@ -166,6 +158,25 @@ impl<'a> Connection<'a> { .context("sqlx")?) } + /// Wrapper for `consensus_dal().batch_hash()`. + pub async fn batch_hash( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + let n = L1BatchNumber(number.0.try_into().context("overflow")?); + let Some(meta) = ctx + .wait(self.0.blocks_dal().get_l1_batch_metadata(n)) + .await? + .context("sqlx")? + else { + return Ok(None); + }; + Ok(Some(attester::BatchHash(Keccak256::from_bytes( + StoredBatchInfo::from(&meta).hash().0, + )))) + } + /// Wrapper for `blocks_dal().get_l1_batch_metadata()`. pub async fn batch( &mut self, diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 6660f75332bc..424002603c7c 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -27,6 +27,15 @@ pub enum InsertCertificateError { Inner(#[from] consensus_dal::InsertCertificateError), } +impl From for InsertCertificateError { + fn from(err: ctx::Error) -> Self { + match err { + ctx::Error::Canceled(err) => Self::Canceled(err), + ctx::Error::Internal(err) => Self::Inner(err.into()), + } + } +} + #[derive(Debug)] pub(crate) struct PayloadQueue { inner: IoCursor, diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index ad8f4948831b..b051542d29f4 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -3,13 +3,11 @@ use std::sync::Arc; use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; -use zksync_consensus_crypto::keccak256::Keccak256; use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::consensus_dal::{self, Payload}; -use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo; use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; -use zksync_types::{L1BatchNumber, L2BlockNumber}; +use zksync_types::L2BlockNumber; use super::{Connection, PayloadQueue}; use crate::storage::{ConnectionPool, InsertCertificateError}; @@ -524,26 +522,16 @@ impl storage::PersistentBatchStore for Store { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::Result> { - let Some(batch) = self + let Some(hash) = self .conn(ctx) .await? - .batch( - ctx, - L1BatchNumber(u32::try_from(number.0).context("number")?), - ) + .batch_hash(ctx, number) .await - .wrap("batch")? + .wrap("batch_hash()")? else { return Ok(None); }; - - let info = StoredBatchInfo::from(&batch); - let hash = Keccak256::from_bytes(info.hash().0); - - Ok(Some(attester::Batch { - number, - hash: attester::BatchHash(hash), - })) + Ok(Some(attester::Batch { number, hash })) } /// Returns the QC of the batch with the given number. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index e7dd0d212625..bcccb2060247 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -6,12 +6,12 @@ use zksync_config::configs::consensus::{ValidatorPublicKey, WeightedValidator}; use zksync_consensus_crypto::TextFmt as _; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::{ - validator, + attester, validator, validator::testonly::{Setup, SetupSpec}, }; use zksync_consensus_storage::BlockStore; +use zksync_node_sync::MainNodeClient; use zksync_types::{L1BatchNumber, ProtocolVersionId}; -use zksync_node_sync::{MainNodeClient}; use crate::{ mn::run_main_node, @@ -664,35 +664,60 @@ async fn test_centralized_fetcher(from_snapshot: bool, version: ProtocolVersionI #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] #[tokio::test] -async fn test_simulated_l1_status_api(from_snapshot :bool, version: ProtocolVersionId) { +async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - //let rng = &mut ctx.rng(); + let rng = &mut ctx.rng(); scope::run!(ctx, |ctx, s| async { let validator_pool = ConnectionPool::test(from_snapshot, version).await; let (mut validator, runner) = testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("validator"))); - + // API server needs at least 1 L1 batch to start. validator.seal_batch().await; - let conn = validator.connect(ctx).await?; + let api = validator.connect(ctx).await?; // If the main node has no L1 batch certificates, // the first one to sign should be `last_sealed_batch + 1`. - let status = conn.fetch_simulated_l1_status().await?; - assert_eq!(status.next_batch_to_commit, validator.last_sealed_batch()+1); - - // TODO: sign some batch then check again + let status = api.fetch_simulated_l1_status().await?; + assert_eq!( + status.next_batch_to_commit, + validator.last_sealed_batch() + 1 + ); + + // Insert a cert, then check again. + validator.push_random_block(rng).await; + validator.seal_batch().await; + validator_pool + .wait_for_batch(ctx, status.next_batch_to_commit) + .await?; + { + let mut conn = validator_pool.connection(ctx).await?; + let number = attester::BatchNumber(status.next_batch_to_commit.0.try_into().unwrap()); + let hash = conn.batch_hash(ctx, number).await?.unwrap(); + let cert = attester::BatchQC { + signatures: attester::MultiSig::default(), + message: attester::Batch { number, hash }, + }; + conn.insert_batch_certificate(ctx, &cert) + .await + .context("insert_batch_certificate()")?; + } + let want = status.next_batch_to_commit + 1; + let got = api + .fetch_simulated_l1_status() + .await + .context("fetch_simulated_l1_status()")?; + assert_eq!(want, got.next_batch_to_commit); + Ok(()) }) .await .unwrap(); } - - /// Tests that generated L1 batch witnesses can be verified successfully. /// TODO: add tests for verification failures. #[test_casing(2, VERSIONS)] From 46cfdd8f7f2d8003a91355ee4c76ab6836d3c0d7 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 24 Jul 2024 17:14:34 +0200 Subject: [PATCH 03/11] missing file --- ...dbb354620aa5098683cf395827f30fd3cc2cd79f5.json | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json diff --git a/core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json b/core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json new file mode 100644 index 000000000000..a5b5f5be8839 --- /dev/null +++ b/core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5" +} From ab24bcfebdb56d5886a9f07c71c9fb324e866421 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 10:05:37 +0200 Subject: [PATCH 04/11] applied comments --- ...9426306b02e328d7b1b69c495443bd2ca7f7510.json} | 4 ++-- core/lib/dal/src/consensus_dal.rs | 14 ++++++++------ core/lib/web3_decl/src/namespaces/en.rs | 2 +- core/node/api_server/src/web3/namespaces/en.rs | 16 ++++++---------- core/node/consensus/src/storage/connection.rs | 2 +- core/node/consensus/src/tests.rs | 8 +++++++- 6 files changed, 25 insertions(+), 21 deletions(-) rename core/lib/dal/.sqlx/{query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json => query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json} (71%) diff --git a/core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json similarity index 71% rename from core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json rename to core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json index a5b5f5be8839..a42fbe98ff2f 100644 --- a/core/lib/dal/.sqlx/query-b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5.json +++ b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ", + "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO NOTHING\n ", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "b4debe6e01575c7cc56c7fadbb354620aa5098683cf395827f30fd3cc2cd79f5" + "hash": "d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510" } diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 3e93b9dab6dd..bc2b8968dc64 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use bigdecimal::Zero as _; use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BlockStoreState, ReplicaState}; use zksync_db_connection::{ @@ -402,19 +403,17 @@ impl ConsensusDal<'_, '_> { } /// Inserts a certificate for the L1 batch. - /// - /// Insertion is allowed even if it creates gaps in the L1 batch history. - /// - /// This method assumes that all payload validation has been carried out by the caller. - /// Verification cannot be performed internally, due to circular dependency on + /// Noop if a certificate for the same L1 batch is already present. + /// No verification is performed - it cannot be performed due to circular dependency on /// `zksync_l1_contract_interface`. pub async fn insert_batch_certificate(&mut self, cert: &attester::BatchQC) -> DalResult<()> { - sqlx::query!( + let res = sqlx::query!( r#" INSERT INTO l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at) VALUES ($1, $2, NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO NOTHING "#, cert.message.number.0 as i64, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), @@ -423,6 +422,9 @@ impl ConsensusDal<'_, '_> { .report_latency() .execute(self.storage) .await?; + if res.rows_affected().is_zero() { + tracing::debug!(l1_batch_number = ?cert.message.number, "duplicate batch certificate"); + } Ok(()) } diff --git a/core/lib/web3_decl/src/namespaces/en.rs b/core/lib/web3_decl/src/namespaces/en.rs index 45fc26a10021..e1591ca5d107 100644 --- a/core/lib/web3_decl/src/namespaces/en.rs +++ b/core/lib/web3_decl/src/namespaces/en.rs @@ -39,7 +39,7 @@ pub trait EnNamespace { /// Gets the status of L1 simulated by the main node. /// This is a temporary RPC used for testing L1 batch signing /// by consensus attesters. - #[method(name = "simulated_l1_status")] + #[method(name = "simulatedL1Status")] async fn simulated_l1_status(&self) -> RpcResult; /// Get tokens that are white-listed and it can be used by paymasters. diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index d0dceb4490cf..f56548f80707 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -48,16 +48,12 @@ impl EnNamespace { { return Ok(sealed + 1); } - // Otherwise start from the first non-pruned batch. - let info = conn - .pruning_dal() - .get_pruning_info() - .await - .context("get_pruning_info()")?; - Ok(info - .last_soft_pruned_l1_batch - .map(|n| n + 1) - .unwrap_or(L1BatchNumber(0))) + // Otherwise start with 0. + // Note that the `simulated_l1_status()` RPC is + // served only by the main node, therefore we + // don't have to care about pruning (there is no + // pruning on the main node). + Ok(L1BatchNumber(0)) } .await?; Ok(Self { diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 6d9fb6073640..e0766f269c42 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -168,7 +168,7 @@ impl<'a> Connection<'a> { let Some(meta) = ctx .wait(self.0.blocks_dal().get_l1_batch_metadata(n)) .await? - .context("sqlx")? + .context("get_l1_batch_metadata()")? else { return Ok(None); }; diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index bcccb2060247..343b90b41302 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -687,9 +687,15 @@ async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVers validator.last_sealed_batch() + 1 ); - // Insert a cert, then check again. + // Insert next batch, the response shouldn't change. validator.push_random_block(rng).await; validator.seal_batch().await; + assert_eq!( + status.next_batch_to_commit, + api.fetch_simulated_l1_status().await?.next_batch_to_commit, + ); + + // Insert a cert, then check again. validator_pool .wait_for_batch(ctx, status.next_batch_to_commit) .await?; From e20bf435b8f215a14891c863a75c26b79060a059 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 10:16:00 +0200 Subject: [PATCH 05/11] more lines --- core/lib/dal/src/consensus_dal.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index bc2b8968dc64..ed16e22ada64 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -446,7 +446,9 @@ impl ConsensusDal<'_, '_> { .fetch_one(self.storage) .await?; - let Some(n) = row.number else { return Ok(None) }; + let Some(n) = row.number else { + return Ok(None); + }; Ok(Some(attester::BatchNumber( n.try_into().context("overflow")?, ))) From 4f3be9e1979e2e642af91cede3705aac91f1ae17 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 10:33:48 +0200 Subject: [PATCH 06/11] try_from --- core/lib/dal/src/consensus_dal.rs | 10 +++++++--- core/node/consensus/src/storage/connection.rs | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index ed16e22ada64..0522afd38b55 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -392,7 +392,7 @@ impl ConsensusDal<'_, '_> { VALUES ($1, $2) "#, - header.number.0 as i64, + i64::try_from(header.number.0).context("overflow")?, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) .instrument("insert_block_certificate") @@ -406,7 +406,10 @@ impl ConsensusDal<'_, '_> { /// Noop if a certificate for the same L1 batch is already present. /// No verification is performed - it cannot be performed due to circular dependency on /// `zksync_l1_contract_interface`. - pub async fn insert_batch_certificate(&mut self, cert: &attester::BatchQC) -> DalResult<()> { + pub async fn insert_batch_certificate( + &mut self, + cert: &attester::BatchQC, + ) -> anyhow::Result<()> { let res = sqlx::query!( r#" INSERT INTO @@ -415,7 +418,8 @@ impl ConsensusDal<'_, '_> { ($1, $2, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - cert.message.number.0 as i64, + i64::try_from(cert.message.number.0).context("overflow")?, + // Unwrap is ok, because serialization should always succeed. zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) .instrument("insert_batch_certificate") diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index e0766f269c42..1333eb16da3c 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -135,7 +135,7 @@ impl<'a> Connection<'a> { Ok(ctx .wait(self.0.consensus_dal().insert_batch_certificate(cert)) .await? - .map_err(E::Dal)?) + .map_err(E::Other)?) } /// Wrapper for `consensus_dal().replica_state()`. From 3f2abf70b243e8a6f6a855d626f044ea910e1023 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 10:57:54 +0200 Subject: [PATCH 07/11] applied comments --- core/lib/types/src/api/en.rs | 6 ++--- core/lib/web3_decl/src/namespaces/en.rs | 8 +++--- .../web3/backend_jsonrpsee/namespaces/en.rs | 4 +-- .../node/api_server/src/web3/namespaces/en.rs | 27 ++++++++++--------- core/node/consensus/src/tests.rs | 26 +++++++++--------- core/node/node_sync/src/client.rs | 8 +++--- core/node/node_sync/src/testonly.rs | 2 +- 7 files changed, 41 insertions(+), 40 deletions(-) diff --git a/core/lib/types/src/api/en.rs b/core/lib/types/src/api/en.rs index f7e48461a87a..75de25ad80b2 100644 --- a/core/lib/types/src/api/en.rs +++ b/core/lib/types/src/api/en.rs @@ -47,9 +47,9 @@ pub struct SyncBlock { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConsensusGenesis(pub serde_json::Value); -/// Status of L1 simulated by the main node. +/// AttestationStatus maintained by the main node. /// Used for testing L1 batch signing by consensus attesters. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SimulatedL1Status { - pub next_batch_to_commit: L1BatchNumber, +pub struct AttestationStatus { + pub next_batch_to_attest: L1BatchNumber, } diff --git a/core/lib/web3_decl/src/namespaces/en.rs b/core/lib/web3_decl/src/namespaces/en.rs index e1591ca5d107..0a4c8acb4c60 100644 --- a/core/lib/web3_decl/src/namespaces/en.rs +++ b/core/lib/web3_decl/src/namespaces/en.rs @@ -35,12 +35,12 @@ pub trait EnNamespace { #[method(name = "genesisConfig")] async fn genesis_config(&self) -> RpcResult; - /// INTERNAL RPC: - /// Gets the status of L1 simulated by the main node. + /// MAIN NODE ONLY: + /// Gets the AttestationStatus of L1 batches. /// This is a temporary RPC used for testing L1 batch signing /// by consensus attesters. - #[method(name = "simulatedL1Status")] - async fn simulated_l1_status(&self) -> RpcResult; + #[method(name = "attestationStatus")] + async fn attestation_status(&self) -> RpcResult; /// Get tokens that are white-listed and it can be used by paymasters. #[method(name = "whitelistedTokensForAA")] diff --git a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs index 35acf48ed046..625d774465e5 100644 --- a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs +++ b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/en.rs @@ -25,8 +25,8 @@ impl EnNamespaceServer for EnNamespace { .map_err(|err| self.current_method().map_err(err)) } - async fn simulated_l1_status(&self) -> RpcResult { - self.simulated_l1_status_impl() + async fn attestation_status(&self) -> RpcResult { + self.attestation_status_impl() .await .map_err(|err| self.current_method().map_err(err)) } diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index f56548f80707..0958240a9a02 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -78,19 +78,20 @@ impl EnNamespace { } #[tracing::instrument(skip(self))] - pub async fn simulated_l1_status_impl(&self) -> Result { - let mut conn = self.state.acquire_connection().await?; - let next_batch_to_commit = match conn - .consensus_dal() - .get_last_batch_certificate_number() - .await - .context("get_last_batch_certificate_number()")? - { - Some(n) => to_l1_batch_number(n + 1)?, - None => self.first_batch_to_commit, - }; - Ok(en::SimulatedL1Status { - next_batch_to_commit, + pub async fn attestation_status_impl(&self) -> Result { + Ok(en::AttestationStatus { + next_batch_to_attest: match self + .state + .acquire_connection() + .await? + .consensus_dal() + .get_last_batch_certificate_number() + .await + .context("get_last_batch_certificate_number()")? + { + Some(n) => to_l1_batch_number(n + 1)?, + None => self.first_batch_to_commit, + }, }) } diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 343b90b41302..787b34f73727 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -662,15 +662,15 @@ async fn test_centralized_fetcher(from_snapshot: bool, version: ProtocolVersionI .unwrap(); } -#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] +#[test_casing(2, VERSIONS)] #[tokio::test] -async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVersionId) { +async fn test_attestation_status_api(version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); scope::run!(ctx, |ctx, s| async { - let validator_pool = ConnectionPool::test(from_snapshot, version).await; + let validator_pool = ConnectionPool::test(false, version).await; let (mut validator, runner) = testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("validator"))); @@ -681,9 +681,9 @@ async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVers // If the main node has no L1 batch certificates, // the first one to sign should be `last_sealed_batch + 1`. - let status = api.fetch_simulated_l1_status().await?; + let status = api.fetch_attestation_status().await?; assert_eq!( - status.next_batch_to_commit, + status.next_batch_to_attest, validator.last_sealed_batch() + 1 ); @@ -691,17 +691,17 @@ async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVers validator.push_random_block(rng).await; validator.seal_batch().await; assert_eq!( - status.next_batch_to_commit, - api.fetch_simulated_l1_status().await?.next_batch_to_commit, + status.next_batch_to_attest, + api.fetch_attestation_status().await?.next_batch_to_attest, ); // Insert a cert, then check again. validator_pool - .wait_for_batch(ctx, status.next_batch_to_commit) + .wait_for_batch(ctx, status.next_batch_to_attest) .await?; { let mut conn = validator_pool.connection(ctx).await?; - let number = attester::BatchNumber(status.next_batch_to_commit.0.try_into().unwrap()); + let number = attester::BatchNumber(status.next_batch_to_attest.0.try_into().unwrap()); let hash = conn.batch_hash(ctx, number).await?.unwrap(); let cert = attester::BatchQC { signatures: attester::MultiSig::default(), @@ -711,12 +711,12 @@ async fn test_simulated_l1_status_api(from_snapshot: bool, version: ProtocolVers .await .context("insert_batch_certificate()")?; } - let want = status.next_batch_to_commit + 1; + let want = status.next_batch_to_attest + 1; let got = api - .fetch_simulated_l1_status() + .fetch_attestation_status() .await - .context("fetch_simulated_l1_status()")?; - assert_eq!(want, got.next_batch_to_commit); + .context("fetch_attestation_status()")?; + assert_eq!(want, got.next_batch_to_attest); Ok(()) }) diff --git a/core/node/node_sync/src/client.rs b/core/node/node_sync/src/client.rs index d1c4cdba7e43..c4aaa383bb0c 100644 --- a/core/node/node_sync/src/client.rs +++ b/core/node/node_sync/src/client.rs @@ -46,7 +46,7 @@ pub trait MainNodeClient: 'static + Send + Sync + fmt::Debug { async fn fetch_genesis_config(&self) -> EnrichedClientResult; - async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult; + async fn fetch_attestation_status(&self) -> EnrichedClientResult; } #[async_trait] @@ -139,9 +139,9 @@ impl MainNodeClient for Box> { .await } - async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult { - self.simulated_l1_status() - .rpc_context("simulated_l1_status") + async fn fetch_attestation_status(&self) -> EnrichedClientResult { + self.attestation_status() + .rpc_context("attestation_status") .await } } diff --git a/core/node/node_sync/src/testonly.rs b/core/node/node_sync/src/testonly.rs index ad8c49eac148..677f548c6281 100644 --- a/core/node/node_sync/src/testonly.rs +++ b/core/node/node_sync/src/testonly.rs @@ -77,7 +77,7 @@ impl MainNodeClient for MockMainNodeClient { unimplemented!() } - async fn fetch_simulated_l1_status(&self) -> EnrichedClientResult { + async fn fetch_attestation_status(&self) -> EnrichedClientResult { unimplemented!() } From 8f8e64365d8c199d83d26afe465ff9bbba079f61 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 11:21:27 +0200 Subject: [PATCH 08/11] comment --- core/node/api_server/src/web3/namespaces/en.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index 0958240a9a02..92f0ee32d323 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -40,6 +40,8 @@ impl EnNamespace { return to_l1_batch_number(last + 1); } // Otherwise start with the next sealed L1 batch. + // NOTE: we may start at arbitrary point, + // choice of `sealed + 1` is arbitrary. if let Some(sealed) = conn .blocks_dal() .get_sealed_l1_batch_number() From 78ddbb2e1cb1f6ec4a23bfac651044780707b56d Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 15:21:59 +0200 Subject: [PATCH 09/11] applied comments --- core/lib/dal/src/consensus_dal.rs | 34 +++++++++++ core/node/api_server/src/web3/mod.rs | 5 +- .../node/api_server/src/web3/namespaces/en.rs | 61 ++++--------------- core/node/consensus/src/tests.rs | 15 +---- 4 files changed, 48 insertions(+), 67 deletions(-) diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 0522afd38b55..87285266d58e 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -457,6 +457,40 @@ impl ConsensusDal<'_, '_> { n.try_into().context("overflow")?, ))) } + + /// Next batch that the attesters should vote for. + /// This is a main node only query. + /// ENs should call the attestation_status RPC of the main node. + pub async fn next_batch_to_attest(&mut self) -> anyhow::Result { + // First batch that we don't have a certificate for. + if let Some(last) = self + .get_last_batch_certificate_number() + .await + .context("get_last_batch_certificate_number()")? + { + return Ok(last + 1); + } + // Otherwise start with the last sealed L1 batch. + // We don't want to backfill certificates for old batches. + // Note that there is a race condition in case the next + // batch is sealed before the certificate for the current + // last sealed batch is stored. This is only relevant + // for the first certificate though and anyway this is + // a test setup, so we are OK with that race condition. + if let Some(sealed) = self + .storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .context("get_sealed_l1_batch_number()")? + { + return Ok(attester::BatchNumber(sealed.0.into())); + } + // Otherwise start with 0. + // Note that main node doesn't start from snapshot + // and doesn't have prunning enabled. + Ok(attester::BatchNumber(0)) + } } #[cfg(test)] diff --git a/core/node/api_server/src/web3/mod.rs b/core/node/api_server/src/web3/mod.rs index 39ad335b382b..bad1b493a5fd 100644 --- a/core/node/api_server/src/web3/mod.rs +++ b/core/node/api_server/src/web3/mod.rs @@ -405,10 +405,7 @@ impl ApiServer { .context("cannot merge zks namespace")?; } if namespaces.contains(&Namespace::En) { - let n = EnNamespace::new(rpc_state.clone()) - .await - .context("EnNamespace:::new()")?; - rpc.merge(n.into_rpc()) + rpc.merge(EnNamespace::new(rpc_state.clone()).into_rpc()) .context("cannot merge en namespace")?; } if namespaces.contains(&Namespace::Snapshots) { diff --git a/core/node/api_server/src/web3/namespaces/en.rs b/core/node/api_server/src/web3/namespaces/en.rs index 92f0ee32d323..5f635b527b9d 100644 --- a/core/node/api_server/src/web3/namespaces/en.rs +++ b/core/node/api_server/src/web3/namespaces/en.rs @@ -15,9 +15,6 @@ use crate::web3::{backend_jsonrpsee::MethodTracer, state::RpcState}; #[derive(Debug)] pub(crate) struct EnNamespace { state: RpcState, - /// First batch to commit to L1 simulated by the main node. - /// This is temporary and used only for testing L1 batch signing by consensus attesters. - first_batch_to_commit: L1BatchNumber, } fn to_l1_batch_number(n: attester::BatchNumber) -> anyhow::Result { @@ -27,41 +24,8 @@ fn to_l1_batch_number(n: attester::BatchNumber) -> anyhow::Result } impl EnNamespace { - pub async fn new(state: RpcState) -> anyhow::Result { - let first_batch_to_commit = async { - let mut conn = state.acquire_connection().await.context("connection()")?; - // Try to continue from where we left. - if let Some(last) = conn - .consensus_dal() - .get_last_batch_certificate_number() - .await - .context("get_last_batch_certificate_number()")? - { - return to_l1_batch_number(last + 1); - } - // Otherwise start with the next sealed L1 batch. - // NOTE: we may start at arbitrary point, - // choice of `sealed + 1` is arbitrary. - if let Some(sealed) = conn - .blocks_dal() - .get_sealed_l1_batch_number() - .await - .context("get_sealed_l1_batch_number()")? - { - return Ok(sealed + 1); - } - // Otherwise start with 0. - // Note that the `simulated_l1_status()` RPC is - // served only by the main node, therefore we - // don't have to care about pruning (there is no - // pruning on the main node). - Ok(L1BatchNumber(0)) - } - .await?; - Ok(Self { - state, - first_batch_to_commit, - }) + pub fn new(state: RpcState) -> Self { + Self { state } } pub async fn consensus_genesis_impl(&self) -> Result, Web3Error> { @@ -82,18 +46,15 @@ impl EnNamespace { #[tracing::instrument(skip(self))] pub async fn attestation_status_impl(&self) -> Result { Ok(en::AttestationStatus { - next_batch_to_attest: match self - .state - .acquire_connection() - .await? - .consensus_dal() - .get_last_batch_certificate_number() - .await - .context("get_last_batch_certificate_number()")? - { - Some(n) => to_l1_batch_number(n + 1)?, - None => self.first_batch_to_commit, - }, + next_batch_to_attest: to_l1_batch_number( + self.state + .acquire_connection() + .await? + .consensus_dal() + .next_batch_to_attest() + .await + .context("next_batch_to_attest()")?, + )?, }) } diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index a8c02dcdd74c..8235534f4114 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -688,20 +688,9 @@ async fn test_attestation_status_api(version: ProtocolVersionId) { let api = validator.connect(ctx).await?; // If the main node has no L1 batch certificates, - // the first one to sign should be `last_sealed_batch + 1`. + // the first one to sign should be `last_sealed_batch`. let status = api.fetch_attestation_status().await?; - assert_eq!( - status.next_batch_to_attest, - validator.last_sealed_batch() + 1 - ); - - // Insert next batch, the response shouldn't change. - validator.push_random_block(rng).await; - validator.seal_batch().await; - assert_eq!( - status.next_batch_to_attest, - api.fetch_attestation_status().await?.next_batch_to_attest, - ); + assert_eq!(status.next_batch_to_attest, validator.last_sealed_batch()); // Insert a cert, then check again. validator_pool From 8694ee662e8507496e43b2458c7125d17bfe3293 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 25 Jul 2024 15:23:52 +0200 Subject: [PATCH 10/11] asdf --- core/node/consensus/src/tests.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 8235534f4114..59fcce189af4 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -675,8 +675,6 @@ async fn test_centralized_fetcher(from_snapshot: bool, version: ProtocolVersionI async fn test_attestation_status_api(version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { let validator_pool = ConnectionPool::test(false, version).await; let (mut validator, runner) = @@ -698,7 +696,7 @@ async fn test_attestation_status_api(version: ProtocolVersionId) { .await?; { let mut conn = validator_pool.connection(ctx).await?; - let number = attester::BatchNumber(status.next_batch_to_attest.0.try_into().unwrap()); + let number = attester::BatchNumber(status.next_batch_to_attest.0.into()); let hash = conn.batch_hash(ctx, number).await?.unwrap(); let cert = attester::BatchQC { signatures: attester::MultiSig::default(), From daf55a1d5683373becc46b66ca4bc1fe232aea71 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 26 Jul 2024 09:54:12 +0200 Subject: [PATCH 11/11] removed race condition --- core/node/consensus/src/tests.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 59fcce189af4..9890165ad81f 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -687,6 +687,9 @@ async fn test_attestation_status_api(version: ProtocolVersionId) { // If the main node has no L1 batch certificates, // the first one to sign should be `last_sealed_batch`. + validator_pool + .wait_for_batch(ctx, validator.last_sealed_batch()) + .await?; let status = api.fetch_attestation_status().await?; assert_eq!(status.next_batch_to_attest, validator.last_sealed_batch());