Skip to content

Commit

Permalink
Simplified block storage (#53)
Browse files Browse the repository at this point in the history
* simplified the storage traits, by moving the inmem caching
implementation to era-consensus
* extracted PayloadManager trait responsible for proposing/verifying
payloads.
* clarified the expected persistence guarantees of the
PersistentBlockStore trait
* simplified the block caching to maintain a continuous range of blocks
* simplified the sync_blocks implementation as a result of previous
simplifications
* moved rocksdb storage implementation to tools crate, given that the
executor binary is there anyway.
* Removed genesis block from config to simplify configuration
* Removed redundant header from FinalBlock.
* Made bft actor wait for past blocks to be stored before calling
propose/verify to more accurately reflect the expected implementation of
a stateful blockchain (and to simplify implementation of the
PayloadManager trait)

---------

Co-authored-by: Bruno França <[email protected]>
  • Loading branch information
pompon0 and brunoffranca authored Jan 9, 2024
1 parent cf8340d commit 5727a3e
Show file tree
Hide file tree
Showing 78 changed files with 3,399 additions and 4,121 deletions.
10 changes: 6 additions & 4 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,4 @@ wildcard_dependencies = "warn"
# Produces too many false positives.
redundant_locals = "allow"
needless_pass_by_ref_mut = "allow"
box_default = "allow"
33 changes: 16 additions & 17 deletions node/actors/bft/src/inner.rs → node/actors/bft/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
//! The inner data of the consensus state machine. This is shared between the different roles.
use crate::{io::OutputMessage, misc};
use crate::{misc, PayloadManager};
use std::sync::Arc;
use tracing::instrument;
use zksync_concurrency::ctx::channel;
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

/// The ConsensusInner struct, it contains data to be shared with the state machines. This is never supposed
/// to be modified, except by the Consensus struct.
/// Configuration of the bft actor.
#[derive(Debug)]
pub(crate) struct ConsensusInner {
/// The communication pipe. This is used to send outputs.
pub(crate) pipe: channel::UnboundedSender<OutputMessage>,
pub struct Config {
/// The validator's secret key.
pub(crate) secret_key: validator::SecretKey,
pub secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
pub(crate) validator_set: validator::ValidatorSet,
pub validator_set: validator::ValidatorSet,
/// Block store.
pub block_store: Arc<storage::BlockStore>,
/// Replica store.
pub replica_store: Box<dyn storage::ReplicaStore>,
/// Payload manager.
pub payload_manager: Box<dyn PayloadManager>,
}

impl ConsensusInner {
impl Config {
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub(crate) const PAYLOAD_MAX_SIZE: usize = 500 * zksync_protobuf::kB;
Expand All @@ -33,16 +36,12 @@ impl ConsensusInner {
/// for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn threshold(&self) -> usize {
let num_validators = self.validator_set.len();

misc::consensus_threshold(num_validators)
misc::consensus_threshold(self.validator_set.len())
}

/// Calculate the maximum number of faulty replicas, for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn faulty_replicas(&self) -> usize {
let num_validators = self.validator_set.len();

misc::faulty_replicas(num_validators)
misc::faulty_replicas(self.validator_set.len())
}
}
14 changes: 7 additions & 7 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl StateMachine {

// Check that the message signer is in the validator set.
let validator_index =
self.inner
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
Expand All @@ -84,7 +84,7 @@ impl StateMachine {
}

// If the message is for a view when we are not a leader, we discard it.
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
if self.config.view_leader(message.view) != self.config.secret_key.public() {
return Err(Error::NotLeaderInView);
}

Expand All @@ -109,7 +109,7 @@ impl StateMachine {
// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, &self.inner.validator_set))
.or_insert(CommitQC::new(message, &self.config.validator_set))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
Expand All @@ -123,11 +123,11 @@ impl StateMachine {
}
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.inner.threshold())
.find(|(_, v)| v.len() >= self.config.threshold())
else {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), self.inner.threshold());
debug_assert_eq!(replica_messages.len(), self.config.threshold());

// ----------- Update the state machine --------------

Expand All @@ -151,7 +151,7 @@ impl StateMachine {
// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self
.inner
.config
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
Expand All @@ -161,7 +161,7 @@ impl StateMachine {
)),
recipient: Target::Broadcast,
};
self.inner.pipe.send(output_message.into());
self.pipe.send(output_message.into());

// Clean the caches.
self.prepare_message_cache.retain(|k, _| k >= &self.view);
Expand Down
12 changes: 6 additions & 6 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl StateMachine {

// Check that the message signer is in the validator set.
let validator_index =
self.inner
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
Expand All @@ -109,7 +109,7 @@ impl StateMachine {
}

// If the message is for a view when we are not a leader, we discard it.
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
if self.config.view_leader(message.view) != self.config.secret_key.public() {
return Err(Error::NotLeaderInView);
}

Expand All @@ -134,7 +134,7 @@ impl StateMachine {
// Verify the high QC.
message
.high_qc
.verify(&self.inner.validator_set, self.inner.threshold())
.verify(&self.config.validator_set, self.config.threshold())
.map_err(Error::InvalidHighQC)?;

// If the high QC is for a future view, we discard the message.
Expand All @@ -153,7 +153,7 @@ impl StateMachine {
self.prepare_qcs.entry(message.view).or_default().add(
&signed_message,
validator_index,
&self.inner.validator_set,
&self.config.validator_set,
);

// We store the message in our cache.
Expand All @@ -165,15 +165,15 @@ impl StateMachine {
// Now we check if we have enough messages to continue.
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();

if num_messages < self.inner.threshold() {
if num_messages < self.config.threshold() {
return Ok(());
}

// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
self.prepare_message_cache.remove(&message.view);

debug_assert_eq!(num_messages, self.inner.threshold());
debug_assert_eq!(num_messages, self.config.threshold());

// ----------- Update the state machine --------------

Expand Down
52 changes: 30 additions & 22 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{metrics, ConsensusInner, PayloadSource};
use crate::{metrics, Config, OutputSender};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
Expand All @@ -7,14 +7,16 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};
use zksync_consensus_roles::validator;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
pub(crate) inner: Arc<ConsensusInner>,
pub(crate) config: Arc<Config>,
/// Pipe through with leader sends network messages.
pub(crate) pipe: OutputSender,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
pub(crate) view: validator::ViewNumber,
Expand All @@ -29,24 +31,25 @@ pub(crate) struct StateMachine {
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, validator::PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, validator::CommitQC>,
}

impl StateMachine {
/// Creates a new StateMachine struct.
#[instrument(level = "trace")]
pub fn new(ctx: &ctx::Ctx, inner: Arc<ConsensusInner>) -> Self {
pub fn new(ctx: &ctx::Ctx, config: Arc<Config>, pipe: OutputSender) -> Self {
StateMachine {
inner,
config,
pipe,
view: validator::ViewNumber(0),
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
Expand Down Expand Up @@ -106,9 +109,9 @@ impl StateMachine {
/// that the validator doesn't spend time on generating payloads for already expired views.
pub(crate) async fn run_proposer(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
config: &Config,
mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
pipe: &OutputSender,
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
Expand All @@ -119,17 +122,17 @@ impl StateMachine {
continue;
};
next_view = prepare_qc.view().next();
Self::propose(ctx, inner, payload_source, prepare_qc).await?;
Self::propose(ctx, config, prepare_qc, pipe).await?;
}
}

/// Sends a LeaderPrepare for the given PrepareQC.
/// Uses `payload_source` to generate a payload if needed.
pub(crate) async fn propose(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
justification: PrepareQC,
cfg: &Config,
justification: validator::PrepareQC,
pipe: &OutputSender,
) -> ctx::Result<()> {
// Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
Expand All @@ -141,11 +144,11 @@ impl StateMachine {
let highest_vote: Option<validator::BlockHeader> = count
.iter()
// We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes.
.find_map(|(h, v)| (*v > 2 * inner.faulty_replicas()).then_some(h))
.find_map(|(h, v)| (*v > 2 * cfg.faulty_replicas()).then_some(h))
.cloned();

// Get the highest CommitQC.
let highest_qc: &CommitQC = justification
// Get the highest validator::CommitQC.
let highest_qc: &validator::CommitQC = justification
.map
.keys()
.map(|s| &s.high_qc)
Expand All @@ -162,8 +165,13 @@ impl StateMachine {
Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None),
// The previous block was finalized, so we can propose a new block.
_ => {
let payload = payload_source
.propose(ctx, highest_qc.message.proposal.number.next())
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
cfg.block_store
.wait_until_persisted(ctx, highest_qc.header().number)
.await?;
let payload = cfg
.payload_manager
.propose(ctx, highest_qc.header().number.next())
.await?;
metrics::METRICS
.leader_proposal_payload_size
Expand All @@ -177,7 +185,7 @@ impl StateMachine {
// ----------- Prepare our message and send it --------------

// Broadcast the leader prepare message to all replicas (ourselves included).
let msg = inner
let msg = cfg
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderPrepare(
validator::LeaderPrepare {
Expand All @@ -188,7 +196,7 @@ impl StateMachine {
justification,
},
));
inner.pipe.send(
pipe.send(
ConsensusInputMessage {
message: msg,
recipient: Target::Broadcast,
Expand Down
Loading

0 comments on commit 5727a3e

Please sign in to comment.