Skip to content

Commit

Permalink
feat(en): Support arbitrary genesis block for external nodes (matter-…
Browse files Browse the repository at this point in the history
…labs#537)

## What ❔

Support non-zero genesis block specified in executor configuration.
Check whether this block exists on initialization; validate its
correspondence if it does, and persist consensus fields if it doesn't.

## Why ❔

This is necessary to support gossip-based syncing in practice; we likely
won't back-sign all blocks in all envs.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Nov 30, 2023
1 parent e2c1b20 commit 15d7eaf
Show file tree
Hide file tree
Showing 12 changed files with 611 additions and 110 deletions.
10 changes: 6 additions & 4 deletions core/lib/types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct MiniblockHeader {
}

/// Consensus-related L2 block (= miniblock) fields.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ConsensusBlockFields {
/// Hash of the previous consensus block.
pub parent: validator::BlockHeaderHash,
Expand All @@ -99,12 +99,14 @@ pub struct ConsensusBlockFields {

impl ProtoFmt for ConsensusBlockFields {
type Proto = crate::proto::ConsensusBlockFields;
fn read(r: &Self::Proto) -> anyhow::Result<Self> {

fn read(proto: &Self::Proto) -> anyhow::Result<Self> {
Ok(Self {
parent: read_required(&r.parent).context("parent")?,
justification: read_required(&r.justification).context("justification")?,
parent: read_required(&proto.parent).context("parent")?,
justification: read_required(&proto.justification).context("justification")?,
})
}

fn build(&self) -> Self::Proto {
Self::Proto {
parent: Some(self.parent.build()),
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/consensus/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use zksync_types::api::en::SyncBlock;
use zksync_types::{Address, L1BatchNumber, Transaction, H256};

/// L2 block (= miniblock) payload.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub(crate) struct Payload {
pub hash: H256,
pub l1_batch_number: L1BatchNumber,
Expand Down
31 changes: 25 additions & 6 deletions core/lib/zksync_core/src/sync_layer/external_io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use futures::future;

use std::{
collections::HashMap,
Expand Down Expand Up @@ -108,7 +109,6 @@ impl ExternalIO {

async fn load_previous_l1_batch_hash(&self) -> U256 {
let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();

let wait_latency = KEEPER_METRICS.wait_for_prev_hash_time.start();
let (hash, _) =
extractors::wait_for_prev_l1_batch_params(&mut storage, self.current_l1_batch_number)
Expand All @@ -117,6 +117,18 @@ impl ExternalIO {
hash
}

async fn load_previous_miniblock_hash(&self) -> H256 {
let prev_miniblock_number = self.current_miniblock_number - 1;
let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();
let header = storage
.blocks_dal()
.get_miniblock_header(prev_miniblock_number)
.await
.unwrap()
.unwrap_or_else(|| panic!("Miniblock #{prev_miniblock_number} is missing"));
header.hash
}

async fn load_base_system_contracts_by_version_id(
&self,
id: ProtocolVersionId,
Expand Down Expand Up @@ -307,15 +319,20 @@ impl StateKeeperIO for ExternalIO {
operator_address,
protocol_version,
first_miniblock_info: (miniblock_number, virtual_blocks),
prev_miniblock_hash,
}) => {
assert_eq!(
number, self.current_l1_batch_number,
"Batch number mismatch"
);
tracing::info!("Getting previous L1 batch hash");
let previous_l1_batch_hash = self.load_previous_l1_batch_hash().await;
tracing::info!("Previous L1 batch hash: {previous_l1_batch_hash}");
tracing::info!("Getting previous L1 batch hash and miniblock hash");
let (previous_l1_batch_hash, previous_miniblock_hash) = future::join(
self.load_previous_l1_batch_hash(),
self.load_previous_miniblock_hash(),
)
.await;
tracing::info!(
"Previous L1 batch hash: {previous_l1_batch_hash}, previous miniblock hash: {previous_miniblock_hash}"
);

let base_system_contracts = self
.load_base_system_contracts_by_version_id(protocol_version)
Expand All @@ -328,7 +345,7 @@ impl StateKeeperIO for ExternalIO {
l1_gas_price,
l2_fair_gas_price,
miniblock_number,
prev_miniblock_hash,
previous_miniblock_hash,
base_system_contracts,
self.validation_computational_gas_limit,
protocol_version,
Expand Down Expand Up @@ -539,6 +556,8 @@ impl StateKeeperIO for ExternalIO {
// Mimic the metric emitted by the main node to reuse existing Grafana charts.
APP_METRICS.block_number[&BlockStage::Sealed].set(self.current_l1_batch_number.0.into());

self.sync_state
.set_local_block(self.current_miniblock_number);
self.current_miniblock_number += 1; // Due to fictive miniblock being sealed.
self.current_l1_batch_number += 1;
Ok(())
Expand Down
16 changes: 4 additions & 12 deletions core/lib/zksync_core/src/sync_layer/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use zksync_dal::StorageProcessor;
use zksync_types::{
api::en::SyncBlock, block::ConsensusBlockFields, Address, L1BatchNumber, MiniblockNumber,
ProtocolVersionId, H256,
ProtocolVersionId,
};
use zksync_web3_decl::jsonrpsee::core::Error as RpcError;

Expand All @@ -29,7 +29,6 @@ pub(super) struct FetchedBlock {
pub last_in_batch: bool,
pub protocol_version: ProtocolVersionId,
pub timestamp: u64,
pub hash: H256,
pub l1_gas_price: u64,
pub l2_fair_gas_price: u64,
pub virtual_blocks: u32,
Expand All @@ -38,15 +37,14 @@ pub(super) struct FetchedBlock {
pub consensus: Option<ConsensusBlockFields>,
}

impl FetchedBlock {
fn from_sync_block(block: SyncBlock) -> Self {
impl From<SyncBlock> for FetchedBlock {
fn from(block: SyncBlock) -> Self {
Self {
number: block.number,
l1_batch_number: block.l1_batch_number,
last_in_batch: block.last_in_batch,
protocol_version: block.protocol_version,
timestamp: block.timestamp,
hash: block.hash.unwrap_or_default(),
l1_gas_price: block.l1_gas_price,
l2_fair_gas_price: block.l2_fair_gas_price,
virtual_blocks: block.virtual_blocks.unwrap_or(0),
Expand All @@ -64,7 +62,6 @@ impl FetchedBlock {
pub struct FetcherCursor {
// Fields are public for testing purposes.
pub(super) next_miniblock: MiniblockNumber,
pub(super) prev_miniblock_hash: H256,
pub(super) l1_batch: L1BatchNumber,
}

Expand Down Expand Up @@ -93,7 +90,6 @@ impl FetcherCursor {

// Miniblocks are always fully processed.
let next_miniblock = last_miniblock_header.number + 1;
let prev_miniblock_hash = last_miniblock_header.hash;
// Decide whether the next batch should be explicitly opened or not.
let l1_batch = if was_new_batch_open {
// No `OpenBatch` action needed.
Expand All @@ -106,7 +102,6 @@ impl FetcherCursor {
Ok(Self {
next_miniblock,
l1_batch,
prev_miniblock_hash,
})
}

Expand Down Expand Up @@ -136,7 +131,6 @@ impl FetcherCursor {
protocol_version: block.protocol_version,
// `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number.
first_miniblock_info: (block.number, block.virtual_blocks),
prev_miniblock_hash: self.prev_miniblock_hash,
});
FETCHER_METRICS.l1_batch[&L1BatchStage::Open].set(block.l1_batch_number.0.into());
self.l1_batch += 1;
Expand Down Expand Up @@ -168,7 +162,6 @@ impl FetcherCursor {
new_actions.push(SyncAction::SealMiniblock(block.consensus));
}
self.next_miniblock += 1;
self.prev_miniblock_hash = block.hash;

new_actions
}
Expand Down Expand Up @@ -280,8 +273,7 @@ impl MainNodeFetcher {
request_latency.observe();

let block_number = block.number;
let fetched_block = FetchedBlock::from_sync_block(block);
let new_actions = self.cursor.advance(fetched_block);
let new_actions = self.cursor.advance(block.into());

tracing::info!(
"New miniblock: {block_number} / {}",
Expand Down
2 changes: 2 additions & 0 deletions core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use zksync_concurrency::{
ctx::{self, channel},
scope,
sync::{self, watch},
testonly::abort_on_panic,
time,
};
use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, FinalBlock, Payload};
Expand Down Expand Up @@ -131,6 +132,7 @@ async fn test_buffered_storage(
block_interval: time::Duration,
shuffle_blocks: impl FnOnce(&mut StdRng, &mut [FinalBlock]),
) {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

Expand Down
1 change: 0 additions & 1 deletion core/lib/zksync_core/src/sync_layer/gossip/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl FetchedBlock {
last_in_batch,
protocol_version: ProtocolVersionId::latest(), // FIXME
timestamp: payload.timestamp,
hash: payload.hash,
l1_gas_price: payload.l1_gas_price,
l2_fair_gas_price: payload.l2_fair_gas_price,
virtual_blocks: payload.virtual_blocks,
Expand Down
4 changes: 3 additions & 1 deletion core/lib/zksync_core/src/sync_layer/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ async fn run_gossip_fetcher_inner(
let cursor = FetcherCursor::new(&mut storage).await?;
drop(storage);

let store = PostgresBlockStorage::new(pool, actions, cursor);
let store =
PostgresBlockStorage::new(ctx, pool, actions, cursor, &executor_config.genesis_block)
.await?;
let buffered = Arc::new(Buffered::new(store));
let store = buffered.inner();
let executor = Executor::new(executor_config, node_key, buffered.clone())
Expand Down
Loading

0 comments on commit 15d7eaf

Please sign in to comment.