diff --git a/Cargo.toml b/Cargo.toml index 34e5cb6141c7..443f85493865 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -283,4 +283,4 @@ zksync_node_consensus = { path = "core/node/consensus" } zksync_contract_verification_server = { path = "core/node/contract_verification_server" } zksync_node_api_server = { path = "core/node/api_server" } zksync_tee_verifier_input_producer = { path = "core/node/tee_verifier_input_producer" } -zksync_base_token_adjuster = {path = "core/node/base_token_adjuster"} +zksync_base_token_adjuster = { path = "core/node/base_token_adjuster" } diff --git a/core/lib/config/src/configs/consensus.rs b/core/lib/config/src/configs/consensus.rs index 433b05c954cf..ec4edd486ac0 100644 --- a/core/lib/config/src/configs/consensus.rs +++ b/core/lib/config/src/configs/consensus.rs @@ -12,6 +12,14 @@ pub struct ValidatorPublicKey(pub String); #[derive(Debug, Clone)] pub struct ValidatorSecretKey(pub Secret); +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::attester::PublicKey`. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct AttesterPublicKey(pub String); + +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::attester::SecretKey`. +#[derive(Debug, Clone)] +pub struct AttesterSecretKey(pub Secret); + /// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::node::PublicKey`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodePublicKey(pub String); @@ -26,6 +34,12 @@ impl PartialEq for ValidatorSecretKey { } } +impl PartialEq for AttesterSecretKey { + fn eq(&self, other: &Self) -> bool { + self.0.expose_secret().eq(other.0.expose_secret()) + } +} + impl PartialEq for NodeSecretKey { fn eq(&self, other: &Self) -> bool { self.0.expose_secret().eq(other.0.expose_secret()) @@ -41,6 +55,15 @@ pub struct WeightedValidator { pub weight: u64, } +/// Copy-paste of `zksync_consensus_roles::attester::WeightedAttester`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WeightedAttester { + /// Attester key + pub key: AttesterPublicKey, + /// Attester weight inside the Committee. + pub weight: u64, +} + /// Copy-paste of `zksync_concurrency::net::Host`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Host(pub String); @@ -61,6 +84,8 @@ pub struct GenesisSpec { pub protocol_version: ProtocolVersion, /// The validator committee. Represents `zksync_consensus_roles::validator::Committee`. pub validators: Vec, + /// The attester committee. Represents `zksync_consensus_roles::attester::Committee`. + pub attesters: Vec, /// Leader of the committee. Represents /// `zksync_consensus_roles::validator::LeaderSelectionMode::Sticky`. pub leader: ValidatorPublicKey, @@ -119,9 +144,10 @@ impl ConsensusConfig { } } -/// Secrets need for consensus. +/// Secrets needed for consensus. 
#[derive(Debug, Clone, PartialEq)] pub struct ConsensusSecrets { pub validator_key: Option, + pub attester_key: Option, pub node_key: Option, } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 939b24ea8c76..c41180fe42b3 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -725,6 +725,16 @@ impl Distribution for EncodeDist { } } +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> configs::consensus::WeightedAttester { + use configs::consensus::{AttesterPublicKey, WeightedAttester}; + WeightedAttester { + key: AttesterPublicKey(self.sample(rng)), + weight: self.sample(rng), + } + } +} + impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::consensus::GenesisSpec { use configs::consensus::{GenesisSpec, ProtocolVersion, ValidatorPublicKey}; @@ -732,6 +742,7 @@ impl Distribution for EncodeDist { chain_id: L2ChainId::default(), protocol_version: ProtocolVersion(self.sample(rng)), validators: self.sample_collect(rng), + attesters: self.sample_collect(rng), leader: ValidatorPublicKey(self.sample(rng)), } } @@ -769,9 +780,12 @@ impl Distribution for EncodeDist { impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::consensus::ConsensusSecrets { - use configs::consensus::{ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}; + use configs::consensus::{ + AttesterSecretKey, ConsensusSecrets, NodeSecretKey, ValidatorSecretKey, + }; ConsensusSecrets { validator_key: self.sample_opt(|| ValidatorSecretKey(String::into(self.sample(rng)))), + attester_key: self.sample_opt(|| AttesterSecretKey(String::into(self.sample(rng)))), node_key: self.sample_opt(|| NodeSecretKey(String::into(self.sample(rng)))), } } diff --git a/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json b/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json new file mode 100644 index 000000000000..5130763af73c --- /dev/null +++ b/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n MAX(l1_batch_number) AS \"number\"\n FROM\n l1_batches_consensus\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c" +} diff --git a/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json b/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json new file mode 100644 index 000000000000..930c1c1a9fed --- /dev/null +++ b/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n certificate\n FROM\n l1_batches_consensus\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "certificate", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637" +} diff --git a/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json new file mode 100644 index 
000000000000..a42fbe98ff2f --- /dev/null +++ b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510" +} diff --git a/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql new file mode 100644 index 000000000000..45114088eaa8 --- /dev/null +++ b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql @@ -0,0 +1 @@ +DROP TABLE l1_batches_consensus; diff --git a/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql new file mode 100644 index 000000000000..71c3854d640e --- /dev/null +++ b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE l1_batches_consensus ( + l1_batch_number BIGINT PRIMARY KEY REFERENCES l1_batches (number) ON DELETE CASCADE, + certificate JSONB NOT NULL, + + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + + CHECK((certificate->'message'->'number')::jsonb::numeric = l1_batch_number) +); diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index d4178fa32e00..3efdf5ee577b 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,12 +1,13 @@ use anyhow::Context as _; -use zksync_consensus_roles::validator; +use bigdecimal::Zero; +use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BlockStoreState, ReplicaState}; use zksync_db_connection::{ connection::Connection, error::{DalError, DalResult, SqlxContext}, instrument::{InstrumentExt, Instrumented}, }; -use zksync_types::L2BlockNumber; +use zksync_types::{L1BatchNumber, L2BlockNumber}; pub use crate::consensus::Payload; use crate::{Core, CoreDal}; @@ -20,7 +21,7 @@ pub struct ConsensusDal<'a, 'c> { /// Error returned by `ConsensusDal::insert_certificate()`. #[derive(thiserror::Error, Debug)] pub enum InsertCertificateError { - #[error("corresponding L2 block is missing")] + #[error("corresponding payload is missing")] MissingPayload, #[error("certificate doesn't match the payload")] PayloadMismatch, @@ -236,7 +237,7 @@ impl ConsensusDal<'_, '_> { /// Fetches the last consensus certificate. /// Currently, certificates are NOT generated synchronously with L2 blocks, /// so it might NOT be the certificate for the last L2 block. - pub async fn certificates_range(&mut self) -> anyhow::Result { + pub async fn block_certificates_range(&mut self) -> anyhow::Result { // It cannot be older than genesis first block. let mut start = self.genesis().await?.context("genesis()")?.first_block; start = start.max(self.first_block().await.context("first_block()")?); @@ -255,7 +256,7 @@ impl ConsensusDal<'_, '_> { "#, i64::try_from(start.0)?, ) - .instrument("last_certificate") + .instrument("block_certificate_range") .report_latency() .fetch_optional(self.storage) .await?; @@ -268,7 +269,7 @@ impl ConsensusDal<'_, '_> { } /// Fetches the consensus certificate for the L2 block with the given `block_number`. 
- pub async fn certificate( + pub async fn block_certificate( &mut self, block_number: validator::BlockNumber, ) -> anyhow::Result> { @@ -283,7 +284,33 @@ impl ConsensusDal<'_, '_> { "#, i64::try_from(block_number.0)? ) - .instrument("certificate") + .instrument("block_certificate") + .report_latency() + .fetch_optional(self.storage) + .await? + else { + return Ok(None); + }; + Ok(Some(zksync_protobuf::serde::deserialize(row.certificate)?)) + } + + /// Fetches the attester certificate for the L1 batch with the given `batch_number`. + pub async fn batch_certificate( + &mut self, + batch_number: attester::BatchNumber, + ) -> anyhow::Result> { + let Some(row) = sqlx::query!( + r#" + SELECT + certificate + FROM + l1_batches_consensus + WHERE + l1_batch_number = $1 + "#, + i64::try_from(batch_number.0)? + ) + .instrument("batch_certificate") .report_latency() .fetch_optional(self.storage) .await? @@ -345,7 +372,7 @@ impl ConsensusDal<'_, '_> { /// Inserts a certificate for the L2 block `cert.header().number`. /// Fails if certificate doesn't match the stored block. - pub async fn insert_certificate( + pub async fn insert_block_certificate( &mut self, cert: &validator::CommitQC, ) -> Result<(), InsertCertificateError> { @@ -370,22 +397,102 @@ impl ConsensusDal<'_, '_> { header.number.0 as i64, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) - .instrument("insert_certificate") + .instrument("insert_block_certificate") .report_latency() .execute(&mut txn) .await?; txn.commit().await.context("commit")?; Ok(()) } + + /// Inserts a certificate for the L1 batch. + /// + /// Insertion is allowed even if it creates gaps in the L1 batch history. + /// + /// It fails if the batch payload is missing or it's not consistent with the QC. + pub async fn insert_batch_certificate( + &mut self, + cert: &attester::BatchQC, + ) -> Result<(), InsertCertificateError> { + use InsertCertificateError as E; + let mut txn = self.storage.start_transaction().await?; + + let l1_batch_number = L1BatchNumber(cert.message.number.0 as u32); + let _l1_batch_header = txn + .blocks_dal() + .get_l1_batch_header(l1_batch_number) + .await? + .ok_or(E::MissingPayload)?; + + // TODO: Verify that the certificate matches the stored batch: + // * add the hash of the batch to the `BatchQC` + // * find out which field in the `l1_batches` table contains the hash we need to match + // * ideally move the responsibility of validation outside this method + + // if header.payload != want_payload.encode().hash() { + // return Err(E::PayloadMismatch); + // } + + let res = sqlx::query!( + r#" + INSERT INTO + l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at) + VALUES + ($1, $2, NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO NOTHING + "#, + i64::from(l1_batch_number.0), + zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), + ) + .instrument("insert_batch_certificate") + .report_latency() + .execute(&mut txn) + .await?; + + if res.rows_affected().is_zero() { + tracing::debug!(%l1_batch_number, "duplicate batch certificate"); + } + + txn.commit().await.context("commit")?; + + Ok(()) + } + + /// Gets a number of the last L1 batch that was inserted. It might have gaps before it, + /// depending on the order in which votes have been collected over gossip by consensus. 
+ pub async fn get_last_batch_certificate_number( + &mut self, + ) -> DalResult> { + let row = sqlx::query!( + r#" + SELECT + MAX(l1_batch_number) AS "number" + FROM + l1_batches_consensus + "# + ) + .instrument("get_last_batch_certificate_number") + .report_latency() + .fetch_one(self.storage) + .await?; + + Ok(row + .number + .map(|number| attester::BatchNumber(number as u64))) + } } #[cfg(test)] mod tests { use rand::Rng as _; - use zksync_consensus_roles::validator; + use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::ReplicaState; + use zksync_types::{L1BatchNumber, ProtocolVersion}; - use crate::{ConnectionPool, Core, CoreDal}; + use crate::{ + tests::{create_l1_batch_header, create_l2_block_header}, + ConnectionPool, Core, CoreDal, + }; #[tokio::test] async fn replica_state_read_write() { @@ -421,4 +528,89 @@ mod tests { } } } + + #[tokio::test] + async fn test_batch_certificate() { + let rng = &mut rand::thread_rng(); + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + + let mut mock_batch_qc = |number: L1BatchNumber| { + let mut cert: attester::BatchQC = rng.gen(); + cert.message.number.0 = u64::from(number.0); + cert.signatures.add(rng.gen(), rng.gen()); + cert + }; + + // Required for inserting l2 blocks + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + + // Insert some mock L2 blocks and L1 batches + let mut block_number = 0; + let mut batch_number = 0; + for _ in 0..3 { + for _ in 0..3 { + block_number += 1; + let l2_block = create_l2_block_header(block_number); + conn.blocks_dal().insert_l2_block(&l2_block).await.unwrap(); + } + batch_number += 1; + let l1_batch = create_l1_batch_header(batch_number); + + conn.blocks_dal() + .insert_mock_l1_batch(&l1_batch) + .await + .unwrap(); + + conn.blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(l1_batch.number) + .await + .unwrap(); + } + + let l1_batch_number = L1BatchNumber(batch_number); + + // Insert a batch certificate for the last L1 batch. + let cert1 = mock_batch_qc(l1_batch_number); + + conn.consensus_dal() + .insert_batch_certificate(&cert1) + .await + .unwrap(); + + // Try insert duplicate batch certificate for the same batch. + let cert2 = mock_batch_qc(l1_batch_number); + + conn.consensus_dal() + .insert_batch_certificate(&cert2) + .await + .unwrap(); + + // Retrieve the latest certificate. 
+ let number = conn + .consensus_dal() + .get_last_batch_certificate_number() + .await + .unwrap() + .unwrap(); + + let cert = conn + .consensus_dal() + .batch_certificate(number) + .await + .unwrap() + .unwrap(); + + assert_eq!(cert, cert1, "duplicates are ignored"); + + // Try insert batch certificate for non-existing batch + let cert3 = mock_batch_qc(l1_batch_number.next()); + conn.consensus_dal() + .insert_batch_certificate(&cert3) + .await + .expect_err("missing payload"); + } } diff --git a/core/lib/dal/src/models/storage_eth_tx.rs b/core/lib/dal/src/models/storage_eth_tx.rs index 615b365d8533..2654ffe0e0a7 100644 --- a/core/lib/dal/src/models/storage_eth_tx.rs +++ b/core/lib/dal/src/models/storage_eth_tx.rs @@ -77,7 +77,7 @@ impl From for EthTx { .expect("Incorrect address in db"), raw_tx: tx.raw_tx.clone(), tx_type: AggregatedActionType::from_str(&tx.tx_type).expect("Wrong agg type"), - created_at_timestamp: tx.created_at.timestamp() as u64, + created_at_timestamp: tx.created_at.and_utc().timestamp() as u64, predicted_gas_cost: tx.predicted_gas_cost as u64, from_addr: tx.from_addr.map(|f| Address::from_slice(&f)), blob_sidecar: tx.blob_sidecar.map(|b| { diff --git a/core/lib/dal/src/models/storage_transaction.rs b/core/lib/dal/src/models/storage_transaction.rs index bce5e554f383..31a182a7eca0 100644 --- a/core/lib/dal/src/models/storage_transaction.rs +++ b/core/lib/dal/src/models/storage_transaction.rs @@ -296,7 +296,7 @@ impl From for Transaction { let hash = H256::from_slice(&tx.hash); let execute = serde_json::from_value::(tx.data.clone()) .unwrap_or_else(|_| panic!("invalid json in database for tx {:?}", hash)); - let received_timestamp_ms = tx.received_at.timestamp_millis() as u64; + let received_timestamp_ms = tx.received_at.and_utc().timestamp_millis() as u64; match tx.tx_format { Some(t) if t == i32::from(PRIORITY_OPERATION_L2_TX_TYPE) => Transaction { common_data: ExecuteTransactionCommon::L1(tx.into()), diff --git a/core/lib/protobuf_config/src/consensus.rs b/core/lib/protobuf_config/src/consensus.rs index 3d2c862d7639..c04120edcc54 100644 --- a/core/lib/protobuf_config/src/consensus.rs +++ b/core/lib/protobuf_config/src/consensus.rs @@ -1,8 +1,8 @@ use anyhow::Context as _; use zksync_basic_types::L2ChainId; use zksync_config::configs::consensus::{ - ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, RpcConfig, - ValidatorPublicKey, WeightedValidator, + AttesterPublicKey, ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, + RpcConfig, ValidatorPublicKey, WeightedAttester, WeightedValidator, }; use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; @@ -24,6 +24,22 @@ impl ProtoRepr for proto::WeightedValidator { } } +impl ProtoRepr for proto::WeightedAttester { + type Type = WeightedAttester; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + key: AttesterPublicKey(required(&self.key).context("key")?.clone()), + weight: *required(&self.weight).context("weight")?, + }) + } + fn build(this: &Self::Type) -> Self { + Self { + key: Some(this.key.0.clone()), + weight: Some(this.weight), + } + } +} + impl ProtoRepr for proto::GenesisSpec { type Type = GenesisSpec; fn read(&self) -> anyhow::Result { @@ -41,6 +57,13 @@ impl ProtoRepr for proto::GenesisSpec { .map(|(i, x)| x.read().context(i)) .collect::>() .context("validators")?, + attesters: self + .attesters + .iter() + .enumerate() + .map(|(i, x)| x.read().context(i)) + .collect::>() + .context("attesters")?, leader: 
ValidatorPublicKey(required(&self.leader).context("leader")?.clone()), }) } @@ -49,6 +72,7 @@ impl ProtoRepr for proto::GenesisSpec { chain_id: Some(this.chain_id.as_u64()), protocol_version: Some(this.protocol_version.0), validators: this.validators.iter().map(ProtoRepr::build).collect(), + attesters: this.attesters.iter().map(ProtoRepr::build).collect(), leader: Some(this.leader.0.clone()), } } diff --git a/core/lib/protobuf_config/src/proto/config/secrets.proto b/core/lib/protobuf_config/src/proto/config/secrets.proto index fb328883f99d..b711d81d5754 100644 --- a/core/lib/protobuf_config/src/proto/config/secrets.proto +++ b/core/lib/protobuf_config/src/proto/config/secrets.proto @@ -16,6 +16,7 @@ message L1Secrets { message ConsensusSecrets { optional string validator_key = 1; // required for validator nodes; ValidatorSecretKey optional string node_key = 2; // required for any node; NodeSecretKey + optional string attester_key = 3; // required for attester nodes; AttesterSecretKey } message Secrets { diff --git a/core/lib/protobuf_config/src/proto/core/consensus.proto b/core/lib/protobuf_config/src/proto/core/consensus.proto index 5b59e5151cf7..2adc70886e9e 100644 --- a/core/lib/protobuf_config/src/proto/core/consensus.proto +++ b/core/lib/protobuf_config/src/proto/core/consensus.proto @@ -43,12 +43,19 @@ message WeightedValidator { optional uint64 weight = 2; // required } +// Weighted member of an attester committee. +message WeightedAttester { + optional string key = 1; // required; AttesterPublic + optional uint64 weight = 2; // required +} + // Consensus genesis specification. message GenesisSpec { optional uint64 chain_id = 1; // required; L2ChainId, should be the same as `l2_chain_id` in the `zksync.config.genesis.Genesis`. optional uint32 protocol_version = 2; // required; validator::ProtocolVersion repeated WeightedValidator validators = 3; // must be non-empty; validator committee. optional string leader = 4; // required; ValidatorPublicKey + repeated WeightedAttester attesters = 5; // can be empty; attester committee. } // Per peer connection RPC rate limits. 
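For context, a minimal sketch of how the new attester fields might be filled in from Rust code, assuming the type shapes introduced above (`WeightedAttester`, `AttesterPublicKey`, `AttesterSecretKey`, and the extended `GenesisSpec`/`ConsensusSecrets`). The keys are passed in as already-encoded `TextFmt` strings rather than invented here, and a real node would also set `node_key`.

```rust
use zksync_basic_types::L2ChainId;
use zksync_config::configs::consensus::{
    AttesterPublicKey, AttesterSecretKey, ConsensusSecrets, GenesisSpec, ProtocolVersion,
    ValidatorPublicKey, WeightedAttester, WeightedValidator,
};

/// Builds a genesis spec with one validator (which is also the leader) and one attester.
/// Leaving `attesters` empty keeps the attester committee disabled, matching the
/// "can be empty" comment on the proto field.
fn example_genesis_spec(validator_key: String, attester_key: String) -> GenesisSpec {
    GenesisSpec {
        chain_id: L2ChainId::default(),
        // Placeholder protocol version.
        protocol_version: ProtocolVersion(1),
        validators: vec![WeightedValidator {
            key: ValidatorPublicKey(validator_key.clone()),
            weight: 1,
        }],
        attesters: vec![WeightedAttester {
            key: AttesterPublicKey(attester_key),
            weight: 1,
        }],
        leader: ValidatorPublicKey(validator_key),
    }
}

/// Secrets for a node that only attests; the secret key string is wrapped in a
/// `Secret` so it is not exposed in debug output.
fn example_attester_secrets(attester_secret: String) -> ConsensusSecrets {
    ConsensusSecrets {
        validator_key: None,
        attester_key: Some(AttesterSecretKey(attester_secret.into())),
        node_key: None,
    }
}
```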
diff --git a/core/lib/protobuf_config/src/secrets.rs b/core/lib/protobuf_config/src/secrets.rs index 91a05b31f196..43f537a5fbfa 100644 --- a/core/lib/protobuf_config/src/secrets.rs +++ b/core/lib/protobuf_config/src/secrets.rs @@ -4,7 +4,7 @@ use anyhow::Context; use secrecy::ExposeSecret; use zksync_basic_types::url::SensitiveUrl; use zksync_config::configs::{ - consensus::{ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}, + consensus::{AttesterSecretKey, ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}, secrets::Secrets, DatabaseSecrets, L1Secrets, }; @@ -98,6 +98,10 @@ impl ProtoRepr for proto::ConsensusSecrets { .validator_key .as_ref() .map(|x| ValidatorSecretKey(x.clone().into())), + attester_key: self + .attester_key + .as_ref() + .map(|x| AttesterSecretKey(x.clone().into())), node_key: self .node_key .as_ref() @@ -111,6 +115,10 @@ impl ProtoRepr for proto::ConsensusSecrets { .validator_key .as_ref() .map(|x| x.0.expose_secret().clone()), + attester_key: this + .attester_key + .as_ref() + .map(|x| x.0.expose_secret().clone()), node_key: this.node_key.as_ref().map(|x| x.0.expose_secret().clone()), } } diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index cac9e9296227..75e329d6c347 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -10,7 +10,7 @@ use zksync_config::{ }; use zksync_consensus_crypto::{Text, TextFmt}; use zksync_consensus_executor as executor; -use zksync_consensus_roles::{node, validator}; +use zksync_consensus_roles::{attester, node, validator}; fn read_secret_text(text: Option<&Secret>) -> anyhow::Result> { text.map(|text| Text::new(text.expose_secret()).decode()) @@ -24,6 +24,12 @@ pub(super) fn validator_key( read_secret_text(secrets.validator_key.as_ref().map(|x| &x.0)) } +pub(super) fn attester_key( + secrets: &ConsensusSecrets, +) -> anyhow::Result> { + read_secret_text(secrets.attester_key.as_ref().map(|x| &x.0)) +} + /// Consensus genesis specification. /// It is a digest of the `validator::Genesis`, /// which allows to initialize genesis (if not present) @@ -33,6 +39,7 @@ pub(super) struct GenesisSpec { pub(super) chain_id: validator::ChainId, pub(super) protocol_version: validator::ProtocolVersion, pub(super) validators: validator::Committee, + pub(super) attesters: Option, pub(super) leader_selection: validator::LeaderSelectionMode, } @@ -42,6 +49,7 @@ impl GenesisSpec { chain_id: g.chain_id, protocol_version: g.protocol_version, validators: g.validators.clone(), + attesters: g.attesters.clone(), leader_selection: g.leader_selection.clone(), } } @@ -59,6 +67,20 @@ impl GenesisSpec { }) .collect::>() .context("validators")?; + + let attesters: Vec<_> = x + .attesters + .iter() + .enumerate() + .map(|(i, v)| { + Ok(attester::WeightedAttester { + key: Text::new(&v.key.0).decode().context("key").context(i)?, + weight: v.weight, + }) + }) + .collect::>() + .context("attesters")?; + Ok(Self { chain_id: validator::ChainId(x.chain_id.as_u64()), protocol_version: validator::ProtocolVersion(x.protocol_version.0), @@ -66,6 +88,11 @@ impl GenesisSpec { Text::new(&x.leader.0).decode().context("leader")?, ), validators: validator::Committee::new(validators).context("validators")?, + attesters: if attesters.is_empty() { + None + } else { + Some(attester::Committee::new(attesters).context("attesters")?) 
+ }, }) } } @@ -112,6 +139,7 @@ pub(super) fn executor( .context("gossip_static_inbound")?, gossip_static_outbound, rpc, + // TODO: Add to configuration debug_page: None, }) } diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 66326756fb77..077b4d64c524 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -39,18 +39,23 @@ impl EN { // Initialize genesis. let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?; let mut conn = self.pool.connection(ctx).await.wrap("connection()")?; + conn.try_update_genesis(ctx, &genesis) .await .wrap("set_genesis()")?; + let mut payload_queue = conn .new_payload_queue(ctx, actions, self.sync_state.clone()) .await .wrap("new_payload_queue()")?; + drop(conn); // Fetch blocks before the genesis. self.fetch_blocks(ctx, &mut payload_queue, Some(genesis.first_block)) - .await?; + .await + .wrap("fetch_blocks()")?; + // Monitor the genesis of the main node. // If it changes, it means that a hard fork occurred and we need to reset the consensus state. s.spawn_bg::<()>(async { @@ -69,15 +74,17 @@ impl EN { }); // Run consensus component. + // External nodes have a payload queue which they use to fetch data from the main node. let (store, runner) = Store::new(ctx, self.pool.clone(), Some(payload_queue)) .await .wrap("Store::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); - // Dummy batch store - we don't gossip batches yet, but we need one anyway. + let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) .await .wrap("BatchStore::new()")?; @@ -87,7 +94,6 @@ impl EN { config: config::executor(&cfg, &secrets)?, block_store, batch_store, - attester: None, validator: config::validator_key(&secrets) .context("validator_key")? .map(|key| executor::Validator { @@ -95,8 +101,12 @@ impl EN { replica_store: Box::new(store.clone()), payload_manager: Box::new(store.clone()), }), + attester: config::attester_key(&secrets) + .context("attester_key")? + .map(|key| executor::Attester { key }), }; executor.run(ctx).await?; + Ok(()) }) .await; diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 0aac43b8ef87..3e8f0f4778bb 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor::{self as executor}; +use zksync_consensus_executor::{self as executor, Attester}; use zksync_consensus_roles::validator; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -23,6 +23,8 @@ pub async fn run_main_node( .context("validator_key")? .context("missing validator_key")?; + let attester_key_opt = config::attester_key(&secrets).context("attester_key")?; + scope::run!(&ctx, |ctx, s| async { if let Some(spec) = &cfg.genesis_spec { let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; @@ -35,6 +37,7 @@ pub async fn run_main_node( .wrap("adjust_genesis()")?; } + // The main node doesn't have a payload queue as it produces all the L2 blocks itself. 
let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; s.spawn_bg(runner.run(ctx)); @@ -49,22 +52,21 @@ pub async fn run_main_node( "unsupported leader selection mode - main node has to be the leader" ); - // Dummy batch store - we don't gossip batches yet, but we need one anyway. let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) .await .wrap("BatchStore::new()")?; - s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") }); + s.spawn_bg(runner.run(ctx)); let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, batch_store, - attester: None, validator: Some(executor::Validator { key: validator_key, replica_store: Box::new(store.clone()), payload_manager: Box::new(store.clone()), }), + attester: attester_key_opt.map(|key| Attester { key }), }; executor.run(ctx).await }) diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 673cb87d2f4e..1d8dfc3aed57 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, time}; -use zksync_consensus_roles::validator; -use zksync_consensus_storage as storage; +use zksync_consensus_roles::{attester, validator}; +use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState}; use zksync_state_keeper::io::common::IoCursor; @@ -92,25 +92,36 @@ impl<'a> Connection<'a> { .map_err(DalError::generalize)?) } - /// Wrapper for `consensus_dal().certificate()`. - pub async fn certificate( + /// Wrapper for `consensus_dal().block_certificate()`. + pub async fn block_certificate( &mut self, ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result> { Ok(ctx - .wait(self.0.consensus_dal().certificate(number)) + .wait(self.0.consensus_dal().block_certificate(number)) .await??) } - /// Wrapper for `consensus_dal().insert_certificate()`. - pub async fn insert_certificate( + /// Wrapper for `consensus_dal().insert_block_certificate()`. + pub async fn insert_block_certificate( &mut self, ctx: &ctx::Ctx, cert: &validator::CommitQC, ) -> Result<(), InsertCertificateError> { Ok(ctx - .wait(self.0.consensus_dal().insert_certificate(cert)) + .wait(self.0.consensus_dal().insert_block_certificate(cert)) + .await??) + } + + /// Wrapper for `consensus_dal().insert_batch_certificate()`. + pub async fn insert_batch_certificate( + &mut self, + ctx: &ctx::Ctx, + cert: &attester::BatchQC, + ) -> Result<(), InsertCertificateError> { + Ok(ctx + .wait(self.0.consensus_dal().insert_batch_certificate(cert)) .await??) } @@ -134,7 +145,7 @@ impl<'a> Connection<'a> { .context("sqlx")?) } - /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. + /// Wrapper for `blocks_dal().get_l1_batch_metadata()`. pub async fn batch( &mut self, ctx: &ctx::Ctx, @@ -184,13 +195,13 @@ impl<'a> Connection<'a> { Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) } - /// Wrapper for `consensus_dal().certificates_range()`. - pub(crate) async fn certificates_range( + /// Wrapper for `consensus_dal().block_certificates_range()`. + pub(crate) async fn block_certificates_range( &mut self, ctx: &ctx::Ctx, ) -> ctx::Result { Ok(ctx - .wait(self.0.consensus_dal().certificates_range()) + .wait(self.0.consensus_dal().block_certificates_range()) .await??) 
} @@ -239,17 +250,163 @@ impl<'a> Connection<'a> { ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result> { - let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { + let Some(justification) = self + .block_certificate(ctx, number) + .await + .wrap("block_certificate()")? + else { return Ok(None); }; + let payload = self .payload(ctx, number) .await .wrap("payload()")? .context("L2 block disappeared from storage")?; + Ok(Some(validator::FinalBlock { payload: payload.encode(), justification, })) } + + /// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`. + pub async fn get_last_batch_number( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.blocks_dal().get_sealed_l1_batch_number()) + .await? + .context("get_sealed_l1_batch_number()")? + .map(|nr| attester::BatchNumber(nr.0 as u64))) + } + + /// Wrapper for `consensus_dal().get_last_batch_certificate_number()`. + pub async fn get_last_batch_certificate_number( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().get_last_batch_certificate_number()) + .await? + .context("get_last_batch_certificate_number()")?) + } + + /// Wrapper for `consensus_dal().batch_certificate()`. + pub async fn batch_certificate( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().batch_certificate(number)) + .await? + .context("batch_certificate()")?) + } + + /// Wrapper for `blocks_dal().get_l2_block_range_of_l1_batch()`. + pub async fn get_l2_block_range_of_l1_batch( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + let number = L1BatchNumber(number.0.try_into().context("number")?); + + let range = ctx + .wait(self.0.blocks_dal().get_l2_block_range_of_l1_batch(number)) + .await? + .context("get_l2_block_range_of_l1_batch()")?; + + Ok(range.map(|(min, max)| { + let min = validator::BlockNumber(min.0 as u64); + let max = validator::BlockNumber(max.0 as u64); + (min, max) + })) + } + + /// Construct the [attester::SyncBatch] for a given batch number. + pub async fn get_batch( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + let Some((min, max)) = self + .get_l2_block_range_of_l1_batch(ctx, number) + .await + .context("get_l2_block_range_of_l1_batch()")? + else { + return Ok(None); + }; + + let payloads = self.payloads(ctx, min..max).await.wrap("payloads()")?; + let payloads = payloads.into_iter().map(|p| p.encode()).collect(); + + // TODO: Fill out the proof when we have the stateless L1 batch validation story finished. + // It is supposed to be a Merkle proof that the rolling hash of the batch has been included + // in the L1 state tree. The state root hash of L1 won't be available in the DB, it requires + // an API client. + let batch = attester::SyncBatch { + number, + payloads, + proof: Vec::new(), + }; + + Ok(Some(batch)) + } + + /// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch]. + pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + let first = self + .0 + .blocks_dal() + .get_earliest_l1_batch_number() + .await + .context("get_earliest_l1_batch_number()")?; + + let first = if first.is_some() { + first + } else { + self.0 + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await + .context("get_earliest_l1_batch_number()")? 
+ .map(|s| s.l1_batch_number) + }; + + // TODO: In the future when we start filling in the `SyncBatch::proof` field, + // we can only run `get_batch` expecting `Some` result on numbers where the + // L1 state root hash is already available, so that we can produce some + // Merkle proof that the rolling hash of the L2 blocks in the batch has + // been included in the L1 state tree. At that point we probably can't + // call `get_last_batch_number` here, but something that indicates that + // the hashes/commitments on the L1 batch are ready and the thing has + // been included in L1; that potentially requires an API client as well. + let last = self + .get_last_batch_number(ctx) + .await + .context("get_last_batch_number()")?; + + let last = if let Some(last) = last { + // For now it would be unexpected if we couldn't retrieve the payloads + // for the `last` batch number, as an L1 batch is only created if we + // have all the L2 miniblocks for it. + Some( + self.get_batch(ctx, last) + .await + .context("get_batch()")? + .context("last batch not available")?, + ) + } else { + None + }; + + Ok(BatchStoreState { + first: first + .map(|n| attester::BatchNumber(n.0 as u64)) + .unwrap_or(attester::BatchNumber(0)), + last, + }) + } } diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index 745ccce4bef3..c196989c300b 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -4,12 +4,12 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage as storage; +use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::consensus_dal::{self, Payload}; use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; use zksync_types::L2BlockNumber; -use super::PayloadQueue; +use super::{Connection, PayloadQueue}; use crate::storage::{ConnectionPool, InsertCertificateError}; fn to_fetched_block( @@ -51,20 +51,27 @@ fn to_fetched_block( #[derive(Clone, Debug)] pub(crate) struct Store { pub(super) pool: ConnectionPool, - payloads: Arc>>, - /// L2 block QCs received over gossip - certificates: ctx::channel::UnboundedSender, + /// Action queue to fetch/store L2 block payloads + block_payloads: Arc>>, + /// L2 block QCs received from consensus + block_certificates: ctx::channel::UnboundedSender, + /// L1 batch QCs received from consensus + batch_certificates: ctx::channel::UnboundedSender, /// Range of L2 blocks for which we have a QC persisted. - persisted: sync::watch::Receiver, + blocks_persisted: sync::watch::Receiver, + /// Range of L1 batches we have persisted. + batches_persisted: sync::watch::Receiver, } -struct PersistedState(sync::watch::Sender); +struct PersistedBlockState(sync::watch::Sender); /// Background task of the `Store`. pub struct StoreRunner { pool: ConnectionPool, - persisted: PersistedState, - certificates: ctx::channel::UnboundedReceiver, + blocks_persisted: PersistedBlockState, + batches_persisted: sync::watch::Sender, + block_certificates: ctx::channel::UnboundedReceiver, + batch_certificates: ctx::channel::UnboundedReceiver, } impl Store { @@ -73,32 +80,50 @@ impl Store { pool: ConnectionPool, payload_queue: Option, ) -> ctx::Result<(Store, StoreRunner)> { - let persisted = pool - .connection(ctx) - .await - .wrap("connection()")? 
- .certificates_range(ctx) + let mut conn = pool.connection(ctx).await.wrap("connection()")?; + + // Initial state of persisted blocks + let blocks_persisted = conn + .block_certificates_range(ctx) .await - .wrap("certificates_range()")?; - let persisted = sync::watch::channel(persisted).0; - let (certs_send, certs_recv) = ctx::channel::unbounded(); + .wrap("block_certificates_range()")?; + + // Initial state of persisted batches + let batches_persisted = conn.batches_range(ctx).await.wrap("batches_range()")?; + + drop(conn); + + let blocks_persisted = sync::watch::channel(blocks_persisted).0; + let batches_persisted = sync::watch::channel(batches_persisted).0; + let (block_certs_send, block_certs_recv) = ctx::channel::unbounded(); + let (batch_certs_send, batch_certs_recv) = ctx::channel::unbounded(); + Ok(( Store { pool: pool.clone(), - certificates: certs_send, - payloads: Arc::new(sync::Mutex::new(payload_queue)), - persisted: persisted.subscribe(), + block_certificates: block_certs_send, + batch_certificates: batch_certs_send, + block_payloads: Arc::new(sync::Mutex::new(payload_queue)), + blocks_persisted: blocks_persisted.subscribe(), + batches_persisted: batches_persisted.subscribe(), }, StoreRunner { pool, - persisted: PersistedState(persisted), - certificates: certs_recv, + blocks_persisted: PersistedBlockState(blocks_persisted), + batches_persisted, + block_certificates: block_certs_recv, + batch_certificates: batch_certs_recv, }, )) } + + /// Get a fresh connection from the pool. + async fn conn(&self, ctx: &ctx::Ctx) -> ctx::Result { + self.pool.connection(ctx).await.wrap("connection") + } } -impl PersistedState { +impl PersistedBlockState { /// Updates `persisted` to new. /// Ends of the range can only be moved forward. /// If `persisted.first` is moved forward, it means that blocks have been pruned. @@ -136,47 +161,120 @@ impl PersistedState { } impl StoreRunner { - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let StoreRunner { + pool, + blocks_persisted, + batches_persisted, + mut block_certificates, + mut batch_certificates, + } = self; + let res = scope::run!(ctx, |ctx, s| async { s.spawn::<()>(async { - // Loop updating `persisted` whenever blocks get pruned. + // Loop updating `blocks_persisted` whenever blocks get pruned. const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); loop { - let range = self - .pool + let range = pool .connection(ctx) + .await? + .block_certificates_range(ctx) .await - .wrap("connection")? - .certificates_range(ctx) + .wrap("block_certificates_range()")?; + blocks_persisted.update(range); + ctx.sleep(POLL_INTERVAL).await?; + } + }); + + // NOTE: Running this update loop will trigger the gossip of `SyncBatches` which is currently + // pointless as there is no proof and we have to ignore them. We can disable it, but bear in + // mind that any node which gossips the availability will cause pushes and pulls in the consensus. + s.spawn::<()>(async { + // Loop updating `batches_persisted` whenever a new L1 batch is available in the database. + // We have to do this because the L1 batch is produced as L2 blocks are executed, + // which can happen on a different machine or in a different process, so we can't rely on some + // DAL method updating this memory construct. However I'm not sure that `BatchStoreState` + // really has to contain the full blown last batch, or whether it could have for example + // just the number of it. 
We can't just use the `attester::BatchQC`, which would make it + // analogous to the `BlockStoreState`, because the `SyncBatch` mechanism is for catching + // up with L1 batches from peers _without_ the QC, based on L1 inclusion proofs instead. + // Nevertheless since the `SyncBatch` contains all transactions for all L2 blocks, + // we can try to make it less frequent by querying just the last batch number first. + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + let mut next_batch_number = { batches_persisted.borrow().next() }; + loop { + let mut conn = pool.connection(ctx).await?; + if let Some(last_batch_number) = conn + .get_last_batch_number(ctx) .await - .wrap("certificates_range()")?; - self.persisted.update(range); + .wrap("last_batch_number()")? + { + if last_batch_number >= next_batch_number { + let range = conn.batches_range(ctx).await.wrap("batches_range()")?; + next_batch_number = last_batch_number.next(); + batches_persisted.send_replace(range); + } + } ctx.sleep(POLL_INTERVAL).await?; } }); - // Loop inserting certs to storage. + s.spawn::<()>(async { + // Loop inserting batch certificates into storage + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + let cert = batch_certificates.recv(ctx).await?; + + loop { + use consensus_dal::InsertCertificateError as E; + // Try to insert the cert. + let res = pool + .connection(ctx) + .await? + .insert_batch_certificate(ctx, &cert) + .await; + + match res { + Ok(()) => { + break; + } + Err(InsertCertificateError::Inner(E::MissingPayload)) => { + // The L1 batch isn't available yet. + // We can wait until it's produced/received, or we could modify gossip + // so that we don't even accept votes until we have the corresponding batch. + ctx.sleep(POLL_INTERVAL).await?; + } + Err(InsertCertificateError::Inner(err)) => { + return Err(ctx::Error::Internal(anyhow::Error::from(err))) + } + Err(InsertCertificateError::Canceled(err)) => { + return Err(ctx::Error::Canceled(err)) + } + } + } + } + }); + + // Loop inserting block certs to storage. const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); loop { - let cert = self.certificates.recv(ctx).await?; + let cert = block_certificates.recv(ctx).await?; // Wait for the block to be persisted, so that we can attach a cert to it. // We may exit this loop without persisting the certificate in case the // corresponding block has been pruned in the meantime. - while self.persisted.should_be_persisted(&cert) { + while blocks_persisted.should_be_persisted(&cert) { use consensus_dal::InsertCertificateError as E; // Try to insert the cert. - let res = self - .pool + let res = pool .connection(ctx) - .await - .wrap("connection")? - .insert_certificate(ctx, &cert) + .await? + .insert_block_certificate(ctx, &cert) .await; match res { Ok(()) => { // Insertion succeeded: update persisted state // and wait for the next cert. - self.persisted.advance(cert); + blocks_persisted.advance(cert); break; } Err(InsertCertificateError::Inner(E::MissingPayload)) => { @@ -195,6 +293,7 @@ impl StoreRunner { } }) .await; + match res { Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), Err(ctx::Error::Internal(err)) => Err(err), @@ -206,17 +305,15 @@ impl StoreRunner { impl storage::PersistentBlockStore for Store { async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? + .conn(ctx) + .await? .genesis(ctx) .await? .context("not found")?) 
} fn persisted(&self) -> sync::watch::Receiver { - self.persisted.clone() + self.blocks_persisted.clone() } async fn block( @@ -225,10 +322,8 @@ impl storage::PersistentBlockStore for Store { number: validator::BlockNumber, ) -> ctx::Result { Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? + .conn(ctx) + .await? .block(ctx, number) .await? .context("not found")?) @@ -247,14 +342,14 @@ impl storage::PersistentBlockStore for Store { ctx: &ctx::Ctx, block: validator::FinalBlock, ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async(); if let Some(payloads) = &mut *payloads { payloads .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) .await .context("payload_queue.send()")?; } - self.certificates.send(block.justification); + self.block_certificates.send(block.justification); Ok(()) } } @@ -262,20 +357,16 @@ impl storage::PersistentBlockStore for Store { #[async_trait::async_trait] impl storage::ReplicaStore for Store { async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { - self.pool - .connection(ctx) - .await - .wrap("connection()")? + self.conn(ctx) + .await? .replica_state(ctx) .await .wrap("replica_state()") } async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { - self.pool - .connection(ctx) - .await - .wrap("connection()")? + self.conn(ctx) + .await? .set_replica_state(ctx, state) .await .wrap("set_replica_state()") @@ -321,7 +412,7 @@ impl PayloadManager for Store { block_number: validator::BlockNumber, payload: &validator::Payload, ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async(); if let Some(payloads) = &mut *payloads { let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; let n = block.number; @@ -346,44 +437,106 @@ impl PayloadManager for Store { } } -// Dummy implementation #[async_trait::async_trait] impl storage::PersistentBatchStore for Store { - async fn last_batch(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - unimplemented!() + /// Range of batches persisted in storage. + fn persisted(&self) -> sync::watch::Receiver { + // Normally we'd return this, but it causes the following test to run forever: + // RUST_LOG=info zk test rust test_full_nodes --no-capture + // + // The error seems to be related to the size of messages, although I'm not sure + // why it retries it forever. Since the gossip of SyncBatch is not fully functional + // yet, for now let's just return a fake response that never changes, which should + // disable gossiping on honest nodes. + let _ = self.batches_persisted.clone(); + + sync::watch::channel(storage::BatchStoreState { + first: attester::BatchNumber(0), + last: None, + }) + .1 + } + + /// Get the highest L1 batch number from storage. + async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result> { + self.conn(ctx) + .await? + .get_last_batch_number(ctx) + .await + .wrap("get_last_batch_number") } - async fn last_batch_qc(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - unimplemented!() + + /// Get the L1 batch QC from storage with the highest number. + /// + /// This might have gaps before it. Until there is a way to catch up with missing + /// certificates by fetching from the main node, returning the last inserted one + /// is the best we can do. 
+ async fn last_batch_qc(&self, ctx: &ctx::Ctx) -> ctx::Result> { + let Some(number) = self + .conn(ctx) + .await? + .get_last_batch_certificate_number(ctx) + .await + .wrap("get_last_batch_certificate_number")? + else { + return Ok(None); + }; + + self.get_batch_qc(ctx, number).await } + + /// Returns the batch with the given number. async fn get_batch( &self, - _ctx: &ctx::Ctx, - _number: attester::BatchNumber, + ctx: &ctx::Ctx, + number: attester::BatchNumber, ) -> ctx::Result> { - Ok(None) + self.conn(ctx) + .await? + .get_batch(ctx, number) + .await + .wrap("get_batch") } + + /// Returns the QC of the batch with the given number. async fn get_batch_qc( &self, - _ctx: &ctx::Ctx, - _number: attester::BatchNumber, + ctx: &ctx::Ctx, + number: attester::BatchNumber, ) -> ctx::Result> { - Ok(None) - } - async fn store_qc(&self, _ctx: &ctx::Ctx, _qc: attester::BatchQC) -> ctx::Result<()> { - unimplemented!() + self.conn(ctx) + .await? + .batch_certificate(ctx, number) + .await + .wrap("batch_certificate") } - fn persisted(&self) -> sync::watch::Receiver { - sync::watch::channel(storage::BatchStoreState { - first: attester::BatchNumber(0), - last: None, - }) - .1 + + /// Store the given QC in the storage. + /// + /// Storing a QC is allowed even if it creates a gap in the L1 batch history. + /// If we need the last batch QC that still needs to be signed then the queries need to look for gaps. + async fn store_qc(&self, _ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + // Storing asynchronously because we might get the QC before the L1 batch itself. + self.batch_certificates.send(qc); + Ok(()) } + + /// Queue the batch to be persisted in storage. + /// + /// The caller [BatchStore] ensures that this is only called when the batch is the next expected one. async fn queue_next_batch( &self, _ctx: &ctx::Ctx, _batch: attester::SyncBatch, ) -> ctx::Result<()> { - Err(anyhow::format_err!("unimplemented").into()) + // Currently the gossiping of `SyncBatch` and the `BatchStoreState` is unconditionally started by the `Network::run_stream` in consensus, + // and as long as any node reports new batches available by updating the `PersistentBatchStore::persisted` here, the other nodes + // will start pulling the corresponding batches, which will end up being passed to this method. + // If we return an error here or panic, it will stop the whole consensus task tree due to the way scopes work, so instead just return immediately. + // In the future we have to validate the proof agains the L1 state root hash, which IIUC we can't do just yet. + + // Err(anyhow::format_err!("unimplemented: queue_next_batch should not be called until we have the stateless L1 batch story completed.").into()) + + Ok(()) } } diff --git a/core/node/consensus/src/storage/testonly.rs b/core/node/consensus/src/storage/testonly.rs index 2f632b84a4d5..c73d20982c16 100644 --- a/core/node/consensus/src/storage/testonly.rs +++ b/core/node/consensus/src/storage/testonly.rs @@ -48,7 +48,7 @@ impl ConnectionPool { } /// Waits for the `number` L2 block to have a certificate. - pub async fn wait_for_certificate( + pub async fn wait_for_block_certificate( &self, ctx: &ctx::Ctx, number: validator::BlockNumber, @@ -58,9 +58,9 @@ impl ConnectionPool { .connection(ctx) .await .wrap("connection()")? - .certificate(ctx, number) + .block_certificate(ctx, number) .await - .wrap("certificate()")? + .wrap("block_certificate()")? 
.is_none() { ctx.sleep(POLL_INTERVAL).await?; @@ -119,15 +119,15 @@ impl ConnectionPool { } /// Waits for `want_last` block to have certificate then fetches all L2 blocks with certificates. - pub async fn wait_for_certificates( + pub async fn wait_for_block_certificates( &self, ctx: &ctx::Ctx, want_last: validator::BlockNumber, ) -> ctx::Result> { - self.wait_for_certificate(ctx, want_last).await?; + self.wait_for_block_certificate(ctx, want_last).await?; let mut conn = self.connection(ctx).await.wrap("connection()")?; let range = conn - .certificates_range(ctx) + .block_certificates_range(ctx) .await .wrap("certificates_range()")?; assert_eq!(want_last.next(), range.next()); @@ -141,12 +141,12 @@ impl ConnectionPool { } /// Same as `wait_for_certificates`, but additionally verifies all the blocks against genesis. - pub async fn wait_for_certificates_and_verify( + pub async fn wait_for_block_certificates_and_verify( &self, ctx: &ctx::Ctx, want_last: validator::BlockNumber, ) -> ctx::Result> { - let blocks = self.wait_for_certificates(ctx, want_last).await?; + let blocks = self.wait_for_block_certificates(ctx, want_last).await?; let genesis = self .connection(ctx) .await diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 81084b8f599a..7ca518a183a7 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -99,6 +99,12 @@ pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config: key: config::ValidatorPublicKey(key.public().encode()), weight: 1, }], + // We only have access to the main node attester key in the `cfg`, which is fine + // for validators because at the moment there is only one leader. It doesn't + // allow us to form a full attester committee. However in the current tests + // the `new_configs` used to produce the array of `network::Config` doesn't + // assign an attester key, so it doesn't matter. 
+ attesters: Vec::new(), leader: config::ValidatorPublicKey(key.public().encode()), }), rpc: None, @@ -109,6 +115,10 @@ pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config: .validator_key .as_ref() .map(|k| config::ValidatorSecretKey(k.encode().into())), + attester_key: cfg + .attester_key + .as_ref() + .map(|k| config::AttesterSecretKey(k.encode().into())), }, ) } diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 3f57e4beeade..5506ec6ee8f4 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -1,5 +1,4 @@ use anyhow::Context as _; -use storage::Store; use test_casing::{test_casing, Product}; use tracing::Instrument as _; use zksync_concurrency::{ctx, error::Wrap, scope}; @@ -13,8 +12,11 @@ use zksync_consensus_roles::{ use zksync_consensus_storage::BlockStore; use zksync_types::{L1BatchNumber, ProtocolVersionId}; -use super::*; -use crate::{mn::run_main_node, storage::ConnectionPool}; +use crate::{ + mn::run_main_node, + storage::{ConnectionPool, Store}, + testonly, +}; const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()]; const FROM_SNAPSHOT: [bool; 2] = [true, false]; @@ -71,7 +73,7 @@ async fn test_validator_block_store(version: ProtocolVersionId) { .await .unwrap(); let got = pool - .wait_for_certificates(ctx, block.number()) + .wait_for_block_certificates(ctx, block.number()) .await .unwrap(); assert_eq!(want[..=i], got); @@ -82,6 +84,68 @@ async fn test_validator_block_store(version: ProtocolVersionId) { } } +#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] +#[tokio::test] +async fn test_connection_get_batch(from_snapshot: bool, version: ProtocolVersionId) { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let pool = ConnectionPool::test(from_snapshot, version).await; + + // Fill storage with unsigned L2 blocks and L1 batches in a way that the + // last L1 batch is guaranteed to have some L2 blocks executed in it. + scope::run!(ctx, |ctx, s| async { + // Start state keeper. + let (mut sk, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + + for _ in 0..3 { + for _ in 0..2 { + sk.push_random_block(rng).await; + } + sk.seal_batch().await; + } + sk.push_random_block(rng).await; + + pool.wait_for_payload(ctx, sk.last_block()).await?; + + Ok(()) + }) + .await + .unwrap(); + + // Now we can try to retrieve the batch. + scope::run!(ctx, |ctx, _s| async { + let mut conn = pool.connection(ctx).await?; + let batches = conn.batches_range(ctx).await?; + let last = batches.last.expect("last is set"); + let (min, max) = conn + .get_l2_block_range_of_l1_batch(ctx, last.number) + .await? + .unwrap(); + + assert_eq!( + last.payloads.len(), + (max.0 - min.0) as usize, + "all block payloads present" + ); + + let first_payload = last.payloads.first().expect("last batch has payloads"); + + let want_payload = conn.payload(ctx, min).await?.expect("payload is in the DB"); + let want_payload = want_payload.encode(); + + assert_eq!( + first_payload, &want_payload, + "first payload is the right number" + ); + + anyhow::Ok(()) + }) + .await + .unwrap(); +} + // In the current implementation, consensus certificates are created asynchronously // for the L2 blocks constructed by the StateKeeper. This means that consensus actor // is effectively just back filling the consensus certificates for the L2 blocks in storage. 
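The attester weights only matter in aggregate: a batch certificate is complete once the attesters that signed it hold a sufficient share of the committee's total weight. The toy sketch below illustrates that bookkeeping with local stand-in types; it is not the `zksync_consensus_roles::attester::Committee` API, and the strictly-greater-than-two-thirds threshold is an assumption used only for illustration.

```rust
/// Stand-in for a weighted committee member (the real type lives in
/// `zksync_consensus_roles::attester`).
#[derive(Clone)]
struct WeightedAttester {
    key: String,
    weight: u64,
}

struct Committee {
    members: Vec<WeightedAttester>,
}

impl Committee {
    fn total_weight(&self) -> u64 {
        self.members.iter().map(|m| m.weight).sum()
    }

    /// Smallest weight strictly greater than 2/3 of the total (assumed threshold).
    fn quorum_weight(&self) -> u64 {
        self.total_weight() * 2 / 3 + 1
    }

    /// Sums the weight of the members whose keys appear among the signers.
    fn signed_weight(&self, signers: &[String]) -> u64 {
        self.members
            .iter()
            .filter(|m| signers.contains(&m.key))
            .map(|m| m.weight)
            .sum()
    }

    fn has_quorum(&self, signers: &[String]) -> bool {
        self.signed_weight(signers) >= self.quorum_weight()
    }
}

fn main() {
    let committee = Committee {
        members: ["a", "b", "c"]
            .into_iter()
            .map(|k| WeightedAttester { key: k.to_string(), weight: 1 })
            .collect(),
    };
    // Two out of three equal-weight attesters are not enough; all three are.
    assert!(!committee.has_quorum(&["a".to_string(), "b".to_string()]));
    assert!(committee.has_quorum(&["a".to_string(), "b".to_string(), "c".to_string()]));
}
```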
@@ -119,24 +183,24 @@ async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) { tracing::info!("Generate couple more blocks and wait for consensus to catch up."); sk.push_random_blocks(rng, 3).await; pool - .wait_for_certificate(ctx, sk.last_block()) + .wait_for_block_certificate(ctx, sk.last_block()) .await - .context("wait_for_certificate(<2nd phase>)")?; + .context("wait_for_block_certificate(<2nd phase>)")?; tracing::info!("Synchronously produce blocks one by one, and wait for consensus."); for _ in 0..2 { sk.push_random_blocks(rng, 1).await; pool - .wait_for_certificate(ctx, sk.last_block()) + .wait_for_block_certificate(ctx, sk.last_block()) .await - .context("wait_for_certificate(<3rd phase>)")?; + .context("wait_for_block_certificate(<3rd phase>)")?; } tracing::info!("Verify all certificates"); pool - .wait_for_certificates_and_verify(ctx, sk.last_block()) + .wait_for_block_certificates_and_verify(ctx, sk.last_block()) .await - .context("wait_for_certificates_and_verify()")?; + .context("wait_for_block_certificates_and_verify()")?; Ok(()) }) .await @@ -171,7 +235,7 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) { validator.push_random_blocks(rng, 5).await; validator.seal_batch().await; validator_pool - .wait_for_certificate(ctx, validator.last_block()) + .wait_for_block_certificate(ctx, validator.last_block()) .await?; tracing::info!("take snapshot and start a node from it"); @@ -189,7 +253,7 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) { validator.push_random_blocks(rng, 5).await; validator.seal_batch().await; node_pool - .wait_for_certificate(ctx, validator.last_block()) + .wait_for_block_certificate(ctx, validator.last_block()) .await?; tracing::info!("take another snapshot and start a node from it"); @@ -206,15 +270,15 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) { tracing::info!("produce more blocks and compare storages"); validator.push_random_blocks(rng, 5).await; let want = validator_pool - .wait_for_certificates_and_verify(ctx, validator.last_block()) + .wait_for_block_certificates_and_verify(ctx, validator.last_block()) .await?; // node stores should be suffixes for validator store. for got in [ node_pool - .wait_for_certificates_and_verify(ctx, validator.last_block()) + .wait_for_block_certificates_and_verify(ctx, validator.last_block()) .await?, node_pool2 - .wait_for_certificates_and_verify(ctx, validator.last_block()) + .wait_for_block_certificates_and_verify(ctx, validator.last_block()) .await?, ] { assert_eq!(want[want.len() - got.len()..], got[..]); @@ -296,12 +360,12 @@ async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) { validator.push_random_blocks(rng, 5).await; let want_last = validator.last_block(); let want = validator_pool - .wait_for_certificates_and_verify(ctx, want_last) + .wait_for_block_certificates_and_verify(ctx, want_last) .await?; for pool in &node_pools { assert_eq!( want, - pool.wait_for_certificates_and_verify(ctx, want_last) + pool.wait_for_block_certificates_and_verify(ctx, want_last) .await? 
); } @@ -382,12 +446,12 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) { main_node.push_random_blocks(rng, 5).await; let want_last = main_node.last_block(); let want = main_node_pool - .wait_for_certificates_and_verify(ctx, want_last) + .wait_for_block_certificates_and_verify(ctx, want_last) .await?; for pool in &ext_node_pools { assert_eq!( want, - pool.wait_for_certificates_and_verify(ctx, want_last) + pool.wait_for_block_certificates_and_verify(ctx, want_last) .await? ); } @@ -429,7 +493,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg)); validator.push_random_blocks(rng, 3).await; node_pool - .wait_for_certificate(ctx, validator.last_block()) + .wait_for_block_certificate(ctx, validator.last_block()) .await?; Ok(()) }) @@ -457,10 +521,10 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg)); validator.push_random_blocks(rng, 3).await; let want = validator_pool - .wait_for_certificates_and_verify(ctx, validator.last_block()) + .wait_for_block_certificates_and_verify(ctx, validator.last_block()) .await?; let got = node_pool - .wait_for_certificates_and_verify(ctx, validator.last_block()) + .wait_for_block_certificates_and_verify(ctx, validator.last_block()) .await?; assert_eq!(want, got); Ok(()) @@ -549,9 +613,9 @@ async fn test_with_pruning(version: ProtocolVersionId) { .context("prune_batches")?; validator.push_random_blocks(rng, 5).await; node_pool - .wait_for_certificates(ctx, validator.last_block()) + .wait_for_block_certificates(ctx, validator.last_block()) .await - .context("wait_for_certificates()")?; + .context("wait_for_block_certificates()")?; Ok(()) }) .await