misc: refactored client state DB interface, converted some things over to use client state manager where feasible
delbonis committed Jan 29, 2025
1 parent 01c2811 commit 5dc2247
Showing 14 changed files with 446 additions and 370 deletions.
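
For orientation before the per-file diffs, here is a minimal, hypothetical sketch of the storage-shape change: the old interface kept sparse full-state checkpoints plus per-event write lists and resumed by replaying writes, while the new interface keeps one full state per sync-event index and resumes with a single lookup. All types and helpers below are simplified stand-ins, not the actual strata_db / strata_state definitions.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug, Default)]
struct ClientState {
    counter: u64,
}

#[derive(Clone, Debug)]
struct Write {
    add: u64,
}

fn apply(state: &mut ClientState, w: &Write) {
    state.counter += w.add;
}

/// Old shape: sparse full-state checkpoints plus per-event write lists.
/// Resuming means finding the last checkpoint and replaying writes after it.
fn resume_old(
    checkpoints: &BTreeMap<u64, ClientState>,
    writes: &BTreeMap<u64, Vec<Write>>,
) -> Option<(u64, ClientState)> {
    let (&ckpt_idx, ckpt) = checkpoints.iter().next_back()?;
    let last_idx = writes.keys().next_back().copied().unwrap_or(ckpt_idx);
    let mut state = ckpt.clone();
    for idx in (ckpt_idx + 1)..=last_idx {
        for w in writes.get(&idx).into_iter().flatten() {
            apply(&mut state, w);
        }
    }
    Some((last_idx, state))
}

/// New shape: every event index stores the full resulting state, so resuming
/// is a single lookup and the replay machinery goes away.
fn resume_new(states: &BTreeMap<u64, ClientState>) -> Option<(u64, ClientState)> {
    let (&idx, state) = states.iter().next_back()?;
    Some((idx, state.clone()))
}

fn main() {
    let mut checkpoints = BTreeMap::new();
    checkpoints.insert(0, ClientState::default());
    let mut writes = BTreeMap::new();
    writes.insert(1, vec![Write { add: 2 }]);
    writes.insert(2, vec![Write { add: 3 }]);
    println!("old: {:?}", resume_old(&checkpoints, &writes));

    let mut states = BTreeMap::new();
    states.insert(2, ClientState { counter: 5 });
    println!("new: {:?}", resume_new(&states));
}
```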
216 changes: 106 additions & 110 deletions crates/consensus-logic/src/csm/client_transition.rs

Large diffs are not rendered by default.

108 changes: 35 additions & 73 deletions crates/consensus-logic/src/csm/state_tracker.rs
@@ -8,9 +8,10 @@ use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT};
use strata_db::traits::*;
use strata_primitives::params::Params;
use strata_state::{
client_state::ClientState,
client_state::{ClientState, ClientStateMut},
operation::{self, ClientUpdateOutput},
};
use strata_storage::ClientStateManager;
use tracing::*;

use super::client_transition;
@@ -20,6 +21,8 @@ pub struct StateTracker<D: Database> {
params: Arc<Params>,
database: Arc<D>,

client_state_manager: Arc<ClientStateManager>,

cur_state_idx: u64,

cur_state: Arc<ClientState>,
@@ -29,12 +32,14 @@ impl<D: Database> StateTracker<D> {
pub fn new(
params: Arc<Params>,
database: Arc<D>,
client_state_manager: Arc<ClientStateManager>,
cur_state_idx: u64,
cur_state: Arc<ClientState>,
) -> Self {
Self {
params,
database,
client_state_manager,
cur_state_idx,
cur_state,
}
@@ -50,6 +55,7 @@ impl<D: Database> StateTracker<D> {

/// Given the next event index, computes the state application if the
/// requisite data is available. Returns the output and the new state.
// TODO maybe remove output return value
pub fn advance_consensus_state(
&mut self,
ev_idx: u64,
@@ -62,40 +68,39 @@
// Load the event from the database.
let db = self.database.as_ref();
let sync_event_db = db.sync_event_db();
let client_state_db = db.client_state_db();
let ev = sync_event_db
.get_sync_event(ev_idx)?
.ok_or(Error::MissingSyncEvent(ev_idx))?;

debug!(?ev, "Processing event");
debug!(?ev, "processing sync event");

#[cfg(feature = "debug-utils")]
check_bail_trigger(BAIL_SYNC_EVENT);

// Compute the state transition.
let outp = client_transition::process_event(&self.cur_state, &ev, db, &self.params)?;
let mut state_mut = ClientStateMut::new(self.cur_state.as_ref().clone());
client_transition::process_event(&mut state_mut, &ev, db, &self.params)?;

// Clone the state and apply the operations to it.
let mut new_state = self.cur_state.as_ref().clone();
operation::apply_writes_to_state(&mut new_state, outp.writes().iter().cloned());
let outp = state_mut.into_update();

// Store the outputs.
let state = self
.client_state_manager
.put_update_blocking(ev_idx, outp.clone())?;

// Update bookkeeping.
self.cur_state = Arc::new(new_state);
self.cur_state = state;
self.cur_state_idx = ev_idx;
debug!(%ev_idx, "computed new consensus state");

// Store the outputs.
// TODO ideally avoid clone
client_state_db.write_client_update_output(ev_idx, outp.clone())?;

Ok((outp, self.cur_state.clone()))
}

/// Writes the current state to the database as a new checkpoint.
/// Does nothing.
// TODO remove this function
pub fn store_checkpoint(&self) -> anyhow::Result<()> {
let client_state_db = self.database.client_state_db();
let state = self.cur_state.as_ref().clone(); // TODO avoid clone
client_state_db.write_client_state_checkpoint(self.cur_state_idx, state)?;
warn!("tried to store client state checkpoint, we don't have this anymore");
Ok(())
}
}
@@ -113,71 +118,28 @@ impl<D: Database> StateTracker<D> {
/// - `cs_db`: An implementation of the [`ClientStateDatabase`] trait, used for retrieving
/// checkpoint and state data.
/// - `idx`: The index from which to replay state writes, starting from the last checkpoint.
pub fn reconstruct_cur_state(
cs_db: &impl ClientStateDatabase,
) -> anyhow::Result<(u64, ClientState)> {
let last_ckpt_idx = cs_db.get_last_checkpoint_idx()?;
pub fn reconstruct_cur_state(csman: &ClientStateManager) -> anyhow::Result<(u64, ClientState)> {
let last_state_idx = csman.get_last_state_idx_blocking()?;

// genesis state.
if last_ckpt_idx == 0 {
// We used to do something here, but now we just print a log.
if last_state_idx == 0 {
debug!("starting from init state");
let state = cs_db
.get_state_checkpoint(0)?
.ok_or(Error::MissingCheckpoint(0))?;
return Ok((0, state));
}

// If we're not in genesis, then we probably have to replay some writes.
let last_write_idx = cs_db.get_last_write_idx()?;

let state = reconstruct_state(cs_db, last_write_idx)?;

Ok((last_write_idx, state))
let state = csman
.get_state_blocking(last_state_idx)?
.ok_or(Error::MissingConsensusWrites(last_state_idx))?;
Ok((last_state_idx, state))
}

/// Reconstructs the
/// [`ClientStateWrite`](strata_state::operation::ClientStateWrite)
///
/// Under the hood fetches the nearest checkpoint before the requested idx
/// and then replays all the [`ClientStateWrite`](strata_state::operation::ClientStateWrite)s
/// from that checkpoint up to the requested index `idx`
/// such that we have accurate [`ClientState`].
///
/// # Parameters
///
/// - `cs_db`: anything that implements the [`ClientStateDatabase`] trait.
/// - `idx`: index to look ahead from.
pub fn reconstruct_state(
cs_db: &impl ClientStateDatabase,
idx: u64,
) -> anyhow::Result<ClientState> {
match cs_db.get_state_checkpoint(idx)? {
Some(cl) => {
// if the checkpoint was created at the idx itself, return the checkpoint
debug!(%idx, "no writes to replay");
Ok(cl)
}
/// Fetches the client state at some idx from the database.
// TODO remove this
pub fn reconstruct_state(csman: &ClientStateManager, idx: u64) -> anyhow::Result<ClientState> {
match csman.get_state_blocking(idx)? {
Some(cl) => Ok(cl),
None => {
// get the previously written checkpoint
let prev_ckpt_idx = cs_db.get_prev_checkpoint_at(idx)?;

// get the previous checkpoint Client State
let mut state = cs_db
.get_state_checkpoint(prev_ckpt_idx)?
.ok_or(Error::MissingCheckpoint(idx))?;

// write the client state
let write_replay_start = prev_ckpt_idx + 1;
debug!(%prev_ckpt_idx, %idx, "reconstructing state from checkpoint");

for i in write_replay_start..=idx {
let writes = cs_db
.get_client_state_writes(i)?
.ok_or(Error::MissingConsensusWrites(i))?;
operation::apply_writes_to_state(&mut state, writes.into_iter());
}

Ok(state)
error!("we don't support state reconstruction anymore");
return Err(Error::MissingConsensusWrites(idx).into());
}
}
}
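Taken together, the state_tracker changes swap the clone-the-state-then-apply-writes flow for a mutable accumulator that is consumed into the persisted update, with bookkeeping rotated onto the state the manager hands back. A rough, self-contained model of that pattern follows; ClientStateMut, into_update, and put_update_blocking here are simplified stand-ins mirroring the call sites above, not the real implementations.

```rust
use std::{collections::BTreeMap, sync::Arc};

#[derive(Clone, Debug, Default)]
struct ClientState {
    next_event: u64,
}

#[derive(Clone, Debug)]
struct ClientUpdateOutput {
    new_state: ClientState,
}

/// Working copy of the state that a transition mutates in place.
struct ClientStateMut {
    state: ClientState,
}

impl ClientStateMut {
    fn new(state: ClientState) -> Self {
        Self { state }
    }

    /// Consume the accumulator into the output that gets persisted.
    fn into_update(self) -> ClientUpdateOutput {
        ClientUpdateOutput { new_state: self.state }
    }
}

/// Stand-in for the manager: stores the update and hands back a shared state.
#[derive(Default)]
struct ClientStateManager {
    updates: BTreeMap<u64, Arc<ClientState>>,
}

impl ClientStateManager {
    fn put_update_blocking(&mut self, idx: u64, outp: ClientUpdateOutput) -> Arc<ClientState> {
        let state = Arc::new(outp.new_state);
        self.updates.insert(idx, state.clone());
        state
    }
}

fn main() {
    let mut csman = ClientStateManager::default();
    let mut cur_state = Arc::new(ClientState::default());

    for ev_idx in 0..3u64 {
        // Mirror of advance_consensus_state: mutate a copy, persist, rotate bookkeeping.
        let mut state_mut = ClientStateMut::new(cur_state.as_ref().clone());
        state_mut.state.next_event = ev_idx + 1; // the real transition applies the sync event here
        let outp = state_mut.into_update();
        cur_state = csman.put_update_blocking(ev_idx, outp);
    }

    println!("final state: {:?}", cur_state);
}
```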
27 changes: 11 additions & 16 deletions crates/consensus-logic/src/csm/worker.rs
@@ -12,7 +12,7 @@ use strata_eectl::engine::ExecEngineCtl;
use strata_primitives::prelude::*;
use strata_state::{client_state::ClientState, csm_status::CsmStatus, operation::SyncAction};
use strata_status::StatusChannel;
use strata_storage::{CheckpointDbManager, L2BlockManager};
use strata_storage::{CheckpointDbManager, ClientStateManager, L2BlockManager, NodeStorage};
use strata_tasks::ShutdownGuard;
use tokio::{
sync::{broadcast, mpsc},
@@ -43,8 +43,8 @@ pub struct WorkerState<D: Database> {
// TODO should we move this out?
database: Arc<D>,

/// L2 block manager.
l2_block_manager: Arc<L2BlockManager>,
/// Node storage handle.
storage: Arc<NodeStorage>,

/// Checkpoint manager.
checkpoint_manager: Arc<CheckpointDbManager>,
@@ -62,15 +62,16 @@ impl<D: Database> WorkerState<D> {
pub fn open(
params: Arc<Params>,
database: Arc<D>,
l2_block_manager: Arc<L2BlockManager>,
storage: Arc<NodeStorage>,
cupdate_tx: broadcast::Sender<Arc<ClientUpdateNotif>>,
checkpoint_manager: Arc<CheckpointDbManager>,
) -> anyhow::Result<Self> {
let client_state_db = database.client_state_db().as_ref();
let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(client_state_db)?;
let (cur_state_idx, cur_state) =
state_tracker::reconstruct_cur_state(storage.client_state())?;
let state_tracker = state_tracker::StateTracker::new(
params.clone(),
database.clone(),
client_state_manager.clone(),
cur_state_idx,
Arc::new(cur_state),
);
@@ -87,7 +88,7 @@ impl<D: Database> WorkerState<D> {
params,
config,
database,
l2_block_manager,
storage,
state_tracker,
cupdate_tx,
checkpoint_manager,
@@ -240,17 +241,10 @@ fn handle_sync_event<D: Database>(
// Make sure that the new state index is set as expected.
assert_eq!(state.state_tracker.cur_state_idx(), ev_idx);

// Write the client state checkpoint periodically based on the event idx..
if ev_idx % state.params.run.client_checkpoint_interval as u64 == 0 {
let client_state_db = state.database.client_state_db();
client_state_db.write_client_state_checkpoint(ev_idx, new_state.as_ref().clone())?;
}

// FIXME clean this up
// FIXME clean this up and make them take Arcs
let mut status = CsmStatus::default();
status.set_last_sync_ev_idx(ev_idx);
status.update_from_client_state(new_state.as_ref());

status_channel.update_client_state(new_state.as_ref().clone());

trace!(?new_state, "sending client update notif");
@@ -281,7 +275,8 @@ fn apply_action<D: Database>(
// TODO not sure what this should entail yet
warn!(?blkid, "marking block invalid!");
state
.l2_block_manager
.storage()
.l2()
.set_block_status_blocking(&blkid, BlockStatus::Invalid)?;
}

19 changes: 8 additions & 11 deletions crates/consensus-logic/src/genesis.rs
@@ -15,15 +15,17 @@ use strata_state::{
genesis::GenesisStateData,
header::L2BlockHeader,
l1::{L1HeaderRecord, L1ViewState},
operation::ClientUpdateOutput,
prelude::*,
};
use strata_storage::{ClientStateManager, NodeStorage};
use tracing::*;

use crate::errors::Error;

/// Inserts into the database an initial basic client state that we can begin
/// waiting for genesis with.
pub fn init_client_state(params: &Params, database: &impl Database) -> anyhow::Result<()> {
pub fn init_client_state(params: &Params, csman: &ClientStateManager) -> anyhow::Result<()> {
debug!("initializing client state in database!");

let init_state = ClientState::from_genesis_params(
@@ -32,8 +34,7 @@ );
);

// Write the state into the database.
let cs_db = database.client_state_db();
cs_db.write_client_state_checkpoint(0, init_state)?;
csman.put_update_blocking(0, ClientUpdateOutput::new_state(init_state))?;

Ok(())
}
@@ -160,12 +161,10 @@ pub fn make_genesis_chainstate(
}

/// Check if the database needs to have client init done to it.
pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result<bool> {
let cs_db = database.client_state_db();

pub fn check_needs_client_init(storage: &NodeStorage) -> anyhow::Result<bool> {
// Check if we've written any genesis state checkpoint. These we perform a
// bit more carefully and check errors more granularly.
match cs_db.get_last_checkpoint_idx() {
match storage.client_state().get_last_state_idx_blocking() {
Ok(_) => {}
Err(DbError::NotBootstrapped) => return Ok(true),

@@ -176,11 +175,9 @@ pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result<bool>
Ok(false)
}

pub fn check_needs_genesis(database: &impl Database) -> anyhow::Result<bool> {
let l2_db = database.l2_db();

pub fn check_needs_genesis(storage: &NodeStorage) -> anyhow::Result<bool> {
// Check if there's any genesis block written.
match l2_db.get_blocks_at_height(0) {
match storage.l2().get_blocks_at_height_blocking(0) {
Ok(blkids) => Ok(blkids.is_empty()),

Err(DbError::NotBootstrapped) => Ok(true),
4 changes: 2 additions & 2 deletions crates/consensus-logic/src/sync_manager.rs
@@ -79,7 +79,7 @@ pub fn start_sync_tasks<
>(
executor: &TaskExecutor,
database: Arc<D>,
storage: &NodeStorage,
storage: &Arc<NodeStorage>,
engine: Arc<E>,
pool: threadpool::ThreadPool,
params: Arc<Params>,
@@ -122,7 +122,7 @@
let client_worker_state = worker::WorkerState::open(
params.clone(),
database,
storage.l2().clone(),
storage.clone(),
cupdate_tx,
storage.checkpoint().clone(),
)?;
32 changes: 6 additions & 26 deletions crates/db/src/traits.rs
@@ -11,9 +11,8 @@ use strata_primitives::{
proof::{ProofContext, ProofKey},
};
use strata_state::{
block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate,
client_state::ClientState, l1::L1Tx, operation::*, prelude::*, state_op::WriteBatch,
sync_event::SyncEvent,
block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate, l1::L1Tx,
operation::*, prelude::*, state_op::WriteBatch, sync_event::SyncEvent,
};
use strata_zkvm::ProofReceipt;

@@ -124,33 +123,14 @@ pub trait ClientStateDatabase {
/// [``SyncEventDatabase``]. Will error if `idx - 1` does not exist (unless
/// `idx` is 0) or if trying to overwrite a state, as this is almost
/// certainly a bug.
fn write_client_update_output(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>;
fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>;

/// Writes a new consensus checkpoint that we can cheaply resume from. Will
/// error if trying to overwrite a state.
fn write_client_state_checkpoint(&self, idx: u64, state: ClientState) -> DbResult<()>;
/// Gets the output client state writes for some input index.
fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>>;

/// Gets the idx of the last written state. Or returns error if a bootstrap
/// state has not been written yet.
fn get_last_write_idx(&self) -> DbResult<u64>;

/// Gets the output client state writes for some input index.
fn get_client_state_writes(&self, idx: u64) -> DbResult<Option<Vec<ClientStateWrite>>>;

/// Gets the actions output from a client state transition.
fn get_client_update_actions(&self, idx: u64) -> DbResult<Option<Vec<SyncAction>>>;

/// Gets the last consensus checkpoint idx.
fn get_last_checkpoint_idx(&self) -> DbResult<u64>;

/// Gets the idx of the last checkpoint up to the given input idx. This is
/// the idx we should resume at when playing out consensus writes since the
/// saved checkpoint, which may be the same as the given idx (if we didn't
/// receive any sync events since the last checkpoint.
fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult<u64>;

/// Gets a state checkpoint at a previously written index, if it exists.
fn get_state_checkpoint(&self, idx: u64) -> DbResult<Option<ClientState>>;
fn get_last_state_idx(&self) -> DbResult<u64>;
}

/// L2 data store for CL blocks. Does not store anything about what we think
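To show how small the surface area of the trimmed trait is, here is a hypothetical in-memory implementation; DbResult, DbError, and ClientUpdateOutput are stubbed locally and the real strata_db definitions differ.

```rust
use std::{collections::BTreeMap, sync::Mutex};

// Local stubs standing in for strata_db / strata_state types.
#[derive(Debug)]
enum DbError {
    NotBootstrapped,
}
type DbResult<T> = Result<T, DbError>;

#[derive(Clone, Debug)]
struct ClientUpdateOutput;

/// The trait as it looks after this commit: one update per index, no
/// checkpoint or write-replay methods.
trait ClientStateDatabase {
    fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>;
    fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>>;
    fn get_last_state_idx(&self) -> DbResult<u64>;
}

/// Toy in-memory backend, just to show the surface area.
#[derive(Default)]
struct MemClientStateDb {
    updates: Mutex<BTreeMap<u64, ClientUpdateOutput>>,
}

impl ClientStateDatabase for MemClientStateDb {
    fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> {
        self.updates.lock().unwrap().insert(idx, output);
        Ok(())
    }

    fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>> {
        Ok(self.updates.lock().unwrap().get(&idx).cloned())
    }

    fn get_last_state_idx(&self) -> DbResult<u64> {
        self.updates
            .lock()
            .unwrap()
            .keys()
            .next_back()
            .copied()
            .ok_or(DbError::NotBootstrapped)
    }
}

fn main() -> DbResult<()> {
    let db = MemClientStateDb::default();
    db.put_client_update(0, ClientUpdateOutput)?;
    db.put_client_update(1, ClientUpdateOutput)?;
    assert!(db.get_client_update(1)?.is_some());
    assert_eq!(db.get_last_state_idx()?, 1);
    Ok(())
}
```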