diff --git a/crates/consensus-logic/src/csm/client_transition.rs b/crates/consensus-logic/src/csm/client_transition.rs index 8d352dca6e..c57616dfed 100644 --- a/crates/consensus-logic/src/csm/client_transition.rs +++ b/crates/consensus-logic/src/csm/client_transition.rs @@ -23,14 +23,11 @@ use crate::{errors::*, genesis::make_genesis_block, l1_handler::verify_proof}; /// Processes the event given the current consensus state, producing some /// output. This can return database errors. pub fn process_event( - state: &ClientState, + state: &mut ClientStateMut, ev: &SyncEvent, database: &D, params: &Params, -) -> Result { - let mut writes = Vec::new(); - let mut actions = Vec::new(); - +) -> Result<(), Error> { match ev { SyncEvent::L1Block(height, l1blkid) => { // If the block is before the horizon we don't care about it. @@ -39,7 +36,7 @@ pub fn process_event( eprintln!("early L1 block at h={height}, you may have set up the test env wrong"); warn!(%height, "ignoring unexpected L1Block event before horizon"); - return Ok(ClientUpdateOutput::new(writes, actions)); + return Ok(()); } // FIXME this doesn't do any SPV checks to make sure we only go to @@ -49,14 +46,16 @@ pub fn process_event( .get_block_manifest(*height)? .ok_or(Error::MissingL1BlockHeight(*height))?; - let l1v = state.l1_view(); - let l1_vs = state.l1_view().tip_verification_state(); + let l1v = state.state().l1_view(); + let l1_vs = l1v.tip_verification_state(); + let cur_seen_tip_height = l1v.tip_height(); + let next_exp_height = l1v.next_expected_block(); // Do the consensus checks if let Some(l1_vs) = l1_vs { let l1_vs_height = l1_vs.last_verified_block_num as u64; let mut updated_l1vs = l1_vs.clone(); - for height in (l1_vs_height + 1..l1v.tip_height()) { + for height in (l1_vs_height + 1..cur_seen_tip_height) { let block_mf = l1_db .get_block_manifest(height)? .ok_or(Error::MissingL1BlockHeight(height))?; @@ -65,12 +64,10 @@ pub fn process_event( updated_l1vs = updated_l1vs.check_and_update_continuity_new(&header, &get_btc_params()); } - writes.push(ClientStateWrite::UpdateVerificationState(updated_l1vs)) + state.update_verification_state(updated_l1vs); } // Only accept the block if it's the next block in the chain we expect to accept. 
- let cur_seen_tip_height = l1v.tip_height(); - let next_exp_height = l1v.next_expected_block(); if next_exp_height > params.rollup().horizon_l1_height { // TODO check that the new block we're trying to add has the same parent as the tip // block @@ -80,7 +77,7 @@ pub fn process_event( } if *height == next_exp_height { - writes.push(ClientStateWrite::AcceptL1Block(*l1blkid)); + state.accept_l1_block(*l1blkid); } else { #[cfg(test)] eprintln!("not sure what to do here h={height} exp={next_exp_height}"); @@ -91,10 +88,10 @@ pub fn process_event( let safe_depth = params.rollup().l1_reorg_safe_depth as u64; let maturable_height = next_exp_height.saturating_sub(safe_depth); - if maturable_height > params.rollup().horizon_l1_height && state.is_chain_active() { - let (wrs, acts) = handle_mature_l1_height(maturable_height, state, database); - writes.extend(wrs); - actions.extend(acts); + if maturable_height > params.rollup().horizon_l1_height + && state.state().is_chain_active() + { + handle_mature_l1_height(state, maturable_height, database); } } @@ -115,21 +112,21 @@ pub fn process_event( let threshold = params.rollup.l1_reorg_safe_depth; let genesis_threshold = genesis_ht + threshold as u64; - debug!(%genesis_threshold, %genesis_ht, active=%state.is_chain_active(), "Inside activate chain"); + let active = state.state().is_chain_active(); + debug!(%genesis_threshold, %genesis_ht, %active, "Inside activate chain"); // If necessary, activate the chain! - if !state.is_chain_active() && *height >= genesis_threshold { + if !active && *height >= genesis_threshold { debug!("emitting chain activation"); let genesis_block = make_genesis_block(params); - writes.push(ClientStateWrite::ActivateChain); - writes.push(ClientStateWrite::UpdateVerificationState( - l1_verification_state.clone(), + state.activate_chain(); + state.update_verification_state(l1_verification_state.clone()); + state.set_sync_state(SyncState::from_genesis_blkid( + genesis_block.header().get_blockid(), )); - writes.push(ClientStateWrite::ReplaceSync(Box::new( - SyncState::from_genesis_blkid(genesis_block.header().get_blockid()), - ))); - actions.push(SyncAction::L2Genesis( + + state.push_action(SyncAction::L2Genesis( l1_verification_state.last_verified_block_hash, )); } @@ -138,46 +135,47 @@ pub fn process_event( SyncEvent::L1Revert(to_height) => { let l1_db = database.l1_db(); - let buried = state.l1_view().buried_l1_height(); + let buried = state.state().l1_view().buried_l1_height(); if *to_height < buried { error!(%to_height, %buried, "got L1 revert below buried height"); return Err(Error::ReorgTooDeep(*to_height, buried)); } - writes.push(ClientStateWrite::RollbackL1BlocksTo(*to_height)); + state.rollback_l1_blocks(*to_height); } SyncEvent::L1DABatch(height, checkpoints) => { debug!(%height, "received L1DABatch"); - if let Some(ss) = state.sync() { + if let Some(ss) = state.state().sync() { // TODO load it up and figure out what's there, see if we have to // load the state updates from L1 or something let l2_db = database.l2_db(); let proof_verified_checkpoints = - filter_verified_checkpoints(state, checkpoints, params.rollup()); + filter_verified_checkpoints(state.state(), checkpoints, params.rollup()); // When DABatch appears, it is only confirmed at the moment. 
These will be finalized // only when the corresponding L1 block is buried enough if !proof_verified_checkpoints.is_empty() { - writes.push(ClientStateWrite::CheckpointsReceived( - proof_verified_checkpoints - .iter() - .map(|batch_checkpoint_with_commitment| { - let batch_checkpoint = - &batch_checkpoint_with_commitment.batch_checkpoint; - L1Checkpoint::new( - batch_checkpoint.batch_info().clone(), - batch_checkpoint.bootstrap_state().clone(), - !batch_checkpoint.proof().is_empty(), - *height, - ) - }) - .collect(), - )); - - actions.push(SyncAction::WriteCheckpoints( + // Copy out all the basic checkpoint data into dedicated + // structures for it. + let ckpts = proof_verified_checkpoints + .iter() + .map(|batch_checkpoint_with_commitment| { + let batch_checkpoint = + &batch_checkpoint_with_commitment.batch_checkpoint; + L1Checkpoint::new( + batch_checkpoint.batch_info().clone(), + batch_checkpoint.bootstrap_state().clone(), + !batch_checkpoint.proof().is_empty(), + *height, + ) + }) + .collect::>(); + state.accept_checkpoints(&ckpts); + + state.push_action(SyncAction::WriteCheckpoints( *height, proof_verified_checkpoints, )); @@ -206,7 +204,7 @@ pub fn process_event( // height of last matured L1 block in chain state let chs_last_buried = chainstate.l1_view().safe_height().saturating_sub(1); // buried height in client state - let cls_last_buried = state.l1_view().buried_l1_height(); + let cls_last_buried = state.state().l1_view().buried_l1_height(); if chs_last_buried > cls_last_buried { // can bury till last matured block in chainstate @@ -215,25 +213,21 @@ pub fn process_event( let client_state_bury_height = min( chs_last_buried, // keep at least 1 item - state.l1_view().tip_height().saturating_sub(1), + state.state().l1_view().tip_height().saturating_sub(1), ); - writes.push(ClientStateWrite::UpdateBuried(client_state_bury_height)); + + state.update_buried(client_state_bury_height); } // TODO better checks here - writes.push(ClientStateWrite::AcceptL2Block( - *blkid, - block.block().header().blockidx(), - )); - actions.push(SyncAction::UpdateTip(*blkid)); - - let (wrs, acts) = handle_checkpoint_finalization(state, blkid, params, database); - writes.extend(wrs); - actions.extend(acts); + state.accept_l2_block(*blkid, block.block().header().blockidx()); + state.push_action(SyncAction::UpdateTip(*blkid)); + + handle_checkpoint_finalization(state, blkid, params, database)?; } } - Ok(ClientUpdateOutput::new(writes, actions)) + Ok(()) } /// Handles the maturation of L1 height by finalizing checkpoints and emitting @@ -248,8 +242,8 @@ pub fn process_event( /// /// # Arguments /// -/// * `maturable_height` - The height at which L1 blocks are considered mature. /// * `state` - A reference to the current client state. +/// * `maturable_height` - The height at which L1 blocks are considered mature. /// * `database` - A reference to the database interface. /// /// # Returns @@ -258,54 +252,58 @@ pub fn process_event( /// * A vector of [`ClientStateWrite`] representing the state changes to be written. /// * A vector of [`SyncAction`] representing the actions to be synchronized. fn handle_mature_l1_height( + state: &mut ClientStateMut, maturable_height: u64, - state: &ClientState, database: &impl Database, -) -> (Vec, Vec) { - let mut writes = Vec::new(); - let mut actions = Vec::new(); - - // If there are checkpoints at or before the maturable height, mark them as finalized - if state +) -> Result<(), Error> { + // If there are no checkpoints then return early. 
+ if !state + .state() .l1_view() .has_verified_checkpoint_before(maturable_height) { - if let Some(checkpt) = state - .l1_view() - .get_last_verified_checkpoint_before(maturable_height) - { - // FinalizeBlock Should only be applied when l2_block is actually - // available in l2_db - // If l2 blocks is not in db then finalization will happen when - // l2Block is fetched from the network and the corresponding - //checkpoint is already finalized. - let l2_blockid = checkpt.batch_info.l2_blockid; - - match database.l2_db().get_block_data(l2_blockid) { - Ok(Some(_)) => { - debug!(%maturable_height, "Writing CheckpointFinalized"); - writes.push(ClientStateWrite::CheckpointFinalized(maturable_height)); - // Emit sync action for finalizing a l2 block - info!(%maturable_height, %l2_blockid, "l2 block found in db, push FinalizeBlock SyncAction"); - actions.push(SyncAction::FinalizeBlock(l2_blockid)); - } - Ok(None) => { - warn!( - %maturable_height,%l2_blockid, "l2 block not in db yet, skipping finalize" - ); - } - Err(e) => { - error!(%e, "error while fetching block data from l2_db"); - } + return Ok(()); + } + + // If there *are* checkpoints at or before the maturable height, mark them + // as finalized + if let Some(checkpt) = state + .state() + .l1_view() + .get_last_verified_checkpoint_before(maturable_height) + { + // FinalizeBlock Should only be applied when l2_block is actually + // available in l2_db + // If l2 blocks is not in db then finalization will happen when + // l2Block is fetched from the network and the corresponding + //checkpoint is already finalized. + let l2_blockid = checkpt.batch_info.l2_blockid; + + match database.l2_db().get_block_data(l2_blockid) { + Ok(Some(_)) => { + // Emit sync action for finalizing a l2 block + info!(%maturable_height, %l2_blockid, "l2 block found in db, push FinalizeBlock SyncAction"); + + state.finalize_checkpoint(maturable_height); + state.push_action(SyncAction::FinalizeBlock(l2_blockid)); + } + Ok(None) => { + warn!( + %maturable_height, %l2_blockid, "l2 block not in db yet, skipping finalize" + ); + } + Err(e) => { + error!(err = %e, "error while fetching block data from l2_db"); } - } else { - warn!( - %maturable_height, - "expected to find blockid corresponding to buried l1 height in confirmed_blocks but could not find" - ); } + } else { + warn!( + %maturable_height, + "expected to find blockid corresponding to buried l1 height in confirmed_blocks but could not find" + ); } - (writes, actions) + + Ok(()) } /// Handles the finalization of a checkpoint by processing the corresponding L2 @@ -330,14 +328,12 @@ fn handle_mature_l1_height( /// * A vector of [`ClientStateWrite`] representing the state changes to be written. /// * A vector of [`SyncAction`] representing the actions to be synchronized. fn handle_checkpoint_finalization( - state: &ClientState, + state: &mut ClientStateMut, blkid: &L2BlockId, params: &Params, database: &impl Database, -) -> (Vec, Vec) { - let mut writes = Vec::new(); - let mut actions = Vec::new(); - let verified_checkpoints: &[L1Checkpoint] = state.l1_view().verified_checkpoints(); +) -> Result<(), Error> { + let verified_checkpoints: &[L1Checkpoint] = state.state().l1_view().verified_checkpoints(); match find_l1_height_for_l2_blockid(verified_checkpoints, blkid) { Some(l1_height) => { let safe_depth = params.rollup().l1_reorg_safe_depth as u64; @@ -345,22 +341,22 @@ fn handle_checkpoint_finalization( // Maturable height is the height at which l1 blocks are sufficiently buried // and have negligible chance of reorg. 
let maturable_height = state + .state() .l1_view() .next_expected_block() .saturating_sub(safe_depth); // The l1 height should be handled only if it is less than maturable height if l1_height < maturable_height { - let (wrs, acts) = handle_mature_l1_height(l1_height, state, database); - writes.extend(wrs); - actions.extend(acts); + handle_mature_l1_height(state, l1_height, database)?; } } None => { debug!(%blkid, "L2 block not found in verified checkpoints, possibly not a last block in the checkpoint."); } } - (writes, actions) + + Ok(()) } /// Searches for a given [`L2BlockId`] within a slice of [`L1Checkpoint`] structs diff --git a/crates/consensus-logic/src/csm/state_tracker.rs b/crates/consensus-logic/src/csm/state_tracker.rs index 3fd4199620..f0067e0f7e 100644 --- a/crates/consensus-logic/src/csm/state_tracker.rs +++ b/crates/consensus-logic/src/csm/state_tracker.rs @@ -8,9 +8,10 @@ use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT}; use strata_db::traits::*; use strata_primitives::params::Params; use strata_state::{ - client_state::ClientState, + client_state::{ClientState, ClientStateMut}, operation::{self, ClientUpdateOutput}, }; +use strata_storage::ClientStateManager; use tracing::*; use super::client_transition; @@ -20,6 +21,8 @@ pub struct StateTracker { params: Arc, database: Arc, + client_state_manager: Arc, + cur_state_idx: u64, cur_state: Arc, @@ -29,12 +32,14 @@ impl StateTracker { pub fn new( params: Arc, database: Arc, + client_state_manager: Arc, cur_state_idx: u64, cur_state: Arc, ) -> Self { Self { params, database, + client_state_manager, cur_state_idx, cur_state, } @@ -50,6 +55,7 @@ impl StateTracker { /// Given the next event index, computes the state application if the /// requisite data is available. Returns the output and the new state. + // TODO maybe remove output return value pub fn advance_consensus_state( &mut self, ev_idx: u64, @@ -62,40 +68,39 @@ impl StateTracker { // Load the event from the database. let db = self.database.as_ref(); let sync_event_db = db.sync_event_db(); - let client_state_db = db.client_state_db(); let ev = sync_event_db .get_sync_event(ev_idx)? .ok_or(Error::MissingSyncEvent(ev_idx))?; - debug!(?ev, "Processing event"); + debug!(?ev, "processing sync event"); #[cfg(feature = "debug-utils")] check_bail_trigger(BAIL_SYNC_EVENT); // Compute the state transition. - let outp = client_transition::process_event(&self.cur_state, &ev, db, &self.params)?; + let mut state_mut = ClientStateMut::new(self.cur_state.as_ref().clone()); + client_transition::process_event(&mut state_mut, &ev, db, &self.params)?; // Clone the state and apply the operations to it. - let mut new_state = self.cur_state.as_ref().clone(); - operation::apply_writes_to_state(&mut new_state, outp.writes().iter().cloned()); + let outp = state_mut.into_update(); + + // Store the outputs. + let state = self + .client_state_manager + .put_update_blocking(ev_idx, outp.clone())?; // Update bookkeeping. - self.cur_state = Arc::new(new_state); + self.cur_state = state; self.cur_state_idx = ev_idx; debug!(%ev_idx, "computed new consensus state"); - // Store the outputs. - // TODO ideally avoid clone - client_state_db.write_client_update_output(ev_idx, outp.clone())?; - Ok((outp, self.cur_state.clone())) } - /// Writes the current state to the database as a new checkpoint. + /// Does nothing. 
+ // TODO remove this function pub fn store_checkpoint(&self) -> anyhow::Result<()> { - let client_state_db = self.database.client_state_db(); - let state = self.cur_state.as_ref().clone(); // TODO avoid clone - client_state_db.write_client_state_checkpoint(self.cur_state_idx, state)?; + warn!("tried to store client state checkpoint, we don't have this anymore"); Ok(()) } } @@ -113,71 +118,28 @@ impl StateTracker { /// - `cs_db`: An implementation of the [`ClientStateDatabase`] trait, used for retrieving /// checkpoint and state data. /// - `idx`: The index from which to replay state writes, starting from the last checkpoint. -pub fn reconstruct_cur_state( - cs_db: &impl ClientStateDatabase, -) -> anyhow::Result<(u64, ClientState)> { - let last_ckpt_idx = cs_db.get_last_checkpoint_idx()?; +pub fn reconstruct_cur_state(csman: &ClientStateManager) -> anyhow::Result<(u64, ClientState)> { + let last_state_idx = csman.get_last_state_idx_blocking()?; - // genesis state. - if last_ckpt_idx == 0 { + // We used to do something here, but now we just print a log. + if last_state_idx == 0 { debug!("starting from init state"); - let state = cs_db - .get_state_checkpoint(0)? - .ok_or(Error::MissingCheckpoint(0))?; - return Ok((0, state)); } - // If we're not in genesis, then we probably have to replay some writes. - let last_write_idx = cs_db.get_last_write_idx()?; - - let state = reconstruct_state(cs_db, last_write_idx)?; - - Ok((last_write_idx, state)) + let state = csman + .get_state_blocking(last_state_idx)? + .ok_or(Error::MissingConsensusWrites(last_state_idx))?; + Ok((last_state_idx, state)) } -/// Reconstructs the -/// [`ClientStateWrite`](strata_state::operation::ClientStateWrite) -/// -/// Under the hood fetches the nearest checkpoint before the reuested idx -/// and then replays all the [`ClientStateWrite`](strata_state::operation::ClientStateWrite)s -/// from that checkpoint up to the requested index `idx` -/// such that we have accurate [`ClientState`]. -/// -/// # Parameters -/// -/// - `cs_db`: anything that implements the [`ClientStateDatabase`] trait. -/// - `idx`: index to look ahead from. -pub fn reconstruct_state( - cs_db: &impl ClientStateDatabase, - idx: u64, -) -> anyhow::Result { - match cs_db.get_state_checkpoint(idx)? { - Some(cl) => { - // if the checkpoint was created at the idx itself, return the checkpoint - debug!(%idx, "no writes to replay"); - Ok(cl) - } +/// Fetches the client state at some idx from the database. +// TODO remove this +pub fn reconstruct_state(csman: &ClientStateManager, idx: u64) -> anyhow::Result { + match csman.get_state_blocking(idx)? { + Some(cl) => Ok(cl), None => { - // get the previously written checkpoint - let prev_ckpt_idx = cs_db.get_prev_checkpoint_at(idx)?; - - // get the previous checkpoint Client State - let mut state = cs_db - .get_state_checkpoint(prev_ckpt_idx)? - .ok_or(Error::MissingCheckpoint(idx))?; - - // write the client state - let write_replay_start = prev_ckpt_idx + 1; - debug!(%prev_ckpt_idx, %idx, "reconstructing state from checkpoint"); - - for i in write_replay_start..=idx { - let writes = cs_db - .get_client_state_writes(i)? 
- .ok_or(Error::MissingConsensusWrites(i))?; - operation::apply_writes_to_state(&mut state, writes.into_iter()); - } - - Ok(state) + error!("we don't support state reconstruction anymore"); + return Err(Error::MissingConsensusWrites(idx).into()); } } } diff --git a/crates/consensus-logic/src/csm/worker.rs b/crates/consensus-logic/src/csm/worker.rs index be3a1c7098..77cc4f241d 100644 --- a/crates/consensus-logic/src/csm/worker.rs +++ b/crates/consensus-logic/src/csm/worker.rs @@ -12,7 +12,7 @@ use strata_eectl::engine::ExecEngineCtl; use strata_primitives::prelude::*; use strata_state::{client_state::ClientState, csm_status::CsmStatus, operation::SyncAction}; use strata_status::StatusChannel; -use strata_storage::{CheckpointDbManager, L2BlockManager}; +use strata_storage::{CheckpointDbManager, ClientStateManager, L2BlockManager, NodeStorage}; use strata_tasks::ShutdownGuard; use tokio::{ sync::{broadcast, mpsc}, @@ -43,8 +43,8 @@ pub struct WorkerState { // TODO should we move this out? database: Arc, - /// L2 block manager. - l2_block_manager: Arc, + /// Node storage handle. + storage: Arc, /// Checkpoint manager. checkpoint_manager: Arc, @@ -62,15 +62,16 @@ impl WorkerState { pub fn open( params: Arc, database: Arc, - l2_block_manager: Arc, + storage: Arc, cupdate_tx: broadcast::Sender>, checkpoint_manager: Arc, ) -> anyhow::Result { - let client_state_db = database.client_state_db().as_ref(); - let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(client_state_db)?; + let (cur_state_idx, cur_state) = + state_tracker::reconstruct_cur_state(storage.client_state())?; let state_tracker = state_tracker::StateTracker::new( params.clone(), database.clone(), + client_state_manager.clone(), cur_state_idx, Arc::new(cur_state), ); @@ -87,7 +88,7 @@ impl WorkerState { params, config, database, - l2_block_manager, + storage, state_tracker, cupdate_tx, checkpoint_manager, @@ -240,17 +241,10 @@ fn handle_sync_event( // Make sure that the new state index is set as expected. assert_eq!(state.state_tracker.cur_state_idx(), ev_idx); - // Write the client state checkpoint periodically based on the event idx.. - if ev_idx % state.params.run.client_checkpoint_interval as u64 == 0 { - let client_state_db = state.database.client_state_db(); - client_state_db.write_client_state_checkpoint(ev_idx, new_state.as_ref().clone())?; - } - - // FIXME clean this up + // FIXME clean this up and make them take Arcs let mut status = CsmStatus::default(); status.set_last_sync_ev_idx(ev_idx); status.update_from_client_state(new_state.as_ref()); - status_channel.update_client_state(new_state.as_ref().clone()); trace!(?new_state, "sending client update notif"); @@ -281,7 +275,8 @@ fn apply_action( // TODO not sure what this should entail yet warn!(?blkid, "marking block invalid!"); state - .l2_block_manager + .storage() + .l2() .set_block_status_blocking(&blkid, BlockStatus::Invalid)?; } diff --git a/crates/consensus-logic/src/genesis.rs b/crates/consensus-logic/src/genesis.rs index 13f0778803..9d1a9ef6a8 100644 --- a/crates/consensus-logic/src/genesis.rs +++ b/crates/consensus-logic/src/genesis.rs @@ -15,15 +15,17 @@ use strata_state::{ genesis::GenesisStateData, header::L2BlockHeader, l1::{L1HeaderRecord, L1ViewState}, + operation::ClientUpdateOutput, prelude::*, }; +use strata_storage::{ClientStateManager, NodeStorage}; use tracing::*; use crate::errors::Error; /// Inserts into the database an initial basic client state that we can begin /// waiting for genesis with. 
-pub fn init_client_state(params: &Params, database: &impl Database) -> anyhow::Result<()> { +pub fn init_client_state(params: &Params, csman: &ClientStateManager) -> anyhow::Result<()> { debug!("initializing client state in database!"); let init_state = ClientState::from_genesis_params( @@ -32,8 +34,7 @@ pub fn init_client_state(params: &Params, database: &impl Database) -> anyhow::R ); // Write the state into the database. - let cs_db = database.client_state_db(); - cs_db.write_client_state_checkpoint(0, init_state)?; + csman.put_update_blocking(0, ClientUpdateOutput::new_state(init_state))?; Ok(()) } @@ -160,12 +161,10 @@ pub fn make_genesis_chainstate( } /// Check if the database needs to have client init done to it. -pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result { - let cs_db = database.client_state_db(); - +pub fn check_needs_client_init(storage: &NodeStorage) -> anyhow::Result { // Check if we've written any genesis state checkpoint. These we perform a // bit more carefully and check errors more granularly. - match cs_db.get_last_checkpoint_idx() { + match storage.client_state().get_last_state_idx_blocking() { Ok(_) => {} Err(DbError::NotBootstrapped) => return Ok(true), @@ -176,11 +175,9 @@ pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result Ok(false) } -pub fn check_needs_genesis(database: &impl Database) -> anyhow::Result { - let l2_db = database.l2_db(); - +pub fn check_needs_genesis(storage: &NodeStorage) -> anyhow::Result { // Check if there's any genesis block written. - match l2_db.get_blocks_at_height(0) { + match storage.l2().get_blocks_at_height_blocking(0) { Ok(blkids) => Ok(blkids.is_empty()), Err(DbError::NotBootstrapped) => Ok(true), diff --git a/crates/consensus-logic/src/sync_manager.rs b/crates/consensus-logic/src/sync_manager.rs index 0d07c04304..9000764d4e 100644 --- a/crates/consensus-logic/src/sync_manager.rs +++ b/crates/consensus-logic/src/sync_manager.rs @@ -79,7 +79,7 @@ pub fn start_sync_tasks< >( executor: &TaskExecutor, database: Arc, - storage: &NodeStorage, + storage: &Arc, engine: Arc, pool: threadpool::ThreadPool, params: Arc, @@ -122,7 +122,7 @@ pub fn start_sync_tasks< let client_worker_state = worker::WorkerState::open( params.clone(), database, - storage.l2().clone(), + storage.clone(), cupdate_tx, storage.checkpoint().clone(), )?; diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index 237a8f2d35..19a7148adb 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -11,9 +11,8 @@ use strata_primitives::{ proof::{ProofContext, ProofKey}, }; use strata_state::{ - block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate, - client_state::ClientState, l1::L1Tx, operation::*, prelude::*, state_op::WriteBatch, - sync_event::SyncEvent, + block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate, l1::L1Tx, + operation::*, prelude::*, state_op::WriteBatch, sync_event::SyncEvent, }; use strata_zkvm::ProofReceipt; @@ -124,33 +123,14 @@ pub trait ClientStateDatabase { /// [``SyncEventDatabase``]. Will error if `idx - 1` does not exist (unless /// `idx` is 0) or if trying to overwrite a state, as this is almost /// certainly a bug. - fn write_client_update_output(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>; + fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>; - /// Writes a new consensus checkpoint that we can cheaply resume from. Will - /// error if trying to overwrite a state. 
- fn write_client_state_checkpoint(&self, idx: u64, state: ClientState) -> DbResult<()>; + /// Gets the output client state writes for some input index. + fn get_client_update(&self, idx: u64) -> DbResult>; /// Gets the idx of the last written state. Or returns error if a bootstrap /// state has not been written yet. - fn get_last_write_idx(&self) -> DbResult; - - /// Gets the output client state writes for some input index. - fn get_client_state_writes(&self, idx: u64) -> DbResult>>; - - /// Gets the actions output from a client state transition. - fn get_client_update_actions(&self, idx: u64) -> DbResult>>; - - /// Gets the last consensus checkpoint idx. - fn get_last_checkpoint_idx(&self) -> DbResult; - - /// Gets the idx of the last checkpoint up to the given input idx. This is - /// the idx we should resume at when playing out consensus writes since the - /// saved checkpoint, which may be the same as the given idx (if we didn't - /// receive any sync events since the last checkpoint. - fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult; - - /// Gets a state checkpoint at a previously written index, if it exists. - fn get_state_checkpoint(&self, idx: u64) -> DbResult>; + fn get_last_state_idx(&self) -> DbResult; } /// L2 data store for CL blocks. Does not store anything about what we think diff --git a/crates/rocksdb-store/src/client_state/db.rs b/crates/rocksdb-store/src/client_state/db.rs index 5781237325..21b9c7803d 100644 --- a/crates/rocksdb-store/src/client_state/db.rs +++ b/crates/rocksdb-store/src/client_state/db.rs @@ -4,7 +4,7 @@ use rockbound::{OptimisticTransactionDB, Schema, SchemaDBOperationsExt}; use strata_db::{errors::*, traits::*, DbResult}; use strata_state::operation::*; -use super::schemas::{ClientStateSchema, ClientUpdateOutputSchema}; +use super::schemas::ClientUpdateOutputSchema; use crate::DbOpsConfig; pub struct ClientStateDb { @@ -38,7 +38,7 @@ impl ClientStateDb { } impl ClientStateDatabase for ClientStateDb { - fn write_client_update_output(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> { + fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> { let expected_idx = match self.get_last_idx::()? { Some(last_idx) => last_idx + 1, None => 1, @@ -50,75 +50,16 @@ impl ClientStateDatabase for ClientStateDb { Ok(()) } - fn write_client_state_checkpoint( - &self, - idx: u64, - state: strata_state::client_state::ClientState, - ) -> DbResult<()> { - // FIXME this should probably be a transaction - if self.db.get::(&idx)?.is_some() { - return Err(DbError::OverwriteConsensusCheckpoint(idx)); - } - self.db.put::(&idx, &state)?; - Ok(()) + fn get_client_update(&self, idx: u64) -> DbResult> { + Ok(self.db.get::(&idx)?) } - fn get_last_write_idx(&self) -> DbResult { + fn get_last_state_idx(&self) -> DbResult { match self.get_last_idx::()? { Some(idx) => Ok(idx), None => Err(DbError::NotBootstrapped), } } - - fn get_client_state_writes(&self, idx: u64) -> DbResult>> { - let output = self.db.get::(&idx)?; - match output { - Some(out) => Ok(Some(out.writes().to_owned())), - None => Ok(None), - } - } - - fn get_client_update_actions(&self, idx: u64) -> DbResult>> { - let output = self.db.get::(&idx)?; - match output { - Some(out) => Ok(Some(out.actions().to_owned())), - None => Ok(None), - } - } - - fn get_last_checkpoint_idx(&self) -> DbResult { - match self.get_last_idx::()? 
{ - Some(idx) => Ok(idx), - None => Err(DbError::NotBootstrapped), - } - } - - fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult { - let mut iterator = self.db.iter::()?; - iterator.seek_to_last(); - let rev_iterator = iterator.rev(); - - for res in rev_iterator { - match res { - Ok(item) => { - let (tip, _) = item.into_tuple(); - if tip <= idx { - return Ok(tip); - } - } - Err(e) => return Err(DbError::Other(e.to_string())), - } - } - - Err(DbError::NotBootstrapped) - } - - fn get_state_checkpoint( - &self, - idx: u64, - ) -> DbResult> { - Ok(self.db.get::(&idx)?) - } } #[cfg(test)] diff --git a/crates/rocksdb-store/src/client_state/schemas.rs b/crates/rocksdb-store/src/client_state/schemas.rs index fb060bc226..2331518f24 100644 --- a/crates/rocksdb-store/src/client_state/schemas.rs +++ b/crates/rocksdb-store/src/client_state/schemas.rs @@ -1,4 +1,4 @@ -use strata_state::{client_state::ClientState, operation::ClientUpdateOutput}; +use strata_state::operation::ClientUpdateOutput; use crate::{define_table_with_seek_key_codec, define_table_without_codec, impl_borsh_value_codec}; @@ -7,9 +7,3 @@ define_table_with_seek_key_codec!( /// Table to store client state updates. (ClientUpdateOutputSchema) u64 => ClientUpdateOutput ); - -// Consensus State Schema and corresponding codecs implementation -define_table_with_seek_key_codec!( - /// Table to store client states. - (ClientStateSchema) u64 => ClientState -); diff --git a/crates/rocksdb-store/src/lib.rs b/crates/rocksdb-store/src/lib.rs index 7cd6766da8..840dcd3805 100644 --- a/crates/rocksdb-store/src/lib.rs +++ b/crates/rocksdb-store/src/lib.rs @@ -20,43 +20,6 @@ use strata_db::database::CommonDatabase; #[cfg(feature = "test_utils")] pub mod test_utils; -pub const ROCKSDB_NAME: &str = "strata-client"; - -pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[ - SequenceSchema::COLUMN_FAMILY_NAME, - ChainstateSchema::COLUMN_FAMILY_NAME, - ClientUpdateOutputSchema::COLUMN_FAMILY_NAME, - ClientStateSchema::COLUMN_FAMILY_NAME, - L1BlockSchema::COLUMN_FAMILY_NAME, - MmrSchema::COLUMN_FAMILY_NAME, - SyncEventSchema::COLUMN_FAMILY_NAME, - TxnSchema::COLUMN_FAMILY_NAME, - L2BlockSchema::COLUMN_FAMILY_NAME, - L2BlockStatusSchema::COLUMN_FAMILY_NAME, - L2BlockHeightSchema::COLUMN_FAMILY_NAME, - WriteBatchSchema::COLUMN_FAMILY_NAME, - // Seqdb schemas - SeqBlobIdSchema::COLUMN_FAMILY_NAME, - SeqBlobSchema::COLUMN_FAMILY_NAME, - // Bcast schemas - BcastL1TxIdSchema::COLUMN_FAMILY_NAME, - BcastL1TxSchema::COLUMN_FAMILY_NAME, - // Bridge relay schemas - BridgeMsgIdSchema::COLUMN_FAMILY_NAME, - ScopeMsgIdSchema::COLUMN_FAMILY_NAME, - // Bridge signature schemas - BridgeTxStateTxidSchema::COLUMN_FAMILY_NAME, - BridgeTxStateSchema::COLUMN_FAMILY_NAME, - // Bridge duty schemas - BridgeDutyTxidSchema::COLUMN_FAMILY_NAME, - BridgeDutyStatusSchema::COLUMN_FAMILY_NAME, - // Bridge duty checkpoint - BridgeDutyCheckpointSchema::COLUMN_FAMILY_NAME, - // Checkpoint schemas - BatchCheckpointSchema::COLUMN_FAMILY_NAME, - // TODO add col families for other store types -]; - use std::{fs, path::Path, sync::Arc}; use bridge::schemas::{ @@ -96,12 +59,48 @@ pub use sync_event::db::SyncEventDb; use crate::{ chain_state::schemas::{ChainstateSchema, WriteBatchSchema}, - client_state::schemas::{ClientStateSchema, ClientUpdateOutputSchema}, + client_state::schemas::ClientUpdateOutputSchema, l1::schemas::{L1BlockSchema, MmrSchema, TxnSchema}, sequence::SequenceSchema, sync_event::schemas::SyncEventSchema, }; +pub const ROCKSDB_NAME: &str = "strata-client"; + +pub const 
STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[ + SequenceSchema::COLUMN_FAMILY_NAME, + ChainstateSchema::COLUMN_FAMILY_NAME, + ClientUpdateOutputSchema::COLUMN_FAMILY_NAME, + L1BlockSchema::COLUMN_FAMILY_NAME, + MmrSchema::COLUMN_FAMILY_NAME, + SyncEventSchema::COLUMN_FAMILY_NAME, + TxnSchema::COLUMN_FAMILY_NAME, + L2BlockSchema::COLUMN_FAMILY_NAME, + L2BlockStatusSchema::COLUMN_FAMILY_NAME, + L2BlockHeightSchema::COLUMN_FAMILY_NAME, + WriteBatchSchema::COLUMN_FAMILY_NAME, + // Seqdb schemas + SeqBlobIdSchema::COLUMN_FAMILY_NAME, + SeqBlobSchema::COLUMN_FAMILY_NAME, + // Bcast schemas + BcastL1TxIdSchema::COLUMN_FAMILY_NAME, + BcastL1TxSchema::COLUMN_FAMILY_NAME, + // Bridge relay schemas + BridgeMsgIdSchema::COLUMN_FAMILY_NAME, + ScopeMsgIdSchema::COLUMN_FAMILY_NAME, + // Bridge signature schemas + BridgeTxStateTxidSchema::COLUMN_FAMILY_NAME, + BridgeTxStateSchema::COLUMN_FAMILY_NAME, + // Bridge duty schemas + BridgeDutyTxidSchema::COLUMN_FAMILY_NAME, + BridgeDutyStatusSchema::COLUMN_FAMILY_NAME, + // Bridge duty checkpoint + BridgeDutyCheckpointSchema::COLUMN_FAMILY_NAME, + // Checkpoint schemas + BatchCheckpointSchema::COLUMN_FAMILY_NAME, + // TODO add col families for other store types +]; + /// database operations configuration #[derive(Clone, Copy, Debug)] pub struct DbOpsConfig { diff --git a/crates/state/src/client_state.rs b/crates/state/src/client_state.rs index b6d4a30a1a..1aa81bb4cf 100644 --- a/crates/state/src/client_state.rs +++ b/crates/state/src/client_state.rs @@ -7,11 +7,13 @@ use arbitrary::Arbitrary; use borsh::{BorshDeserialize, BorshSerialize}; use serde::{Deserialize, Serialize}; use strata_primitives::buf::Buf32; +use tracing::*; use crate::{ batch::{BatchInfo, BootstrapState}, id::L2BlockId, l1::{HeaderVerificationState, L1BlockId}, + operation::{ClientUpdateOutput, SyncAction}, }; /// High level client's state of the network. This is local to the client, not @@ -327,3 +329,171 @@ impl L1Checkpoint { } } } + +/// Wrapper around [`ClientState`] used for modifying it and producing sync +/// actions. +pub struct ClientStateMut { + state: ClientState, + actions: Vec, +} + +impl ClientStateMut { + pub fn new(state: ClientState) -> Self { + Self { + state, + actions: Vec::new(), + } + } + + pub fn state(&self) -> &ClientState { + &self.state + } + + pub fn into_update(self) -> ClientUpdateOutput { + ClientUpdateOutput::new(self.state, self.actions) + } + + pub fn push_action(&mut self, a: SyncAction) { + self.actions.push(a); + } + + pub fn push_actions(&mut self, a: impl Iterator) { + self.actions.extend(a); + } + + // Semantical mutation fns. 
+ // TODO remove logs from this, break down into simpler logical units + + // TODO remove sync state + pub fn set_sync_state(&mut self, ss: SyncState) { + self.state.set_sync_state(ss); + } + + pub fn activate_chain(&mut self) { + self.state.chain_active = true; + } + + pub fn update_verification_state(&mut self, l1_vs: HeaderVerificationState) { + debug!(?l1_vs, "received HeaderVerificationState"); + + if self.state.genesis_verification_hash().is_none() { + info!(?l1_vs, "Setting genesis L1 verification state"); + self.state.genesis_l1_verification_state_hash = Some(l1_vs.compute_hash().unwrap()); + } + + self.state.l1_view_mut().header_verification_state = Some(l1_vs); + } + + pub fn rollback_l1_blocks(&mut self, height: u64) { + let l1v = self.state.l1_view_mut(); + let buried_height = l1v.buried_l1_height(); + + if height < buried_height { + error!(%height, %buried_height, "unable to roll back past buried height"); + panic!("operation: emitted invalid write"); + } + + let new_unacc_len = (height - buried_height) as usize; + let l1_vs = l1v.tip_verification_state(); + if let Some(l1_vs) = l1_vs { + // TODO: handle other things + let mut rollbacked_l1_vs = l1_vs.clone(); + rollbacked_l1_vs.last_verified_block_num = height as u32; + rollbacked_l1_vs.last_verified_block_hash = l1v.local_unaccepted_blocks[new_unacc_len]; + } + l1v.local_unaccepted_blocks.truncate(new_unacc_len); + + // Keep pending checkpoints whose l1 height is less than or equal to rollback height + l1v.verified_checkpoints + .retain(|ckpt| ckpt.height <= height); + } + + // TODO convert to L1BlockCommitment? + pub fn accept_l1_block(&mut self, l1blkid: L1BlockId) { + debug!(?l1blkid, "received AcceptL1Block"); + // TODO make this also do something + let l1v = self.state.l1_view_mut(); + l1v.local_unaccepted_blocks.push(l1blkid); + l1v.next_expected_block += 1; + } + + // TODO convert to L2BlockCommitment? + pub fn accept_l2_block(&mut self, blkid: L2BlockId, height: u64) { + // TODO do any other bookkeeping + debug!(%height, %blkid, "received AcceptL2Block"); + let ss = self.state.expect_sync_mut(); + ss.tip_blkid = blkid; + ss.tip_height = height; + } + + pub fn update_buried(&mut self, new_idx: u64) { + let l1v = self.state.l1_view_mut(); + + // Check that it's increasing. + let old_idx = l1v.buried_l1_height(); + + if new_idx < old_idx { + panic!("operation: emitted non-greater buried height"); + } + + // Check that it's not higher than what we know about. + if new_idx > l1v.tip_height() { + panic!("operation: new buried height above known L1 tip"); + } + + // If everything checks out we can just remove them. 
+ let diff = (new_idx - old_idx) as usize; + let _blocks = l1v + .local_unaccepted_blocks + .drain(..diff) + .collect::>(); + + // TODO merge these blocks into the L1 MMR in the client state if + // we haven't already + } + + pub fn accept_checkpoints(&mut self, ckpts: &[L1Checkpoint]) { + // Extend the pending checkpoints + self.state + .l1_view_mut() + .verified_checkpoints + .extend(ckpts.iter().cloned()); + } + + pub fn finalize_checkpoint(&mut self, l1height: u64) { + let l1v = self.state.l1_view_mut(); + + let finalized_checkpts: Vec<_> = l1v + .verified_checkpoints + .iter() + .take_while(|ckpt| ckpt.height <= l1height) + .collect(); + + let new_finalized = finalized_checkpts.last().cloned().cloned(); + let total_finalized = finalized_checkpts.len(); + debug!(?new_finalized, ?total_finalized, "Finalized checkpoints"); + + // Remove the finalized from pending and then mark the last one as last_finalized + // checkpoint + l1v.verified_checkpoints.drain(..total_finalized); + + if let Some(ckpt) = new_finalized { + // Check if heights match accordingly + if !l1v + .last_finalized_checkpoint + .as_ref() + .is_none_or(|prev_ckpt| ckpt.batch_info.idx() == prev_ckpt.batch_info.idx() + 1) + { + panic!("operation: mismatched indices of pending checkpoint"); + } + + let fin_blockid = *ckpt.batch_info.l2_blockid(); + l1v.last_finalized_checkpoint = Some(ckpt); + + // Update finalized blockid in StateSync + self.state.expect_sync_mut().finalized_blkid = fin_blockid; + } + } + + // TODO add operation stuff +} diff --git a/crates/state/src/operation.rs b/crates/state/src/operation.rs index 3394487029..dab8c84657 100644 --- a/crates/state/src/operation.rs +++ b/crates/state/src/operation.rs @@ -19,25 +19,33 @@ use crate::{ Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize, Deserialize, Serialize, )] pub struct ClientUpdateOutput { - writes: Vec, + state: ClientState, actions: Vec, } impl ClientUpdateOutput { - pub fn new(writes: Vec, actions: Vec) -> Self { - Self { writes, actions } + pub fn new(state: ClientState, actions: Vec) -> Self { + Self { state, actions } } - pub fn writes(&self) -> &[ClientStateWrite] { - &self.writes + pub fn new_state(state: ClientState) -> Self { + Self::new(state, Vec::new()) + } + + pub fn state(&self) -> &ClientState { + &self.state } pub fn actions(&self) -> &[SyncAction] { &self.actions } - pub fn into_parts(self) -> (Vec, Vec) { - (self.writes, self.actions) + pub fn into_state(self) -> ClientState { + self.state + } + + pub fn into_parts(self) -> (ClientState, Vec) { + (self.state, self.actions) } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 1b3c262014..2fa41adca1 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -13,16 +13,23 @@ pub use ops::l1tx_broadcast::BroadcastDbOps; use strata_db::traits::Database; /// A consolidation of database managers. 
+// TODO move this to its own module #[derive(Clone)] pub struct NodeStorage { l2_block_manager: Arc, + client_state_manager: Arc, checkpoint_manager: Arc, } + impl NodeStorage { pub fn l2(&self) -> &Arc { &self.l2_block_manager } + pub fn client_state(&self) -> &Arc { + &self.client_state_manager + } + pub fn checkpoint(&self) -> &Arc { &self.checkpoint_manager } @@ -33,9 +40,11 @@ where D: Database + Sync + Send + 'static, { let checkpoint_manager = Arc::new(CheckpointDbManager::new(pool.clone(), db.clone())); + let client_state_manager = Arc::new(ClientStateManager::new(pool.clone(), db.clone())); let l2_block_manager = Arc::new(L2BlockManager::new(pool.clone(), db.clone())); NodeStorage { checkpoint_manager, + client_state_manager, l2_block_manager, } } diff --git a/crates/storage/src/managers/client_state.rs b/crates/storage/src/managers/client_state.rs index 2e1cee2410..14a6df2f28 100644 --- a/crates/storage/src/managers/client_state.rs +++ b/crates/storage/src/managers/client_state.rs @@ -3,21 +3,54 @@ use std::sync::Arc; -use strata_db::traits::Database; -use strata_state::operation::ClientUpdateOutput; +use strata_db::{traits::Database, DbError, DbResult}; +use strata_state::{client_state::ClientState, operation::ClientUpdateOutput}; use threadpool::ThreadPool; use crate::{cache, ops}; pub struct ClientStateManager { ops: ops::client_state::ClientStateOps, - state_cache: cache::CacheTable>, + + // TODO actually use caches + update_cache: cache::CacheTable>, + state_cache: cache::CacheTable>, } impl ClientStateManager { pub fn new(pool: ThreadPool, db: Arc) -> Self { let ops = ops::client_state::Context::new(db.client_state_db().clone()).into_ops(pool); + let update_cache = cache::CacheTable::new(64.try_into().unwrap()); let state_cache = cache::CacheTable::new(64.try_into().unwrap()); - Self { ops, state_cache } + Self { + ops, + update_cache, + state_cache, + } + } + + pub fn get_last_state_idx_blocking(&self) -> DbResult { + self.ops.get_last_state_idx_blocking() + } + + // TODO convert to managing these with Arcs + pub fn get_state_blocking(&self, idx: u64) -> DbResult> { + self.ops + .get_client_update_blocking(idx) + .map(|res| res.map(|update| update.into_state())) + } + + pub fn put_update_blocking( + &self, + idx: u64, + update: ClientUpdateOutput, + ) -> DbResult> { + // FIXME this is a lot of cloning, good thing the type isn't gigantic, + // still feels bad though + let state = Arc::new(update.state().clone()); + self.ops.put_client_update_blocking(idx, update.clone())?; + self.update_cache.insert(idx, Some(update)); + self.state_cache.insert(idx, state.clone()); + Ok(state) } } diff --git a/crates/storage/src/ops/client_state.rs b/crates/storage/src/ops/client_state.rs index e0287fc7c2..be4b137970 100644 --- a/crates/storage/src/ops/client_state.rs +++ b/crates/storage/src/ops/client_state.rs @@ -3,22 +3,14 @@ use std::sync::Arc; use strata_db::traits::*; -use strata_state::{ - client_state::ClientState, - operation::{ClientStateWrite, ClientUpdateOutput, SyncAction}, -}; +use strata_state::operation::ClientUpdateOutput; use crate::exec::*; inst_ops_simple! 
 {
     ( => ClientStateOps) {
-        write_client_update_output(idx: u64, output: ClientUpdateOutput) => ();
-        write_client_state_checkpoint(idx: u64, state: ClientState) => ();
-        get_last_write_idx() => u64;
-        get_client_state_writes(idx: u64) => Option<Vec<ClientStateWrite>>;
-        get_client_update_actions(idx: u64) => Option<Vec<SyncAction>>;
-        get_last_checkpoint_idx() => u64;
-        get_prev_checkpoint_at(idx: u64) => u64;
-        get_state_checkpoint(idx: u64) => Option<ClientState>;
+        put_client_update(idx: u64, output: ClientUpdateOutput) => ();
+        get_client_update(idx: u64) => Option<ClientUpdateOutput>;
+        get_last_state_idx() => u64;
     }
 }
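
Taken together, the diff replaces the old write-list flow (a `Vec<ClientStateWrite>` applied via `operation::apply_writes_to_state`, plus periodic state checkpoints) with a builder-style `ClientStateMut` whose frozen `ClientUpdateOutput` carries the complete post-event `ClientState` alongside its `SyncAction`s, persisted per event index through `ClientStateManager`. The sketch below illustrates that flow as a single free function; the function name `apply_sync_event` and its parameter list are illustrative only and not part of this diff — the real call sites are `StateTracker::advance_consensus_state` and the CSM worker above.

```rust
// Hypothetical helper, assumed to live inside the strata-consensus-logic
// crate; it compresses StateTracker::advance_consensus_state into one
// function purely for illustration.
use std::sync::Arc;

use strata_db::traits::Database;
use strata_primitives::params::Params;
use strata_state::{
    client_state::{ClientState, ClientStateMut},
    sync_event::SyncEvent,
};
use strata_storage::ClientStateManager;

use crate::csm::client_transition;

fn apply_sync_event<D: Database>(
    cur_state: &ClientState,
    ev_idx: u64,
    ev: &SyncEvent,
    database: &D,
    params: &Params,
    csman: &ClientStateManager,
) -> anyhow::Result<Arc<ClientState>> {
    // Wrap a copy of the current state; sync actions accumulate inside the
    // builder instead of in a separate Vec<ClientStateWrite>.
    let mut state_mut = ClientStateMut::new(cur_state.clone());

    // Mutate the state in place through the semantic methods
    // (accept_l1_block, finalize_checkpoint, ...).
    client_transition::process_event(&mut state_mut, ev, database, params)?;

    // Freeze the builder into a ClientUpdateOutput holding the entire
    // post-event state plus its actions, and persist it under the event
    // index. The manager hands back the cached Arc<ClientState>, which the
    // tracker adopts as its new cur_state.
    let output = state_mut.into_update();
    let new_state = csman.put_update_blocking(ev_idx, output)?;
    Ok(new_state)
}
```

Because every stored `ClientUpdateOutput` now embeds the full state, resuming is a single `get_state_blocking(last_idx)` lookup; that is why the checkpoint schema, the write-replay loop in `reconstruct_state`, and the periodic `write_client_state_checkpoint` call in the worker are all removed by this diff.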