diff --git a/Cargo.toml b/Cargo.toml
index 4c34d618d..6f62fa3b3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,9 @@ resolver = "2"
 [workspace.dependencies]
 alpen-vertex-common = { path = "crates/common" }
+alpen-vertex-consensus-logic = { path = "crates/consensus-logic" }
+alpen-vertex-evmctl = { path = "crates/evmctl" }
+alpen-vertex-db = { path = "crates/db" }
 alpen-vertex-mmr = { path = "crates/util/mmr" }
 alpen-vertex-primitives = { path = "crates/primitives" }
 alpen-vertex-rpc-api = { path = "crates/rpc/api" }
@@ -29,11 +32,15 @@ alpen-vertex-state = { path = "crates/state" }
 anyhow = "1.0.86"
 arbitrary = { version = "1.3.2", features = ["derive"] }
+argh = "0.1"
 async-trait = "0.1"
 borsh = { version = "1.5.0", features = ["derive"] }
+digest = "0.10"
+hex = { version = "0.4", features = ["serde"] }
 jsonrpsee = "0.22"
 jsonrpsee-core = "0.22"
 jsonrpsee-types = "0.22"
+parking_lot = "0.12.3"
 reth-db = { git = "https://github.com/paradigmxyz/reth.git", tag = "v0.2.0-beta.6" }
 reth-ipc = { git = "https://github.com/paradigmxyz/reth.git", tag = "v0.2.0-beta.6" }
 reth-primitives = { git = "https://github.com/paradigmxyz/reth.git", tag = "v0.2.0-beta.6" }
@@ -44,7 +51,8 @@ rockbound = { git = "https://github.com/sovereign-Labs/rockbound", tag = "v2.0.1
 rocksdb = "0.21"
 serde = "1.0"
 serde_json = "1.0"
+sha2 = "0.10"
 thiserror = "1.0"
 tokio = { version = "1.37", features = ["full"] }
 tracing = "0.1"
-tracing-subscriber = "0.3"
\ No newline at end of file
+tracing-subscriber = "0.3"
diff --git a/crates/consensus-logic/Cargo.toml b/crates/consensus-logic/Cargo.toml
index 54b6ea9ca..9a89600e0 100644
--- a/crates/consensus-logic/Cargo.toml
+++ b/crates/consensus-logic/Cargo.toml
@@ -4,3 +4,15 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+alpen-vertex-db = { workspace = true }
+alpen-vertex-evmctl = { workspace = true }
+alpen-vertex-primitives = { workspace = true }
+alpen-vertex-state = { workspace = true }
+
+anyhow = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+[dev-dependencies]
+rand = { version = "*", features = ["getrandom"] }
\ No newline at end of file
diff --git a/crates/consensus-logic/src/chain_tip.rs b/crates/consensus-logic/src/chain_tip.rs
new file mode 100644
index 000000000..a65e55429
--- /dev/null
+++ b/crates/consensus-logic/src/chain_tip.rs
@@ -0,0 +1,272 @@
+//! Chain tip tracking. Used to talk to the EL and pick the new chain tip.
+
+use std::collections::*;
+use std::sync::Arc;
+
+use tokio::sync::mpsc;
+use tracing::*;
+
+use alpen_vertex_db::errors::DbError;
+use alpen_vertex_db::traits::{BlockStatus, Database, L2DataProvider, L2DataStore, SyncEventStore};
+use alpen_vertex_evmctl::engine::ExecEngineCtl;
+use alpen_vertex_evmctl::messages::ExecPayloadData;
+use alpen_vertex_primitives::params::Params;
+use alpen_vertex_state::block::{L2Block, L2BlockId};
+use alpen_vertex_state::consensus::ConsensusState;
+use alpen_vertex_state::operation::SyncAction;
+use alpen_vertex_state::sync_event::SyncEvent;
+
+use crate::ctl::CsmController;
+use crate::message::ChainTipMessage;
+use crate::{credential, errors::*, reorg, unfinalized_tracker};
+
+/// Tracks the parts of the chain that haven't been finalized on-chain yet.
+pub struct ChainTipTrackerState<D: Database> {
+    /// Consensus parameters.
+    params: Arc<Params>,
+
+    /// Underlying state database.
+    database: Arc<D>,
+
+    /// Current consensus state we're considering blocks against.
+    cur_state: Arc<ConsensusState>,
+
+    /// Tracks unfinalized block tips.
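+    /// These form a tree rooted at the finalized tip; see
+    /// `unfinalized_tracker::UnfinalizedBlockTracker` for the invariants it
+    /// maintains.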
+    chain_tracker: unfinalized_tracker::UnfinalizedBlockTracker,
+
+    /// Current best block.
+    // TODO make sure we actually want to have this
+    cur_best_block: L2BlockId,
+}
+
+impl<D: Database> ChainTipTrackerState<D> {
+    /// Constructs a new instance we can run the tracker with.
+    pub fn new(
+        params: Arc<Params>,
+        database: Arc<D>,
+        cur_state: Arc<ConsensusState>,
+        chain_tracker: unfinalized_tracker::UnfinalizedBlockTracker,
+        cur_best_block: L2BlockId,
+    ) -> Self {
+        Self {
+            params,
+            database,
+            cur_state,
+            chain_tracker,
+            cur_best_block,
+        }
+    }
+
+    fn finalized_tip(&self) -> &L2BlockId {
+        self.chain_tracker.finalized_tip()
+    }
+
+    fn set_block_status(&self, id: &L2BlockId, status: BlockStatus) -> Result<(), DbError> {
+        let l2store = self.database.l2_store();
+        l2store.set_block_status(*id, status)?;
+        Ok(())
+    }
+}
+
+/// Main tracker task that takes a ready chain tip tracker and some IO stuff.
+pub fn tracker_task<D: Database, E: ExecEngineCtl>(
+    state: ChainTipTrackerState<D>,
+    engine: Arc<E>,
+    ctm_rx: mpsc::Receiver<ChainTipMessage>,
+    csm_ctl: Arc<CsmController<D>>,
+) {
+    if let Err(e) = tracker_task_inner(state, engine.as_ref(), ctm_rx, &csm_ctl) {
+        error!(err = %e, "tracker aborted");
+    }
+}
+
+fn tracker_task_inner<D: Database, E: ExecEngineCtl>(
+    mut state: ChainTipTrackerState<D>,
+    engine: &E,
+    mut ctm_rx: mpsc::Receiver<ChainTipMessage>,
+    csm_ctl: &CsmController<D>,
+) -> anyhow::Result<()> {
+    loop {
+        let Some(m) = ctm_rx.blocking_recv() else {
+            break;
+        };
+
+        // TODO decide when errors are actually failures vs when they're okay
+        process_ct_msg(m, &mut state, engine, csm_ctl)?;
+    }
+
+    Ok(())
+}
+
+fn process_ct_msg<D: Database, E: ExecEngineCtl>(
+    ctm: ChainTipMessage,
+    state: &mut ChainTipTrackerState<D>,
+    engine: &E,
+    csm_ctl: &CsmController<D>,
+) -> anyhow::Result<()> {
+    match ctm {
+        ChainTipMessage::NewState(cs, output) => {
+            let l2_tip = cs.chain_state().chain_tip_blockid();
+
+            // Update the new state.
+            state.cur_state = cs;
+
+            // TODO use output actions to clear out dangling states now
+            for act in output.actions() {
+                match act {
+                    SyncAction::FinalizeBlock(blkid) => {
+                        let fin_report = state.chain_tracker.update_finalized_tip(blkid)?;
+                        info!(?blkid, ?fin_report, "finalized block")
+                        // TODO do something with the finalization report
+                    }
+
+                    // TODO
+                    _ => {}
+                }
+            }
+
+            // TODO recheck every remaining block's validity using the new state
+            // starting from the bottom up, putting into a new chain tracker
+        }
+
+        ChainTipMessage::NewBlock(blkid) => {
+            let l2prov = state.database.l2_provider();
+            let block = l2prov
+                .get_block_data(blkid)?
+                .ok_or(Error::MissingL2Block(blkid))?;
+
+            // First, decide if the block seems correctly signed and we haven't
+            // already marked it as invalid.
+            let cstate = state.cur_state.clone();
+            let correctly_signed = check_new_block(&blkid, &block, &cstate, state)?;
+            if !correctly_signed {
+                // It's invalid, write that and return.
+                state.set_block_status(&blkid, BlockStatus::Invalid)?;
+                return Ok(());
+            }
+
+            // Try to execute the payload, seeing if *that's* valid.
+            let exec_hash = block.header().exec_payload_hash();
+            let exec_seg = block.exec_segment();
+            let eng_payload = ExecPayloadData::new_simple(exec_seg.payload().to_vec());
+            debug!(?blkid, ?exec_hash, "submitting execution payload");
+            let res = engine.submit_payload(eng_payload)?;
+
+            // If the payload is invalid then we should write the full block as
+            // being invalid and return too.
+            // TODO verify this is reasonable behavior, especially with regard
+            // to pre-sync
+            if res == alpen_vertex_evmctl::engine::BlockStatus::Invalid {
+                // It's invalid, write that and return.
+                state.set_block_status(&blkid, BlockStatus::Invalid)?;
+                return Ok(());
+            }
+
+            // Insert block into pending block tracker and figure out if we
+            // should switch to it as a potential head. This returns if we
+            // created a new tip instead of advancing an existing tip.
+            let cur_tip = cstate.chain_state().chain_tip_blockid();
+            let new_tip = state.chain_tracker.attach_block(blkid, block.header())?;
+            if new_tip {
+                debug!(?blkid, "created new pending chain tip");
+            }
+
+            let best_block = pick_best_block(
+                &cur_tip,
+                state.chain_tracker.chain_tips_iter(),
+                state.database.as_ref(),
+            )?;
+
+            // Figure out what our job is now.
+            let depth = 100; // TODO change this
+            let reorg = reorg::compute_reorg(&cur_tip, best_block, depth, &state.chain_tracker)
+                .ok_or(Error::UnableToFindReorg(cur_tip, *best_block))?;
+
+            // TODO this shouldn't be called "reorg" here, make the types
+            // context aware so that we know we're not doing anything abnormal
+            // in the normal case
+
+            // Insert the sync event and submit it to the executor.
+            let ev = SyncEvent::NewTipBlock(*reorg.new_tip());
+            csm_ctl.submit_event(ev)?;
+
+            // Apply the changes to our state.
+            state.cur_best_block = *reorg.new_tip();
+
+            // TODO is there anything else we have to do here?
+        }
+    }
+
+    Ok(())
+}
+
+/// Considers if the block is plausibly valid and if we should attach it to the
+/// pending unfinalized blocks tree. The block is assumed to already be
+/// structurally consistent.
+fn check_new_block<D: Database>(
+    blkid: &L2BlockId,
+    block: &L2Block,
+    cstate: &ConsensusState,
+    state: &mut ChainTipTrackerState<D>,
+) -> anyhow::Result<bool> {
+    let params = state.params.as_ref();
+
+    // Check that the block is correctly signed.
+    let cred_ok = credential::check_block_credential(block.header(), cstate.chain_state(), params);
+    if !cred_ok {
+        warn!(?blkid, "block has invalid credential");
+        return Ok(false);
+    }
+
+    // Check that we haven't already marked the block as invalid.
+    let l2prov = state.database.l2_provider();
+    if let Some(status) = l2prov.get_block_status(*blkid)? {
+        if status == alpen_vertex_db::traits::BlockStatus::Invalid {
+            warn!(?blkid, "rejecting block that fails EL validation");
+            return Ok(false);
+        }
+    }
+
+    // TODO more stuff
+
+    Ok(true)
+}
+
+/// Returns if we should switch to the new fork. This is dependent on our
+/// current tip and any of the competing forks. It's "sticky" in that it'll try
+/// to stay where we currently are unless there's a definitely-better fork.
+fn pick_best_block<'t, D: Database>(
+    cur_tip: &'t L2BlockId,
+    mut tips_iter: impl Iterator<Item = &'t L2BlockId>,
+    database: &D,
+) -> Result<&'t L2BlockId, Error> {
+    let l2prov = database.l2_provider();
+
+    let mut best_tip = cur_tip;
+    let mut best_block = l2prov
+        .get_block_data(*best_tip)?
+        .ok_or(Error::MissingL2Block(*best_tip))?;
+
+    // The implementation of this will only switch to a new tip if it's at a
+    // higher height than our current tip. We'll make this more sophisticated
+    // in the future if we have a more sophisticated consensus protocol.
+    while let Some(other_tip) = tips_iter.next() {
+        if other_tip == cur_tip {
+            continue;
+        }
+
+        let other_block = l2prov
+            .get_block_data(*other_tip)?
+            .ok_or(Error::MissingL2Block(*other_tip))?;
+
+        let best_header = best_block.header();
+        let other_header = other_block.header();
+
+        if other_header.blockidx() > best_header.blockidx() {
+            best_tip = other_tip;
+            best_block = other_block;
+        }
+    }
+
+    Ok(best_tip)
+}
diff --git a/crates/consensus-logic/src/credential.rs b/crates/consensus-logic/src/credential.rs
new file mode 100644
index 000000000..7c036d795
--- /dev/null
+++ b/crates/consensus-logic/src/credential.rs
@@ -0,0 +1,40 @@
+//! Logic to check block credentials.
+
+use tracing::*;
+
+use alpen_vertex_primitives::{
+    block_credential::CredRule,
+    buf::{Buf32, Buf64},
+    params::Params,
+};
+use alpen_vertex_state::{block::L2BlockHeader, consensus::ConsensusChainState};
+
+pub fn check_block_credential(
+    header: &L2BlockHeader,
+    cs: &ConsensusChainState,
+    params: &Params,
+) -> bool {
+    let sigcom = compute_header_sig_commitment(header);
+    match &params.rollup().cred_rule {
+        CredRule::Unchecked => true,
+        CredRule::SchnorrKey(pubkey) => verify_schnorr_sig(header.sig(), &sigcom, pubkey),
+    }
+}
+
+fn compute_header_sig_commitment(header: &L2BlockHeader) -> Buf32 {
+    // TODO implement this, just concat all the components together aside from
+    // the sig, probably should be poseidon
+    warn!("header commitment generation still unimplemented");
+    Buf32::from([0; 32])
+}
+
+pub fn sign_schnorr_sig(msg: &Buf32, sk: &Buf32) -> Buf64 {
+    warn!("block signature signing still unimplemented");
+    Buf64::from([0; 64])
+}
+
+fn verify_schnorr_sig(sig: &Buf64, msg: &Buf32, pk: &Buf32) -> bool {
+    // TODO implement signature verification
+    warn!("block signature verification still unimplemented");
+    true
+}
diff --git a/crates/consensus-logic/src/ctl.rs b/crates/consensus-logic/src/ctl.rs
new file mode 100644
index 000000000..8f21092d9
--- /dev/null
+++ b/crates/consensus-logic/src/ctl.rs
@@ -0,0 +1,48 @@
+use std::sync::Arc;
+
+use tokio::sync::mpsc;
+use tracing::*;
+
+use alpen_vertex_db::traits::*;
+use alpen_vertex_state::sync_event::SyncEvent;
+
+use crate::message::CsmMessage;
+
+/// Controller handle for the consensus state machine. Used to submit new sync
+/// events for persistence and processing.
+pub struct CsmController<D: Database> {
+    database: Arc<D>,
+    csm_tx: mpsc::Sender<CsmMessage>,
+}
+
+impl<D: Database> CsmController<D> {
+    pub fn new(database: Arc<D>, csm_tx: mpsc::Sender<CsmMessage>) -> Self {
+        Self { database, csm_tx }
+    }
+
+    /// Writes a sync event to the database and sends a message to the CSM
+    /// executor to trigger it to process the event.
+    pub fn submit_event(&self, sync_event: SyncEvent) -> anyhow::Result<()> {
+        let se_store = self.database.sync_event_store();
+        let idx = se_store.write_sync_event(sync_event)?;
+        let msg = CsmMessage::EventInput(idx);
+        if self.csm_tx.blocking_send(msg).is_err() {
+            warn!("sync event receiver closed");
+        }
+
+        Ok(())
+    }
+
+    /// Writes a sync event to the database and sends a message to the CSM
+    /// executor to trigger it to process the event.
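+    ///
+    /// The blocking database write below is wrapped in
+    /// `tokio::task::block_in_place` so it doesn't stall the async runtime's
+    /// worker threads.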
+    pub async fn submit_event_async(&self, sync_event: SyncEvent) -> anyhow::Result<()> {
+        let se_store = self.database.sync_event_store();
+        let idx = tokio::task::block_in_place(|| se_store.write_sync_event(sync_event))?;
+        let msg = CsmMessage::EventInput(idx);
+        if self.csm_tx.send(msg).await.is_err() {
+            warn!("sync event receiver closed");
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/consensus-logic/src/errors.rs b/crates/consensus-logic/src/errors.rs
new file mode 100644
index 000000000..159e5c53d
--- /dev/null
+++ b/crates/consensus-logic/src/errors.rs
@@ -0,0 +1,58 @@
+use alpen_vertex_evmctl::errors::EngineError;
+use thiserror::Error;
+
+use alpen_vertex_state::block::L2BlockId;
+use alpen_vertex_state::l1::L1BlockId;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("invalid sync event index {0}")]
+    MissingSyncEvent(u64),
+
+    #[error("L2 blkid {0:?} missing from database")]
+    MissingL2Block(L2BlockId),
+
+    #[error("L1 blkid {0:?} missing from database")]
+    MissingL1Block(L1BlockId),
+
+    #[error("missing expected consensus writes at {0}")]
+    MissingConsensusWrites(u64),
+
+    // This probably shouldn't happen, it would suggest the database is
+    // misbehaving.
+    #[error("missing expected state checkpoint at {0}")]
+    MissingCheckpoint(u64),
+
+    #[error("unable to find reorg {0:?} -> {1:?}")]
+    UnableToFindReorg(L2BlockId, L2BlockId),
+
+    #[error("tried to skip event index {0} (cur state idx {1})")]
+    SkippedEventIdx(u64, u64),
+
+    #[error("chaintip: {0}")]
+    ChainTip(#[from] ChainTipError),
+
+    #[error("engine: {0}")]
+    Engine(#[from] EngineError),
+
+    #[error("db: {0}")]
+    Db(#[from] alpen_vertex_db::errors::DbError),
+
+    #[error("not yet implemented")]
+    Unimplemented,
+
+    #[error("{0}")]
+    Other(String),
+}
+
+#[derive(Debug, Error)]
+pub enum ChainTipError {
+    #[error("blockid {0:?} already attached")]
+    BlockAlreadyAttached(L2BlockId),
+
+    #[error("tried to attach blkid {0:?} but missing parent blkid {1:?}")]
+    AttachMissingParent(L2BlockId, L2BlockId),
+
+    #[error("tried to finalize unknown block {0:?}")]
+    MissingBlock(L2BlockId),
+}
diff --git a/crates/consensus-logic/src/lib.rs b/crates/consensus-logic/src/lib.rs
index ec9b894d4..3c92a6a23 100644
--- a/crates/consensus-logic/src/lib.rs
+++ b/crates/consensus-logic/src/lib.rs
@@ -1 +1,14 @@
 //! Consensus validation logic and core state machine
+
+pub mod chain_tip;
+pub mod credential;
+pub mod ctl;
+pub mod message;
+pub mod reorg;
+pub mod state_tracker;
+pub mod status;
+pub mod transition;
+pub mod unfinalized_tracker;
+pub mod worker;
+
+pub mod errors;
diff --git a/crates/consensus-logic/src/message.rs b/crates/consensus-logic/src/message.rs
new file mode 100644
index 000000000..906c57667
--- /dev/null
+++ b/crates/consensus-logic/src/message.rs
@@ -0,0 +1,24 @@
+use std::sync::Arc;
+
+use alpen_vertex_state::{
+    block::L2BlockId,
+    consensus::ConsensusState,
+    operation::{ConsensusOutput, SyncAction},
+};
+
+/// Sync control message.
+#[derive(Copy, Clone, Debug)]
+pub enum CsmMessage {
+    /// Process a sync event at a given index.
+    EventInput(u64),
+}
+
+/// Message about a new block the tip tracker might do something with.
+#[derive(Clone, Debug)]
+pub enum ChainTipMessage {
+    /// New consensus state with the output that produced it.
+    NewState(Arc<ConsensusState>, Arc<ConsensusOutput>),
+
+    /// New block coming in from over the network to be considered.
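+    /// (Only the block id travels on the channel; the block body is expected
+    /// to already be in the L2 store by the time this gets processed.)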
+    NewBlock(L2BlockId),
+}
diff --git a/crates/consensus-logic/src/reorg.rs b/crates/consensus-logic/src/reorg.rs
new file mode 100644
index 000000000..7407784e1
--- /dev/null
+++ b/crates/consensus-logic/src/reorg.rs
@@ -0,0 +1,405 @@
+//! Reorg planning types.
+
+use alpen_vertex_state::block::L2BlockId;
+
+use crate::unfinalized_tracker;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct Reorg {
+    /// Blocks we're removing, in the order we're removing them.
+    down: Vec<L2BlockId>,
+
+    /// Pivot block that's shared on both chains.
+    pivot: L2BlockId,
+
+    /// Blocks we're adding, in the order we're adding them.
+    up: Vec<L2BlockId>,
+}
+
+impl Reorg {
+    pub fn revert_iter(&self) -> impl Iterator<Item = &L2BlockId> {
+        self.down.iter()
+    }
+
+    pub fn pivot(&self) -> &L2BlockId {
+        &self.pivot
+    }
+
+    pub fn apply_iter(&self) -> impl Iterator<Item = &L2BlockId> {
+        self.up.iter()
+    }
+
+    pub fn old_tip(&self) -> &L2BlockId {
+        if self.down.is_empty() {
+            &self.pivot
+        } else {
+            &self.down[0]
+        }
+    }
+
+    pub fn new_tip(&self) -> &L2BlockId {
+        if self.up.is_empty() {
+            &self.pivot
+        } else {
+            &self.up[self.up.len() - 1]
+        }
+    }
+
+    /// If the reorg isn't really a reorg, it's just rolling back blocks or
+    /// adding new blocks.
+    pub fn is_weird(&self) -> bool {
+        self.up.is_empty() || self.down.is_empty()
+    }
+
+    /// If the reorg describes no change in tip.
+    pub fn is_identity(&self) -> bool {
+        self.up.is_empty() && self.down.is_empty()
+    }
+}
+
+/// Computes the reorg path from one block to a new tip, aborting at some reorg
+/// depth. This behaves sensibly when one block is an ancestor of the other or
+/// the two are the same, although that might not be useful.
+pub fn compute_reorg(
+    start: &L2BlockId,
+    dest: &L2BlockId,
+    limit_depth: usize,
+    tracker: &unfinalized_tracker::UnfinalizedBlockTracker,
+) -> Option<Reorg> {
+    // Handle an "identity" reorg.
+    if start == dest {
+        return Some(Reorg {
+            down: Vec::new(),
+            pivot: *start,
+            up: Vec::new(),
+        });
+    }
+
+    let mut down_blocks: Vec<&L2BlockId> = vec![start];
+    let mut up_blocks: Vec<&L2BlockId> = vec![dest];
+
+    loop {
+        // Check to see if we should abort.
+        if down_blocks.len() > limit_depth || up_blocks.len() > limit_depth {
+            return None;
+        }
+
+        // Extend the "down" side down, see if it matches.
+        let down_at = &down_blocks[down_blocks.len() - 1];
+        if *down_at != tracker.finalized_tip() {
+            let down_parent = tracker.get_parent(down_at).expect("reorg: get parent");
+
+            // This looks crazy but it's actually correct, and the clearest way
+            // to do it.
+            if let Some((idx, pivot)) = up_blocks
+                .iter()
+                .enumerate()
+                .filter(|(_, id)| **id == down_parent)
+                .next()
+            {
+                // Cool, now we have our pivot.
+                let pivot = **pivot;
+                let down = down_blocks.into_iter().copied().collect();
+                let up = up_blocks.into_iter().take(idx).rev().copied().collect();
+                return Some(Reorg { down, pivot, up });
+            }
+
+            down_blocks.push(down_parent);
+        }
+
+        // Extend the "up" side down, see if it matches.
+        let up_at = &up_blocks[up_blocks.len() - 1];
+        if *up_at != tracker.finalized_tip() {
+            let up_parent = tracker.get_parent(up_at).expect("reorg: get parent");
+
+            // Do this crazy thing again but in the other direction.
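+            // (That is: search the "down" chain for the new up-side parent;
+            // if it's found, the down-side blocks before it get reverted and
+            // the whole up side gets applied.)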
+ if let Some((idx, pivot)) = down_blocks + .iter() + .enumerate() + .filter(|(_, id)| **id == up_parent) + .next() + { + let pivot = **pivot; + let down = down_blocks.into_iter().take(idx).copied().collect(); + let up = up_blocks.into_iter().rev().copied().collect(); + return Some(Reorg { down, pivot, up }); + } + + up_blocks.push(up_parent); + } + } +} + +#[cfg(test)] +mod tests { + use alpen_vertex_state::block::L2BlockId; + use rand::RngCore; + + use crate::unfinalized_tracker; + + use super::{compute_reorg, Reorg}; + + fn rand_blkid() -> L2BlockId { + use rand::rngs::OsRng; + let mut rng = OsRng; + let mut buf = [0; 32]; + rng.fill_bytes(&mut buf); + L2BlockId::from(alpen_vertex_primitives::buf::Buf32::from(buf)) + } + + #[test] + fn test_eq_len() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let side_1 = [base, rand_blkid(), rand_blkid(), rand_blkid()]; + let side_2 = [side_1[1], rand_blkid(), rand_blkid()]; + eprintln!("base {base:?}\nside1 {side_1:#?}\nside2 {side_2:#?}"); + + let exp_reorg = Reorg { + down: vec![side_1[3], side_1[2]], + pivot: side_1[1], + up: vec![side_2[1], side_2[2]], + }; + + // Insert them. + side_1 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + side_2 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let reorg = compute_reorg(side_1.last().unwrap(), side_2.last().unwrap(), 10, &tracker); + + let reorg = reorg.expect("test: reorg not found"); + eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}"); + assert_eq!(reorg, exp_reorg); + } + + #[test] + fn test_longer_down() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let side_1 = [base, rand_blkid(), rand_blkid(), rand_blkid(), rand_blkid()]; + let side_2 = [side_1[1], rand_blkid(), rand_blkid()]; + eprintln!("base {base:?}\nside1 {side_1:#?}\nside2 {side_2:#?}"); + + let exp_reorg = Reorg { + down: vec![side_1[4], side_1[3], side_1[2]], + pivot: side_1[1], + up: vec![side_2[1], side_2[2]], + }; + + // Insert them. + side_1 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + side_2 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let reorg = compute_reorg(side_1.last().unwrap(), side_2.last().unwrap(), 10, &tracker); + + let reorg = reorg.expect("test: reorg not found"); + eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}"); + assert_eq!(reorg, exp_reorg); + } + + #[test] + fn test_longer_up() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let side_1 = [base, rand_blkid(), rand_blkid(), rand_blkid()]; + let side_2 = [ + side_1[1], + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + eprintln!("base {base:?}\nside1 {side_1:#?}\nside2 {side_2:#?}"); + + let exp_reorg = Reorg { + down: vec![side_1[3], side_1[2]], + pivot: side_1[1], + up: vec![side_2[1], side_2[2], side_2[3], side_2[4]], + }; + + // Insert them. 
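+        // (Each `windows(2)` pair here is (parent, child), so this wires up
+        // the whole branch in order.)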
+ side_1 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + side_2 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let reorg = compute_reorg(side_1.last().unwrap(), side_2.last().unwrap(), 10, &tracker); + + let reorg = reorg.expect("test: reorg not found"); + eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}"); + assert_eq!(reorg, exp_reorg); + } + + #[test] + fn test_too_deep() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let side_1 = [ + base, + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + let side_2 = [ + side_1[1], + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + eprintln!("base {base:?}\nside1 {side_1:#?}\nside2 {side_2:#?}"); + + // Insert them. + side_1 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + side_2 + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let reorg = compute_reorg(side_1.last().unwrap(), side_2.last().unwrap(), 3, &tracker); + + if let Some(reorg) = reorg { + eprintln!("reorg found wrongly {reorg:#?}"); + panic!("reorg found wrongly"); + } + } + + #[test] + fn test_start_ancestor() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let chain = [ + base, + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + eprintln!("base {base:?}\nchain {chain:#?}"); + + // Insert them. + chain + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let src = &chain[3]; + let dest = chain.last().unwrap(); + let reorg = compute_reorg(src, dest, 10, &tracker); + + let exp_reorg = Reorg { + down: Vec::new(), + pivot: *src, + up: vec![chain[4], chain[5], chain[6]], + }; + + let reorg = reorg.expect("test: reorg not found"); + eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}"); + assert_eq!(reorg, exp_reorg); + assert!(reorg.is_weird()); + } + + #[test] + fn test_end_ancestor() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let chain = [ + base, + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + eprintln!("base {base:?}\nchain {chain:#?}"); + + // Insert them. + chain + .windows(2) + .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0])); + + let src = chain.last().unwrap(); + let dest = &chain[3]; + let reorg = compute_reorg(src, dest, 10, &tracker); + + let exp_reorg = Reorg { + down: vec![chain[6], chain[5], chain[4]], + pivot: *dest, + up: Vec::new(), + }; + + let reorg = reorg.expect("test: reorg not found"); + eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}"); + assert_eq!(reorg, exp_reorg); + assert!(reorg.is_weird()); + } + + #[test] + fn test_identity() { + let base = rand_blkid(); + let mut tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(base); + + // Set up the two branches. + let chain = [ + base, + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + rand_blkid(), + ]; + eprintln!("base {base:?}\nchain {chain:#?}"); + + // Insert them. 
+        chain
+            .windows(2)
+            .for_each(|pair| tracker.insert_fake_block(pair[1], pair[0]));
+
+        let src = chain.last().unwrap();
+        let dest = src;
+        let reorg = compute_reorg(src, dest, 10, &tracker);
+        eprintln!("reorg found wrongly {reorg:#?}");
+
+        let exp_reorg = Reorg {
+            down: Vec::new(),
+            pivot: *dest,
+            up: Vec::new(),
+        };
+
+        let reorg = reorg.expect("test: reorg not found");
+        eprintln!("expected {exp_reorg:#?}\nfound {reorg:#?}");
+        assert_eq!(reorg, exp_reorg);
+        assert!(reorg.is_identity());
+    }
+}
diff --git a/crates/consensus-logic/src/state_tracker.rs b/crates/consensus-logic/src/state_tracker.rs
new file mode 100644
index 000000000..80c7e80ac
--- /dev/null
+++ b/crates/consensus-logic/src/state_tracker.rs
@@ -0,0 +1,111 @@
+//! Tracker to manage authoritative consensus states as we compute the
+//! transition outputs.
+
+use std::sync::Arc;
+
+use tracing::*;
+
+use alpen_vertex_db::traits::*;
+use alpen_vertex_primitives::params::Params;
+use alpen_vertex_state::{
+    consensus::ConsensusState,
+    operation::{self, ConsensusOutput},
+};
+
+use crate::errors::Error;
+use crate::transition;
+
+pub struct StateTracker<D: Database> {
+    params: Arc<Params>,
+    database: Arc<D>,
+
+    cur_state_idx: u64,
+
+    cur_state: Arc<ConsensusState>,
+}
+
+impl<D: Database> StateTracker<D> {
+    pub fn new(
+        params: Arc<Params>,
+        database: Arc<D>,
+        cur_state_idx: u64,
+        cur_state: Arc<ConsensusState>,
+    ) -> Self {
+        Self {
+            params,
+            database,
+            cur_state_idx,
+            cur_state,
+        }
+    }
+
+    pub fn cur_state_idx(&self) -> u64 {
+        self.cur_state_idx
+    }
+
+    pub fn cur_state(&self) -> &Arc<ConsensusState> {
+        &self.cur_state
+    }
+
+    /// Given the next event index, computes the state application if the
+    /// requisite data is available.
+    pub fn advance_consensus_state(&mut self, ev_idx: u64) -> anyhow::Result<ConsensusOutput> {
+        if ev_idx != self.cur_state_idx + 1 {
+            return Err(Error::SkippedEventIdx(ev_idx, self.cur_state_idx).into());
+        }
+
+        // Load the event from the database.
+        let db = self.database.as_ref();
+        let ev_prov = db.sync_event_provider();
+        let cs_store = db.consensus_state_store();
+        let ev = ev_prov
+            .get_sync_event(ev_idx)?
+            .ok_or(Error::MissingSyncEvent(ev_idx))?;
+
+        // Compute the state transition.
+        let outp = transition::process_event(&self.cur_state, &ev, db, &self.params)?;
+
+        // Clone the state and make a new one.
+        let mut new_state = self.cur_state.as_ref().clone();
+        operation::apply_writes_to_state(&mut new_state, outp.writes().iter().cloned());
+
+        // Store the outputs.
+        // TODO ideally avoid clone
+        cs_store.write_consensus_output(ev_idx, outp.clone())?;
+
+        // Update the tracked state to the new one.
+        self.cur_state = Arc::new(new_state);
+        self.cur_state_idx = ev_idx;
+
+        Ok(outp)
+    }
+
+    /// Writes the current state to the database as a new checkpoint.
+    pub fn store_checkpoint(&self) -> anyhow::Result<()> {
+        let cs_store = self.database.consensus_state_store();
+        let state = self.cur_state.as_ref().clone(); // TODO avoid clone
+        cs_store.write_consensus_checkpoint(self.cur_state_idx, state)?;
+        Ok(())
+    }
+}
+
+/// Reconstructs the last written consensus state from the last checkpoint and
+/// any outputs, returning the state index and the consensus state. Used to
+/// prepare the state for the state tracker.
+// TODO tweak this to be able to reconstruct any state?
+pub fn reconstruct_cur_state(
+    cs_prov: &impl ConsensusStateProvider,
+) -> anyhow::Result<(u64, ConsensusState)> {
+    let last_write_idx = cs_prov.get_last_write_idx()?;
+    let last_ckpt_idx = cs_prov.get_last_checkpoint_idx()?;
+    debug!(%last_write_idx, %last_ckpt_idx, "reconstructing state from checkpoint");
+
+    let mut state = cs_prov
+        .get_state_checkpoint(last_ckpt_idx)?
+        .ok_or(Error::MissingCheckpoint(last_ckpt_idx))?;
+
+    for i in last_ckpt_idx..=last_write_idx {
+        let writes = cs_prov
+            .get_consensus_writes(i)?
+            .ok_or(Error::MissingConsensusWrites(i))?;
+        operation::apply_writes_to_state(&mut state, writes.into_iter());
+    }
+
+    Ok((last_write_idx, state))
+}
diff --git a/crates/consensus-logic/src/status.rs b/crates/consensus-logic/src/status.rs
new file mode 100644
index 000000000..56c1bca5f
--- /dev/null
+++ b/crates/consensus-logic/src/status.rs
@@ -0,0 +1,11 @@
+//! Handle to inspect the current consensus state and wait for updates when there are any.
+
+use tokio::sync::watch;
+
+pub struct StatusTracker {
+    state_rx: watch::Receiver<()>,
+}
+
+pub struct StatusUpdater {
+    state_tx: watch::Sender<()>,
+}
diff --git a/crates/consensus-logic/src/transition.rs b/crates/consensus-logic/src/transition.rs
new file mode 100644
index 000000000..966b02a53
--- /dev/null
+++ b/crates/consensus-logic/src/transition.rs
@@ -0,0 +1,65 @@
+//! Core state transition function.
+
+use alpen_vertex_db::errors::DbError;
+use alpen_vertex_db::traits::{Database, L1DataProvider, L2DataProvider};
+use alpen_vertex_primitives::prelude::*;
+use alpen_vertex_state::consensus::*;
+use alpen_vertex_state::operation::*;
+use alpen_vertex_state::sync_event::SyncEvent;
+
+use crate::errors::*;
+
+/// Processes the event given the current consensus state, producing some
+/// output. This can return database errors.
+pub fn process_event<D: Database>(
+    state: &ConsensusState,
+    ev: &SyncEvent,
+    database: &D,
+    params: &Params,
+) -> Result<ConsensusOutput, Error> {
+    let mut writes = Vec::new();
+    let mut actions = Vec::new();
+
+    match ev {
+        SyncEvent::L1Block(height, l1blkid) => {
+            // FIXME this doesn't do any SPV checks to make sure we only go to
+            // a longer chain, it just does it unconditionally
+            let l1prov = database.l1_provider();
+            let blkmf = l1prov.get_block_manifest(*height)?;
+
+            // TODO do the consensus checks
+
+            writes.push(ConsensusWrite::AcceptL1Block(*l1blkid));
+
+            // TODO if we have some number of L1 blocks finalized, also emit an
+            // `UpdateBuried` write
+        }
+
+        SyncEvent::L1DABatch(blkids) => {
+            // TODO load it up and figure out what's there, see if we have to
+            // load diffs from L1 or something
+            let l2prov = database.l2_provider();
+
+            for id in blkids {
+                let block = l2prov
+                    .get_block_data(*id)?
+                    .ok_or(Error::MissingL2Block(*id))?;
+
+                // TODO do whatever changes we have to to accept the new block
+            }
+        }
+
+        SyncEvent::NewTipBlock(blkid) => {
+            let l2prov = database.l2_provider();
+            let block = l2prov
+                .get_block_data(*blkid)?
+                .ok_or(Error::MissingL2Block(*blkid))?;
+
+            // TODO better checks here
+            writes.push(ConsensusWrite::AcceptL2Block(*blkid));
+            actions.push(SyncAction::UpdateTip(*blkid));
+        }
+    }
+
+    Ok(ConsensusOutput::new(writes, actions))
+}
diff --git a/crates/consensus-logic/src/unfinalized_tracker.rs b/crates/consensus-logic/src/unfinalized_tracker.rs
new file mode 100644
index 000000000..5ed238806
--- /dev/null
+++ b/crates/consensus-logic/src/unfinalized_tracker.rs
@@ -0,0 +1,249 @@
+//! Tracker for keeping track of the tree of unfinalized blocks.
+
+use std::collections::*;
+
+use alpen_vertex_primitives::buf::Buf32;
+use alpen_vertex_state::block::{L2BlockHeader, L2BlockId};
+
+use crate::errors::ChainTipError;
+
+/// Entry in block tracker table we use to relate a block with its immediate
+/// relatives.
+struct BlockEntry {
+    parent: L2BlockId,
+    children: HashSet<L2BlockId>,
+}
+
+/// Tracks the unfinalized block tree on top of the finalized tip.
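+///
+/// Invariant: every block in `pending_table` can be walked back through
+/// parent links to `finalized_tip`, which is what `sanity_check_parent_seq`
+/// below checks.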
+pub struct UnfinalizedBlockTracker {
+    /// Block that we treat as a base that all of the other blocks that we're
+    /// considering build on.
+    finalized_tip: L2BlockId,
+
+    /// Table of pending blocks near the tip of the block tree.
+    pending_table: HashMap<L2BlockId, BlockEntry>,
+
+    /// Unfinalized chain tips. This also includes the finalized tip if there
+    /// are no pending blocks.
+    unfinalized_tips: HashSet<L2BlockId>,
+}
+
+impl UnfinalizedBlockTracker {
+    /// Creates a new tracker with just a finalized tip and no pending blocks.
+    pub fn new_empty(finalized_tip: L2BlockId) -> Self {
+        let mut pending_tbl = HashMap::new();
+        pending_tbl.insert(
+            finalized_tip,
+            BlockEntry {
+                parent: L2BlockId::from(Buf32::zero()),
+                children: HashSet::new(),
+            },
+        );
+
+        let mut unf_tips = HashSet::new();
+        unf_tips.insert(finalized_tip);
+        Self {
+            finalized_tip,
+            pending_table: pending_tbl,
+            unfinalized_tips: unf_tips,
+        }
+    }
+
+    /// Returns the "finalized tip", which is the base of the unfinalized tree.
+    pub fn finalized_tip(&self) -> &L2BlockId {
+        &self.finalized_tip
+    }
+
+    /// Gets the parent of a block from within the tree. Returns `None` if the
+    /// block or its parent isn't in the tree. Returns `None` for the finalized
+    /// tip block, since its parent isn't in the tree.
+    pub fn get_parent(&self, id: &L2BlockId) -> Option<&L2BlockId> {
+        if *id == self.finalized_tip {
+            return None;
+        }
+        self.pending_table.get(id).map(|ent| &ent.parent)
+    }
+
+    /// Returns an iterator over the chain tips.
+    pub fn chain_tips_iter(&self) -> impl Iterator<Item = &L2BlockId> {
+        self.unfinalized_tips.iter()
+    }
+
+    /// Checks if the block is traceable all the way back to the finalized tip.
+    fn sanity_check_parent_seq(&self, blkid: &L2BlockId) -> bool {
+        if *blkid == self.finalized_tip {
+            return true;
+        }
+
+        if let Some(ent) = self.pending_table.get(blkid) {
+            self.sanity_check_parent_seq(&ent.parent)
+        } else {
+            false
+        }
+    }
+
+    /// Tries to attach a block to the tree. Does not verify the header
+    /// corresponds to the given blockid.
+    ///
+    /// Returns if this new block forks off and creates a new unfinalized tip
+    /// block.
+    // TODO do a `SealedL2BlockHeader` thing that includes the blkid
+    pub fn attach_block(
+        &mut self,
+        blkid: L2BlockId,
+        header: &L2BlockHeader,
+    ) -> Result<bool, ChainTipError> {
+        if self.pending_table.contains_key(&blkid) {
+            return Err(ChainTipError::BlockAlreadyAttached(blkid));
+        }
+
+        let parent_blkid = header.parent();
+
+        if let Some(parent_ent) = self.pending_table.get_mut(parent_blkid) {
+            parent_ent.children.insert(blkid);
+        } else {
+            return Err(ChainTipError::AttachMissingParent(blkid, *header.parent()));
+        }
+
+        let ent = BlockEntry {
+            parent: *header.parent(),
+            children: HashSet::new(),
+        };
+
+        self.pending_table.insert(blkid, ent);
+
+        // Also update the tips table, removing the parent if it's there.
+        let did_replace = self.unfinalized_tips.remove(parent_blkid);
+        self.unfinalized_tips.insert(blkid);
+
+        Ok(!did_replace)
+    }
+
+    /// Updates the finalized block tip, returning a report that includes the
+    /// precise blocks that were finalized transitively and any blocks on
+    /// competing chains that were rejected.
+    pub fn update_finalized_tip(
+        &mut self,
+        blkid: &L2BlockId,
+    ) -> Result<FinalizeReport, ChainTipError> {
+        // Sanity check the block so we know it's here.
+        if !self.sanity_check_parent_seq(blkid) {
+            return Err(ChainTipError::MissingBlock(*blkid));
+        }
+
+        let mut path = vec![];
+        let mut to_evict = Vec::new();
+        let mut at = *blkid;
+
+        // Walk down to the current finalized tip and put everything in the
+        // eviction table.
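+        // (`path` ends up ordered from the new finalized tip downward, which
+        // is what `FinalizeReport::new_tip` relies on.)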
+        while at != self.finalized_tip {
+            path.push(at);
+
+            // Get the parent of the block we're at, add all of the children
+            // other than the one we're at to the eviction table.
+            let ent = self.pending_table.get(&at).unwrap();
+            let parent_ent = self
+                .pending_table
+                .get(&ent.parent)
+                .expect("chaintip: missing parent entry");
+            for ch in &parent_ent.children {
+                if *ch != at {
+                    to_evict.push(*ch);
+                }
+            }
+
+            at = ent.parent;
+        }
+
+        // Now actually go through and evict all the blocks we said we were
+        // going to, adding their children on as we go.
+        let mut evicted = Vec::new();
+        while !to_evict.is_empty() {
+            let evicting = to_evict.pop().unwrap();
+            let ent = self
+                .pending_table
+                .remove(&evicting)
+                .expect("chaintip: evicting dangling child ref");
+            self.unfinalized_tips.remove(&evicting);
+
+            to_evict.extend(ent.children.into_iter());
+            evicted.push(evicting);
+        }
+
+        // And also remove blocks that we're finalizing, *except* the new
+        // finalized tip.
+        assert!(self.pending_table.remove(&self.finalized_tip).is_some());
+        for pblk in &path {
+            if pblk != blkid {
+                assert!(self.pending_table.remove(pblk).is_some());
+            }
+        }
+
+        // Just update the finalized tip now.
+        let old_tip = self.finalized_tip;
+        self.finalized_tip = *blkid;
+
+        // Sanity check and construct the report.
+        assert!(!path.is_empty(), "chaintip: somehow finalized no blocks");
+        Ok(FinalizeReport {
+            prev_tip: old_tip,
+            finalized: path,
+            rejected: evicted,
+        })
+    }
+
+    #[cfg(test)]
+    pub fn unchecked_set_finalized_tip(&mut self, id: L2BlockId) {
+        self.finalized_tip = id;
+    }
+
+    #[cfg(test)]
+    pub fn insert_fake_block(&mut self, id: L2BlockId, parent: L2BlockId) {
+        let ent = BlockEntry {
+            parent,
+            children: HashSet::new(),
+        };
+
+        self.pending_table.insert(id, ent);
+    }
+}
+
+/// Report of blocks that we finalized when finalizing a new tip and blocks that
+/// we've permanently rejected.
+#[derive(Clone, Debug)]
+pub struct FinalizeReport {
+    /// Previous tip.
+    prev_tip: L2BlockId,
+
+    /// Blocks we've newly finalized. The first one of these is the new
+    /// finalized tip.
+    finalized: Vec<L2BlockId>,
+
+    /// Any blocks that were on competing chains with the one we finalized.
+    rejected: Vec<L2BlockId>,
+}
+
+impl FinalizeReport {
+    /// Returns the blkid that was the previously finalized tip. It's still
+    /// finalized, but there's newer blocks that are also finalized now.
+    pub fn prev_tip(&self) -> &L2BlockId {
+        &self.prev_tip
+    }
+
+    /// The new chain tip that's finalized now.
+    pub fn new_tip(&self) -> &L2BlockId {
+        if self.finalized.is_empty() {
+            &self.prev_tip
+        } else {
+            &self.finalized[0]
+        }
+    }
+
+    /// Returns a slice of the blkids that were rejected.
+    pub fn rejected(&self) -> &[L2BlockId] {
+        &self.rejected
+    }
+
+    /// Returns an iterator over the blkids that were rejected.
+    pub fn rejected_iter(&self) -> impl Iterator<Item = &L2BlockId> {
+        self.rejected.iter()
+    }
+}
+
+// TODO unit tests
diff --git a/crates/consensus-logic/src/worker.rs b/crates/consensus-logic/src/worker.rs
new file mode 100644
index 000000000..c0f9bae63
--- /dev/null
+++ b/crates/consensus-logic/src/worker.rs
@@ -0,0 +1,131 @@
+//! Consensus logic worker task.
+
+use std::sync::Arc;
+
+use tokio::sync::mpsc;
+use tracing::*;
+
+use alpen_vertex_db::{traits::*, DbResult};
+use alpen_vertex_evmctl::engine::ExecEngineCtl;
+use alpen_vertex_primitives::prelude::*;
+use alpen_vertex_state::{
+    block::L2BlockId,
+    consensus::ConsensusState,
+    operation::{ConsensusOutput, ConsensusWrite, SyncAction},
+    sync_event::SyncEvent,
+};
+
+use crate::{errors::Error, message::CsmMessage, state_tracker, transition};
+
+/// Mutable worker state that we modify in the consensus worker task.
+///
+/// Unable to be shared across threads. Any data we want to export we'll do
+/// through another handle.
+pub struct WorkerState<D: Database> {
+    /// Consensus parameters.
+    params: Arc<Params>,
+
+    /// Underlying database hierarchy that writes ultimately end up on.
+    // TODO should we move this out?
+    database: Arc<D>,
+
+    /// Tracker used to remember the current consensus state.
+    state_tracker: state_tracker::StateTracker<D>,
+}
+
+impl<D: Database> WorkerState<D> {
+    /// Constructs a new instance by reconstructing the current consensus state
+    /// from the provided database layer.
+    pub fn open(params: Arc<Params>, database: Arc<D>) -> anyhow::Result<Self> {
+        let cs_prov = database.consensus_state_provider().as_ref();
+        let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(cs_prov)?;
+        let state_tracker = state_tracker::StateTracker::new(
+            params.clone(),
+            database.clone(),
+            cur_state_idx,
+            Arc::new(cur_state),
+        );
+
+        Ok(Self {
+            params,
+            database,
+            state_tracker,
+        })
+    }
+
+    /// Gets a ref to the consensus state from the inner state tracker.
+    pub fn cur_state(&self) -> &Arc<ConsensusState> {
+        self.state_tracker.cur_state()
+    }
+}
+
+/// Receives messages from the channel to update consensus state with.
+pub fn consensus_worker_task<D: Database, E: ExecEngineCtl>(
+    mut state: WorkerState<D>,
+    engine: Arc<E>,
+    mut inp_msg_ch: mpsc::Receiver<CsmMessage>,
+) -> Result<(), Error> {
+    while let Some(msg) = inp_msg_ch.blocking_recv() {
+        if let Err(e) = process_msg(&mut state, engine.as_ref(), &msg) {
+            error!(err = %e, "failed to process sync message, skipping");
+        }
+    }
+
+    info!("consensus task exiting");
+
+    Ok(())
+}
+
+fn process_msg<D: Database, E: ExecEngineCtl>(
+    state: &mut WorkerState<D>,
+    engine: &E,
+    msg: &CsmMessage,
+) -> anyhow::Result<()> {
+    match msg {
+        CsmMessage::EventInput(idx) => {
+            // TODO ensure correct event index ordering
+            handle_sync_event(state, engine, *idx)?;
+            Ok(())
+        }
+    }
+}
+
+fn handle_sync_event<D: Database, E: ExecEngineCtl>(
+    state: &mut WorkerState<D>,
+    engine: &E,
+    ev_idx: u64,
+) -> anyhow::Result<()> {
+    // Perform the main step of computing the output we're operating on.
+    let outp = state.state_tracker.advance_consensus_state(ev_idx)?;
+
+    for action in outp.actions() {
+        match action {
+            SyncAction::UpdateTip(blkid) => {
+                // Tell the EL that this block does indeed look good.
+                debug!(?blkid, "updating EL safe block");
+                engine.update_safe_block(*blkid)?;
+
+                // TODO update the tip we report in RPCs and whatnot
+            }
+
+            SyncAction::MarkInvalid(blkid) => {
+                // TODO not sure what this should entail yet
+                warn!(?blkid, "marking block invalid!");
+                let store = state.database.l2_store();
+                store.set_block_status(*blkid, BlockStatus::Invalid)?;
+            }
+
+            SyncAction::FinalizeBlock(blkid) => {
+                // For the tip tracker this gets picked up later. We don't have
+                // to do anything here *necessarily*.
+                // TODO we should probably emit a state checkpoint here if we
+                // aren't already
+                info!(?blkid, "finalizing block");
+            }
+        }
+    }
+
+    // TODO broadcast the new state somehow
+
+    Ok(())
+}
diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml
index 6f4daf445..57b55367e 100644
--- a/crates/db/Cargo.toml
+++ b/crates/db/Cargo.toml
@@ -11,6 +11,7 @@ alpen-vertex-state = { workspace = true }
 anyhow = { workspace = true }
 arbitrary = { workspace = true }
 borsh = { workspace = true }
+parking_lot = { workspace = true }
 reth-db = { workspace = true }
 rockbound = { workspace = true }
 rocksdb = { workspace = true }
diff --git a/crates/db/src/consensus_state/db.rs b/crates/db/src/consensus_state/db.rs
index 23e050895..1eeba7be6 100644
--- a/crates/db/src/consensus_state/db.rs
+++ b/crates/db/src/consensus_state/db.rs
@@ -1,19 +1,23 @@
+use std::sync::Arc;
+
 use rockbound::{Schema, DB};
 
+use alpen_vertex_state::operation::*;
+
 use super::schemas::{ConsensusOutputSchema, ConsensusStateSchema};
 use crate::errors::*;
 use crate::traits::*;
 
-pub struct ConsensusStateDB {
-    db: DB,
+pub struct ConsensusStateDb {
+    db: Arc<DB>,
 }
 
-impl ConsensusStateDB {
+impl ConsensusStateDb {
     /// Wraps an existing database handle.
     ///
     /// Assumes it was opened with column families as defined in `STORE_COLUMN_FAMILIES`.
     // FIXME Make it better/generic.
-    pub fn new(db: DB) -> Self {
+    pub fn new(db: Arc<DB>) -> Self {
         Self { db }
     }
 
@@ -33,7 +37,7 @@
     }
 }
 
-impl ConsensusStateStore for ConsensusStateDB {
+impl ConsensusStateStore for ConsensusStateDb {
     fn write_consensus_output(&self, idx: u64, output: ConsensusOutput) -> DbResult<()> {
         let expected_idx = match self.get_last_idx::<ConsensusOutputSchema>()? {
             Some(last_idx) => last_idx + 1,
@@ -60,7 +64,7 @@
     }
 }
 
-impl ConsensusStateProvider for ConsensusStateDB {
+impl ConsensusStateProvider for ConsensusStateDb {
     fn get_last_write_idx(&self) -> DbResult<u64> {
         match self.get_last_idx::<ConsensusOutputSchema>()? {
             Some(idx) => Ok(idx),
@@ -68,10 +72,7 @@
         }
     }
 
-    fn get_consensus_writes(
-        &self,
-        idx: u64,
-    ) -> DbResult<Option<Vec<ConsensusWrite>>> {
+    fn get_consensus_writes(&self, idx: u64) -> DbResult<Option<Vec<ConsensusWrite>>> {
         let output = self.db.get::<ConsensusOutputSchema>(&idx)?;
         match output {
             Some(out) => Ok(Some(out.writes().to_owned())),
@@ -79,10 +80,7 @@
         }
     }
 
-    fn get_consensus_actions(
-        &self,
-        idx: u64,
-    ) -> DbResult<Option<Vec<SyncAction>>> {
+    fn get_consensus_actions(&self, idx: u64) -> DbResult<Option<Vec<SyncAction>>> {
        let output = self.db.get::<ConsensusOutputSchema>(&idx)?;
        match output {
            Some(out) => Ok(Some(out.actions().to_owned())),
@@ -116,6 +114,13 @@
 
         Err(DbError::NotBootstrapped)
     }
+
+    fn get_state_checkpoint(
+        &self,
+        idx: u64,
+    ) -> DbResult<Option<ConsensusState>> {
+        Ok(self.db.get::<ConsensusStateSchema>(&idx)?)
+    }
 }
 
 #[cfg(test)]
@@ -130,11 +135,11 @@ mod tests {
     use alpen_vertex_state::consensus::ConsensusState;
 
     use super::*;
-    use crate::{traits::ConsensusOutput, STORE_COLUMN_FAMILIES};
+    use crate::STORE_COLUMN_FAMILIES;
 
     const DB_NAME: &str = "consensus_state_db";
 
-    fn get_new_db(path: &Path) -> anyhow::Result<DB> {
+    fn get_new_db(path: &Path) -> anyhow::Result<Arc<DB>> {
         // TODO: add other options as appropriate.
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
@@ -148,12 +153,13 @@ mod tests {
             .collect::<Vec<_>>(),
             &db_opts,
         )
+        .map(Arc::new)
     }
 
-    fn setup_db() -> ConsensusStateDB {
+    fn setup_db() -> ConsensusStateDb {
         let temp_dir = TempDir::new().expect("failed to create temp dir");
         let db = get_new_db(&temp_dir.into_path()).unwrap();
-        ConsensusStateDB::new(db)
+        ConsensusStateDb::new(db)
     }
 
     fn generate_arbitrary<'a, T: Arbitrary<'a> + Clone>(bytes: &'a [u8]) -> T {
diff --git a/crates/db/src/consensus_state/schemas.rs b/crates/db/src/consensus_state/schemas.rs
index da66b597b..b17478cb2 100644
--- a/crates/db/src/consensus_state/schemas.rs
+++ b/crates/db/src/consensus_state/schemas.rs
@@ -1,5 +1,5 @@
 use alpen_vertex_state::consensus::ConsensusState;
-use crate::traits::ConsensusOutput;
+use alpen_vertex_state::operation::ConsensusOutput;
 
 use crate::define_table_with_default_codec;
 use crate::define_table_without_codec;
@@ -15,4 +15,4 @@ define_table_with_default_codec!(
 define_table_with_default_codec!(
     /// A table to store Consesus State
     (ConsensusStateSchema) u64 => ConsensusState
-);
\ No newline at end of file
+);
diff --git a/crates/db/src/database.rs b/crates/db/src/database.rs
new file mode 100644
index 000000000..e45943d17
--- /dev/null
+++ b/crates/db/src/database.rs
@@ -0,0 +1,73 @@
+use std::sync::Arc;
+
+use super::traits::*;
+
+/// Shim database type that assumes that all the database impls are wrapped in
+/// `Arc`s and that the provider and stores are actually the same types. We
+/// might actually use this in practice, it's just for testing.
+pub struct CommonDatabase<L1, L2, S, C> {
+    l1db: Arc<L1>,
+    l2db: Arc<L2>,
+    sedb: Arc<S>,
+    csdb: Arc<C>,
+}
+
+impl<L1, L2, S, C> CommonDatabase<L1, L2, S, C> {
+    pub fn new(l1db: Arc<L1>, l2db: Arc<L2>, sedb: Arc<S>, csdb: Arc<C>) -> Self {
+        Self {
+            l1db,
+            l2db,
+            sedb,
+            csdb,
+        }
+    }
+}
+
+impl<
+        L1: L1DataStore + L1DataProvider,
+        L2: L2DataStore + L2DataProvider,
+        S: SyncEventStore + SyncEventProvider,
+        C: ConsensusStateStore + ConsensusStateProvider,
+    > Database for CommonDatabase<L1, L2, S, C>
+{
+    type L1Store = L1;
+    type L1Prov = L1;
+    type L2Store = L2;
+    type L2Prov = L2;
+    type SeStore = S;
+    type SeProv = S;
+    type CsStore = C;
+    type CsProv = C;
+
+    fn l1_store(&self) -> &Arc<Self::L1Store> {
+        &self.l1db
+    }
+
+    fn l1_provider(&self) -> &Arc<Self::L1Prov> {
+        &self.l1db
+    }
+
+    fn l2_store(&self) -> &Arc<Self::L2Store> {
+        &self.l2db
+    }
+
+    fn l2_provider(&self) -> &Arc<Self::L2Prov> {
+        &self.l2db
+    }
+
+    fn sync_event_store(&self) -> &Arc<Self::SeStore> {
+        &self.sedb
+    }
+
+    fn sync_event_provider(&self) -> &Arc<Self::SeProv> {
+        &self.sedb
+    }
+
+    fn consensus_state_store(&self) -> &Arc<Self::CsStore> {
+        &self.csdb
+    }
+
+    fn consensus_state_provider(&self) -> &Arc<Self::CsProv> {
+        &self.csdb
+    }
+}
diff --git a/crates/db/src/l1/db.rs b/crates/db/src/l1/db.rs
index 6d1e8fd5c..9d14881c0 100644
--- a/crates/db/src/l1/db.rs
+++ b/crates/db/src/l1/db.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use rockbound::{schema::KeyEncoder, SchemaBatch, DB};
 use rocksdb::ReadOptions;
 use tracing::*;
@@ -13,13 +15,13 @@ use crate::errors::*;
 use crate::traits::{L1BlockManifest, L1DataProvider, L1DataStore};
 
 pub struct L1Db {
-    db: DB,
+    db: Arc<DB>,
 }
 
 impl L1Db {
     // NOTE: db is expected to open all the column families defined in STORE_COLUMN_FAMILIES.
     // FIXME: Make it better/generic.
-    pub fn new(db: DB) -> Self {
+    pub fn new(db: Arc<DB>) -> Self {
         Self { db }
     }
 
@@ -208,7 +210,7 @@ mod tests {
         }
     }
 
-    fn get_new_db(path: &Path) -> anyhow::Result<DB> {
+    fn get_new_db(path: &Path) -> anyhow::Result<Arc<DB>> {
         // TODO: add other options as appropriate.
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
@@ -222,6 +224,7 @@
             .collect::<Vec<_>>(),
             &db_opts,
         )
+        .map(Arc::new)
     }
 
     fn setup_db() -> L1Db {
diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs
index 61782e415..bb31b40c2 100644
--- a/crates/db/src/lib.rs
+++ b/crates/db/src/lib.rs
@@ -7,13 +7,19 @@ use crate::l1::schemas::{L1BlockSchema, MmrSchema, TxnSchema};
 use crate::sync_event::schemas::SyncEventSchema;
 
 pub mod consensus_state;
+pub mod database;
 pub mod l1;
+pub mod stubs;
 pub mod sync_event;
 
 pub mod errors;
 pub mod macros;
 pub mod traits;
 
+pub type DbResult<T> = anyhow::Result<T>;
+
+pub const ROCKSDB_NAME: &str = "vertex";
+
 pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[
     ConsensusOutputSchema::COLUMN_FAMILY_NAME,
     ConsensusStateSchema::COLUMN_FAMILY_NAME,
@@ -23,3 +29,8 @@ pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[
     TxnSchema::COLUMN_FAMILY_NAME,
     // TODO add col families for other store types
 ];
+
+// Re-exports
+pub use consensus_state::db::ConsensusStateDb;
+pub use l1::db::L1Db;
+pub use sync_event::db::SyncEventDb;
diff --git a/crates/db/src/stubs/l2.rs b/crates/db/src/stubs/l2.rs
new file mode 100644
index 000000000..f754b1c14
--- /dev/null
+++ b/crates/db/src/stubs/l2.rs
@@ -0,0 +1,75 @@
+use std::collections::*;
+
+use parking_lot::Mutex;
+
+use alpen_vertex_state::block::*;
+
+use crate::errors::*;
+use crate::traits::*;
+
+/// Dummy implementation that isn't really compliant with the spec, but we don't
+/// care because we just want to get something running. :sunglasses:.
+pub struct StubL2Db {
+    blocks: Mutex<HashMap<L2BlockId, L2Block>>,
+    statuses: Mutex<HashMap<L2BlockId, BlockStatus>>,
+    heights: Mutex<HashMap<u64, L2BlockId>>,
+}
+
+impl StubL2Db {
+    pub fn new() -> Self {
+        Self {
+            blocks: Mutex::new(HashMap::new()),
+            statuses: Mutex::new(HashMap::new()),
+            heights: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl L2DataStore for StubL2Db {
+    fn put_block_data(&self, block: L2Block) -> DbResult<()> {
+        let blkid = block.header().get_blockid();
+        let idx = block.header().blockidx();
+
+        {
+            let mut tbl = self.blocks.lock();
+            tbl.insert(blkid, block);
+        }
+
+        {
+            let mut tbl = self.heights.lock();
+            tbl.insert(idx, blkid);
+        }
+
+        Ok(())
+    }
+
+    fn del_block_data(&self, id: L2BlockId) -> DbResult<bool> {
+        let mut tbl = self.blocks.lock();
+        Ok(tbl.remove(&id).is_some())
+    }
+
+    fn set_block_status(&self, id: L2BlockId, status: BlockStatus) -> DbResult<()> {
+        let mut tbl = self.statuses.lock();
+        tbl.insert(id, status);
+        Ok(())
+    }
+}
+
+impl L2DataProvider for StubL2Db {
+    fn get_block_data(&self, id: L2BlockId) -> DbResult<Option<L2Block>> {
+        let tbl = self.blocks.lock();
+        Ok(tbl.get(&id).cloned())
+    }
+
+    /// This isn't compliant with what was intended because it only returns the
+    /// last block that was written for a height, *not* all of them.
+    fn get_blocks_at_height(&self, idx: u64) -> DbResult<Vec<L2BlockId>> {
+        let tbl = self.heights.lock();
+        Ok(tbl.get(&idx).map(|id| vec![*id]).unwrap_or_default())
+    }
+
+    fn get_block_status(&self, id: L2BlockId) -> DbResult<Option<BlockStatus>> {
+        let tbl = self.statuses.lock();
+        Ok(tbl.get(&id).cloned())
+    }
+}
diff --git a/crates/db/src/stubs/mod.rs b/crates/db/src/stubs/mod.rs
new file mode 100644
index 000000000..a9f32026d
--- /dev/null
+++ b/crates/db/src/stubs/mod.rs
@@ -0,0 +1 @@
+pub mod l2;
diff --git a/crates/db/src/sync_event/db.rs b/crates/db/src/sync_event/db.rs
index 4ffa1810e..a0e357e23 100644
--- a/crates/db/src/sync_event/db.rs
+++ b/crates/db/src/sync_event/db.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use rockbound::{SchemaBatch, DB};
 
 use alpen_vertex_state::sync_event::SyncEvent;
@@ -7,14 +9,14 @@ use crate::errors::{DbError, DbResult};
 use crate::traits::SyncEventProvider;
 use crate::traits::SyncEventStore;
 
-pub struct SyncEventDB {
-    db: DB,
+pub struct SyncEventDb {
+    db: Arc<DB>,
 }
 
-impl SyncEventDB {
+impl SyncEventDb {
     // NOTE: db is expected to open all the column families defined in STORE_COLUMN_FAMILIES.
     // FIXME: Make it better/generic.
-    pub fn new(db: DB) -> Self {
+    pub fn new(db: Arc<DB>) -> Self {
         Self { db }
     }
 
@@ -31,7 +33,7 @@
     }
 }
 
-impl SyncEventStore for SyncEventDB {
+impl SyncEventStore for SyncEventDb {
     fn write_sync_event(&self, ev: SyncEvent) -> DbResult<u64> {
         let last_id = self.get_last_key()?.unwrap_or(0);
         let id = last_id + 1;
@@ -79,7 +81,7 @@
     }
 }
 
-impl SyncEventProvider for SyncEventDB {
+impl SyncEventProvider for SyncEventDb {
     fn get_last_idx(&self) -> DbResult<Option<u64>> {
         self.get_last_key()
     }
@@ -121,7 +123,7 @@ mod tests {
         T::arbitrary(&mut u).expect("failed to generate arbitrary instance")
     }
 
-    fn get_new_db(path: &Path) -> anyhow::Result<DB> {
+    fn get_new_db(path: &Path) -> anyhow::Result<Arc<DB>> {
         // TODO: add other options as appropriate.
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
@@ -135,15 +137,16 @@ mod tests {
             .collect::<Vec<_>>(),
             &db_opts,
         )
+        .map(Arc::new)
     }
 
-    fn setup_db() -> SyncEventDB {
+    fn setup_db() -> SyncEventDb {
         let temp_dir = TempDir::new().expect("failed to create temp dir");
         let db = get_new_db(&temp_dir.into_path()).unwrap();
-        SyncEventDB::new(db)
+        SyncEventDb::new(db)
     }
 
-    fn insert_event(db: &SyncEventDB) -> SyncEvent {
+    fn insert_event(db: &SyncEventDb) -> SyncEvent {
         let ev: SyncEvent = generate_arbitrary();
         let res = db.write_sync_event(ev.clone());
         assert!(res.is_ok());
diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs
index b7d7ff944..c6ee93a3f 100644
--- a/crates/db/src/traits.rs
+++ b/crates/db/src/traits.rs
@@ -1,14 +1,17 @@
 //! Trait definitions for low level database interfaces. This borrows some of
 //! its naming conventions from reth.
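+//!
+//! Each data domain is split into a paired `*Store` (writes) and `*Provider`
+//! (reads) trait, and [`Database`] bundles shared handles to all of them so
+//! consumers can stay generic over the concrete backend. A minimal sketch of
+//! how a consumer might use it (this function is hypothetical, not part of
+//! the crate):
+//!
+//! ```ignore
+//! fn tip_block_status<D: Database>(
+//!     db: &D,
+//!     id: L2BlockId,
+//! ) -> DbResult<Option<BlockStatus>> {
+//!     // Reads go through the provider half of the pair.
+//!     db.l2_provider().get_block_status(id)
+//! }
+//! ```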
+use std::sync::Arc;
+
 use arbitrary::Arbitrary;
 use borsh::{BorshDeserialize, BorshSerialize};
 
 use alpen_vertex_mmr::CompactMmr;
 use alpen_vertex_primitives::{l1::*, prelude::*};
 use alpen_vertex_state::block::{L2Block, L2BlockId};
-use alpen_vertex_state::consensus::{ConsensusState, ConsensusWrite};
-use alpen_vertex_state::sync_event::{SyncAction, SyncEvent};
+use alpen_vertex_state::consensus::ConsensusState;
+use alpen_vertex_state::operation::*;
+use alpen_vertex_state::sync_event::SyncEvent;
 
 use crate::errors::*;
 
@@ -25,7 +28,14 @@ pub trait Database {
     type CsStore: ConsensusStateStore;
     type CsProv: ConsensusStateProvider;
 
-    // TODO accessors as needed
+    fn l1_store(&self) -> &Arc<Self::L1Store>;
+    fn l1_provider(&self) -> &Arc<Self::L1Prov>;
+    fn l2_store(&self) -> &Arc<Self::L2Store>;
+    fn l2_provider(&self) -> &Arc<Self::L2Prov>;
+    fn sync_event_store(&self) -> &Arc<Self::SeStore>;
+    fn sync_event_provider(&self) -> &Arc<Self::SeProv>;
+    fn consensus_state_store(&self) -> &Arc<Self::CsStore>;
+    fn consensus_state_provider(&self) -> &Arc<Self::CsProv>;
 }
 
 /// Storage interface to control our view of L1 data.
@@ -161,36 +171,25 @@ pub trait ConsensusStateProvider {
     /// saved checkpoint, which may be the same as the given idx (if we didn't
     /// receive any sync events since the last checkpoint.
     fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult<u64>;
-}
-
-/// Output of a consensus state transition. Both the consensus state writes and
-/// sync actions.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
-pub struct ConsensusOutput {
-    writes: Vec<ConsensusWrite>,
-    actions: Vec<SyncAction>,
-}
-
-impl ConsensusOutput {
-    pub fn writes(&self) -> &Vec<ConsensusWrite> {
-        &self.writes
-    }
-    pub fn actions(&self) -> &Vec<SyncAction> {
-        &self.actions
-    }
+
+    /// Gets a state checkpoint at a previously written index, if it exists.
+    fn get_state_checkpoint(&self, idx: u64) -> DbResult<Option<ConsensusState>>;
 }
 
 /// L2 data store for CL blocks. Does not store anything about what we think
 /// the L2 chain tip is, that's controlled by the consensus state.
 pub trait L2DataStore {
-    /// Stores an L2 block, does not care about the block height of the L2 block.
+    /// Stores an L2 block, does not care about the block height of the L2
+    /// block. Also sets the block's status to "unchecked".
     fn put_block_data(&self, block: L2Block) -> DbResult<()>;
 
     /// Tries to delete an L2 block from the store, returning if it really
     /// existed or not. This should only be used for blocks well before some
     /// buried L1 finalization horizon.
     fn del_block_data(&self, id: L2BlockId) -> DbResult<bool>;
+
+    /// Sets the block's validity status.
+    fn set_block_status(&self, id: L2BlockId, status: BlockStatus) -> DbResult<()>;
 }
 
 /// Data provider for L2 blocks.
@@ -202,4 +201,21 @@ pub trait L2DataProvider {
     /// than one on competing forks.
     // TODO do we even want to permit this as being a possible thing?
     fn get_blocks_at_height(&self, idx: u64) -> DbResult<Vec<L2BlockId>>;
+
+    /// Gets the validity status of a block.
+    fn get_block_status(&self, id: L2BlockId) -> DbResult<Option<BlockStatus>>;
+}
+
+/// The validity status of a block.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
+pub enum BlockStatus {
+    /// Block's validity hasn't been checked yet.
+    Unchecked,
+
+    /// Block is valid, although this doesn't mean it's in the canonical chain.
+    Valid,
+
+    /// Block is invalid, for no particular reason. We'd have to look somewhere
+    /// else for that.
+    Invalid,
 }
diff --git a/crates/evmctl/src/engine.rs b/crates/evmctl/src/engine.rs
index b06dabcd8..57e57c1ef 100644
--- a/crates/evmctl/src/engine.rs
+++ b/crates/evmctl/src/engine.rs
@@ -9,6 +9,7 @@
 // inconsistent with the remote state.
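+//
+// (The head/safe/finalized update methods below are intended to line up with
+// the fork-choice state of the Ethereum Engine API's
+// `engine_forkchoiceUpdatedVX` call, in the same way `submit_payload`
+// corresponds to `engine_newPayloadVX`.)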
diff --git a/crates/evmctl/src/engine.rs b/crates/evmctl/src/engine.rs
index b06dabcd8..57e57c1ef 100644
--- a/crates/evmctl/src/engine.rs
+++ b/crates/evmctl/src/engine.rs
@@ -9,6 +9,7 @@
 // inconsistent with the remote state.
 
 use alpen_vertex_primitives::buf::Buf32;
+use alpen_vertex_state::block::L2BlockId;
 
 use crate::errors::*;
 use crate::messages::*;
@@ -17,13 +18,11 @@ use crate::messages::*;
 /// Vertex semantics which will be produced inside the EL impl according to
 /// whatever semantics it has.
 pub trait ExecEngineCtl {
-    /// Updates the EL payload chain tip that we should be trying to execute to
-    /// determine validity.
-    fn update_head_block(&self, id: Buf32) -> EngineResult<BlockStatus>;
-
-    /// Updates the block that we've considered full buried. This means it's
-    /// been proven on-chain sufficiently that it will never be rolled back.
-    fn update_finalized_block(&self, id: Buf32) -> EngineResult<()>;
+    /// Execute a block payload to determine its validity and if it extends the
+    /// current chain tip.
+    ///
+    /// Corresponds to `engine_newPayloadVX`.
+    fn submit_payload(&self, payload: ExecPayloadData) -> EngineResult<BlockStatus>;
 
     /// Tries to prepare a payload using the current state of the chain,
     /// returning an ID to query pending payload build jobs. If this completes
@@ -34,8 +33,17 @@ pub trait ExecEngineCtl {
     /// Tries to get a payload that we were working on.
     fn get_payload_status(&self, id: u64) -> EngineResult<PayloadStatus>;
 
-    // TODO more stuff to ensure that the EL actually gets the payloads and the
-    // CL context it needs to execute the blocks
+    /// Updates the (L2) block that we treat as the chain tip and build new
+    /// blocks on.
+    fn update_head_block(&self, id: L2BlockId) -> EngineResult<()>;
+
+    /// Updates the (L2) block that we treat as the safe chain tip that we
+    /// respond to RPCs with.
+    fn update_safe_block(&self, id: L2BlockId) -> EngineResult<()>;
+
+    /// Updates the (L2) block that we treat as being deeply buried and won't
+    /// reorg.
+    fn update_finalized_block(&self, id: L2BlockId) -> EngineResult<()>;
 }
 
 /// The status of a block that we've just set fork choice to.
diff --git a/crates/evmctl/src/errors.rs b/crates/evmctl/src/errors.rs
index 35d809a99..d2ff041f7 100644
--- a/crates/evmctl/src/errors.rs
+++ b/crates/evmctl/src/errors.rs
@@ -4,6 +4,9 @@ pub type EngineResult<T> = Result<T, EngineError>;
 
 #[derive(Debug, Error)]
 pub enum EngineError {
+    #[error("unknown payload ID {0}")]
+    UnknownPayloadId(u64),
+
     #[error("not yet implemented")]
     Unimplemented,
 
diff --git a/crates/evmctl/src/lib.rs b/crates/evmctl/src/lib.rs
index 0e371bab1..722f9be43 100644
--- a/crates/evmctl/src/lib.rs
+++ b/crates/evmctl/src/lib.rs
@@ -4,5 +4,6 @@
 pub mod engine;
 pub mod messages;
+pub mod stub;
 
 pub mod errors;
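// Illustrative flow over the reshaped `ExecEngineCtl` trait: execute a
// candidate payload first, and only advance the head if the EL accepts it.
fn try_extend_tip<E: ExecEngineCtl>(
    eng: &E,
    tip: L2BlockId,
    payload: ExecPayloadData,
) -> EngineResult<bool> {
    if eng.submit_payload(payload)? == BlockStatus::Valid {
        eng.update_head_block(tip)?;
        return Ok(true);
    }
    Ok(false)
}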
diff --git a/crates/evmctl/src/messages.rs b/crates/evmctl/src/messages.rs
index 7935aa492..6b31fbbbb 100644
--- a/crates/evmctl/src/messages.rs
+++ b/crates/evmctl/src/messages.rs
@@ -5,15 +5,24 @@ use alpen_vertex_primitives::prelude::*;
 // should we consolidate?
 #[derive(Clone, Debug)]
 pub struct ExecPayloadData {
-    /// Payload commitment, probably a hash of the EL block header.
-    payload_commitment: Buf32,
+    /// Encoded EL payload, minus any operations we push to it.
+    el_payload: Vec<u8>,
 
-    /// Payload state root that we can make commitments to and whatnot.
-    state_root: Buf32,
+    /// CL operations pushed into the EL, such as deposits from L1. This
+    /// corresponds to the "withdrawals" field in the `ExecutionPayloadVX`
+    /// type(s), but is separated here because we control it ourselves.
+    ops: Vec,
+}
+
+impl ExecPayloadData {
+    pub fn new(el_payload: Vec<u8>, ops: Vec) -> Self {
+        Self { el_payload, ops }
+    }
 
-    /// Withdrawals initiated from within EL to L1. This might be generalized
-    /// to permit more types of EL->L1 operations.
-    new_el_withdrawals: Vec<WithdrawData>,
+    /// Creates a new instance with some specific payload and no ops.
+    pub fn new_simple(el_payload: Vec<u8>) -> Self {
+        Self::new(el_payload, Vec::new())
+    }
 }
 
 /// L1 withdrawal data.
@@ -34,7 +43,7 @@ pub struct PayloadEnv {
     timestamp: u64,
 
     /// State root of the previous CL block.
-    prev_state_root: Buf32,
+    prev_global_state_root: Buf32,
 
     /// Safe L1 block we're exposing into the EL that's not likely to reorg.
     safe_l1_block: Buf32,
diff --git a/crates/evmctl/src/stub.rs b/crates/evmctl/src/stub.rs
new file mode 100644
index 000000000..a7be5d67c
--- /dev/null
+++ b/crates/evmctl/src/stub.rs
@@ -0,0 +1,89 @@
+//! Stub engine controller that we use for testing without having to plug in a
+//! full EVM runtime.
+//!
+//! This just simulates producing a payload by waiting some amount before
+//! returning `Ready` with dummy state. We might extend this slightly to make
+//! it more believable.
+
+use std::collections::*;
+use std::sync::Mutex;
+use std::time;
+
+use alpen_vertex_primitives::prelude::*;
+use alpen_vertex_state::block::*;
+
+use crate::engine::*;
+use crate::errors::*;
+use crate::messages::*;
+
+struct State {
+    next_idx: u64,
+    payload_jobs: HashMap<u64, time::Instant>,
+}
+
+impl State {
+    fn new() -> Self {
+        Self {
+            next_idx: 1,
+            payload_jobs: HashMap::new(),
+        }
+    }
+}
+
+pub struct StubController {
+    payload_prep_dur: time::Duration,
+    state: Mutex<State>,
+}
+
+impl StubController {
+    pub fn new(payload_prep_dur: time::Duration) -> Self {
+        Self {
+            payload_prep_dur,
+            state: Mutex::new(State::new()),
+        }
+    }
+}
+
+impl ExecEngineCtl for StubController {
+    fn submit_payload(&self, _payload: ExecPayloadData) -> EngineResult<BlockStatus> {
+        Ok(BlockStatus::Valid)
+    }
+
+    fn prepare_payload(&self, _env: PayloadEnv) -> EngineResult<u64> {
+        // TODO do something with the payloads to make the status more believable
+        let mut state = self.state.lock().unwrap();
+        let idx = state.next_idx;
+        state.next_idx += 1;
+        state.payload_jobs.insert(idx, time::Instant::now());
+        Ok(idx)
+    }
+
+    fn get_payload_status(&self, id: u64) -> EngineResult<PayloadStatus> {
+        let state = self.state.lock().unwrap();
+        let created_at = state
+            .payload_jobs
+            .get(&id)
+            .ok_or(EngineError::UnknownPayloadId(id))?;
+
+        let now = time::Instant::now();
+        if *created_at + self.payload_prep_dur > now {
+            Ok(PayloadStatus::Working)
+        } else {
+            // TODO make up a more plausible payload
+            let exec = ExecPayloadData::new_simple(Vec::new());
+            Ok(PayloadStatus::Ready(exec))
+        }
+    }
+
+    fn update_head_block(&self, _id: L2BlockId) -> EngineResult<()> {
+        Ok(())
+    }
+
+    fn update_safe_block(&self, _id: L2BlockId) -> EngineResult<()> {
+        Ok(())
+    }
+
+    fn update_finalized_block(&self, _id: L2BlockId) -> EngineResult<()> {
+        Ok(())
+    }
+}
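// Minimal driver for `StubController` (illustrative; assumes a `PayloadEnv`
// value is available, since its constructor is outside this patch): prepare a
// payload, wait out the prep duration, then fetch the dummy result.
fn demo_stub(env: PayloadEnv) -> EngineResult<()> {
    let ctl = StubController::new(std::time::Duration::from_millis(50));
    let id = ctl.prepare_payload(env)?;
    std::thread::sleep(std::time::Duration::from_millis(60));
    // Once `payload_prep_dur` has elapsed the stub reports a dummy empty payload.
    assert!(matches!(ctl.get_payload_status(id)?, PayloadStatus::Ready(_)));
    Ok(())
}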
diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml
index 6febbe9ed..f2843e338 100644
--- a/crates/primitives/Cargo.toml
+++ b/crates/primitives/Cargo.toml
@@ -4,6 +4,7 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-borsh = { workspace = true }
 arbitrary = { workspace = true }
+borsh = { workspace = true }
+hex = { workspace = true }
 reth-primitives = { workspace = true }
diff --git a/crates/primitives/src/block_credential.rs b/crates/primitives/src/block_credential.rs
new file mode 100644
index 000000000..2a8bb6b07
--- /dev/null
+++ b/crates/primitives/src/block_credential.rs
@@ -0,0 +1,13 @@
+//! Types relating to block credentials and signing.
+
+use crate::prelude::*;
+
+/// Rule we use to decide how to identify if an L2 block is correctly signed.
+#[derive(Clone, Debug)]
+pub enum CredRule {
+    /// Any block gets accepted, unconditionally.
+    Unchecked,
+
+    /// Just sign every block with a static BIP340 schnorr pubkey.
+    SchnorrKey(Buf32),
+}
diff --git a/crates/primitives/src/buf.rs b/crates/primitives/src/buf.rs
index dcdd1f592..d5beab8e7 100644
--- a/crates/primitives/src/buf.rs
+++ b/crates/primitives/src/buf.rs
@@ -1,3 +1,6 @@
+use std::fmt;
+use std::str;
+
 use arbitrary::Arbitrary;
 use borsh::{BorshDeserialize, BorshSerialize};
 use reth_primitives::alloy_primitives::FixedBytes;
@@ -6,14 +9,58 @@
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub struct Buf20(pub FixedBytes<20>);
 
+impl Buf20 {
+    pub fn zero() -> Self {
+        Self([0; 20].into())
+    }
+}
+
+impl From<[u8; 20]> for Buf20 {
+    fn from(value: [u8; 20]) -> Self {
+        Self(FixedBytes::from(value))
+    }
+}
+
 // 32-byte buf, useful for hashes and schnorr pubkeys
-#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub struct Buf32(pub FixedBytes<32>);
 
+impl Buf32 {
+    pub fn zero() -> Self {
+        Self([0; 32].into())
+    }
+}
+
+impl From<[u8; 32]> for Buf32 {
+    fn from(value: [u8; 32]) -> Self {
+        Self(FixedBytes::from(value))
+    }
+}
+
+impl fmt::Debug for Buf32 {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut buf = [0; 64];
+        hex::encode_to_slice(self.0, &mut buf).expect("buf: enc hex");
+        f.write_str(unsafe { str::from_utf8_unchecked(&buf) })
+    }
+}
+
 // 64-byte buf, useful for schnorr signatures
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub struct Buf64(pub FixedBytes<64>);
 
+impl Buf64 {
+    pub fn zero() -> Self {
+        Self([0; 64].into())
+    }
+}
+
+impl From<[u8; 64]> for Buf64 {
+    fn from(value: [u8; 64]) -> Self {
+        Self(FixedBytes::from(value))
+    }
+}
+
 impl BorshSerialize for Buf32 {
     fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
         let bytes = self.0.as_ref();
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index 546e821c0..d4e32b5c2 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -4,7 +4,9 @@
 // TODO import address types
 // TODO import generic account types
 
+pub mod block_credential;
 pub mod buf;
 pub mod l1;
+pub mod params;
 pub mod prelude;
diff --git a/crates/primitives/src/params.rs b/crates/primitives/src/params.rs
new file mode 100644
index 000000000..919ee3a7b
--- /dev/null
+++ b/crates/primitives/src/params.rs
@@ -0,0 +1,40 @@
+//! Global consensus parameters for the rollup.
+
+use crate::block_credential::CredRule;
+
+/// Consensus parameters that don't change for the lifetime of the network
+/// (unless there's some weird hard fork).
+#[derive(Clone, Debug)]
+pub struct RollupParams {
+    /// Block time in milliseconds.
+    pub block_time: u64,
+
+    /// Rule we use to decide if a block is correctly signed.
+    pub cred_rule: CredRule,
+}
+
+/// Client sync parameters that are used to make the network work but don't
+/// strictly have to be pre-agreed. These have to do with grace periods in
+/// message delivery and whatnot.
+#[derive(Clone, Debug)]
+pub struct RunParams {
+    /// Number of blocks that we follow behind the L1 tip.
+    pub l1_follow_distance: usize,
+}
+
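// Sketch of how the `CredRule` defined above might gate block acceptance.
// `verify_schnorr` is a hypothetical helper standing in for real BIP340
// signature verification, which this patch does not include.
fn check_credential(rule: &CredRule, sig: &Buf64, msg: &[u8]) -> bool {
    match rule {
        CredRule::Unchecked => true,
        CredRule::SchnorrKey(key) => verify_schnorr(key, msg, sig), // hypothetical helper
    }
}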
+/// Combined set of parameters across all the consensus logic.
+#[derive(Clone, Debug)]
+pub struct Params {
+    pub rollup: RollupParams,
+    pub run: RunParams,
+}
+
+impl Params {
+    pub fn rollup(&self) -> &RollupParams {
+        &self.rollup
+    }
+
+    pub fn run(&self) -> &RunParams {
+        &self.run
+    }
+}
diff --git a/crates/primitives/src/prelude.rs b/crates/primitives/src/prelude.rs
index 638c1280a..f4c7c7791 100644
--- a/crates/primitives/src/prelude.rs
+++ b/crates/primitives/src/prelude.rs
@@ -1,2 +1,3 @@
-// Reexports Imports from elsewhere in the crate.
+// Reexports from elsewhere in the crate.
 pub use crate::buf::*;
+pub use crate::params::*;
diff --git a/crates/rpc/api/Cargo.toml b/crates/rpc/api/Cargo.toml
index fc76ddfe0..d417d9ee3 100644
--- a/crates/rpc/api/Cargo.toml
+++ b/crates/rpc/api/Cargo.toml
@@ -5,3 +5,4 @@ edition = "2021"
 
 [dependencies]
 jsonrpsee = { workspace = true, features = ["server", "macros"] }
+serde = { workspace = true }
\ No newline at end of file
diff --git a/crates/rpc/api/src/lib.rs b/crates/rpc/api/src/lib.rs
index 6b68c8be9..9bd3a5050 100644
--- a/crates/rpc/api/src/lib.rs
+++ b/crates/rpc/api/src/lib.rs
@@ -1,9 +1,22 @@
 //! Macro trait def for the `alp_` RPC namespace using jsonrpsee.
 
 use jsonrpsee::{core::RpcResult, proc_macros::rpc};
+use serde::{Deserialize, Serialize};
 
-#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
-#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct L1Status {
+    /// Current block height.
+    pub cur_height: u64,
+
+    /// Current tip block ID as string.
+    pub cur_tip_blkid: String,
+
+    /// UNIX millis time of the last time we got a new update from the L1 connector.
+    pub last_update: u64,
+}
+
+#[cfg_attr(not(feature = "client"), rpc(server, namespace = "alp"))]
+#[cfg_attr(feature = "client", rpc(server, client, namespace = "alp"))]
 pub trait AlpenApi {
     // TODO the rest of these
     #[method(name = "alp_protocolVersion")]
     async fn protocol_version(&self) -> RpcResult<u64>;
 
     // TODO make this under the admin RPC interface
     #[method(name = "alp_stop")]
     async fn stop(&self) -> RpcResult<()>;
+
+    #[method(name = "alp_l1status")]
+    async fn get_l1_status(&self) -> RpcResult<L1Status>;
 }
diff --git a/crates/state/Cargo.toml b/crates/state/Cargo.toml
index cd28a500f..9e1799007 100644
--- a/crates/state/Cargo.toml
+++ b/crates/state/Cargo.toml
@@ -8,5 +8,8 @@ edition = "2021"
 
 [dependencies]
 alpen-vertex-primitives = { workspace = true }
+
 arbitrary = { workspace = true }
 borsh = { workspace = true }
+digest = { workspace = true }
+sha2 = { workspace = true }
\ No newline at end of file
diff --git a/crates/state/src/block.rs b/crates/state/src/block.rs
index 818658473..58147043e 100644
--- a/crates/state/src/block.rs
+++ b/crates/state/src/block.rs
@@ -1,3 +1,5 @@
+use std::fmt::{self, Debug};
+
 use alpen_vertex_primitives::l1::L1Tx;
 use alpen_vertex_primitives::prelude::*;
 use arbitrary::Arbitrary;
@@ -7,20 +9,22 @@ use crate::l1::L1HeaderPayload;
 
 /// ID of an L2 block, usually the hash of its root header.
 #[derive(
-    Copy,
-    Clone,
-    Debug,
-    Eq,
-    PartialEq,
-    Ord,
-    PartialOrd,
-    Hash,
-    Arbitrary,
-    BorshSerialize,
-    BorshDeserialize,
+    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Arbitrary, BorshSerialize, BorshDeserialize,
 )]
 pub struct L2BlockId(Buf32);
 
+impl From<Buf32> for L2BlockId {
+    fn from(value: Buf32) -> Self {
+        Self(value)
+    }
+}
+
+impl fmt::Debug for L2BlockId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Debug::fmt(&self.0, f)
+    }
+}
+
 /// Full contents of the bare L2 block.
 #[derive(Clone, Debug)]
 pub struct L2Block {
@@ -32,20 +36,38 @@ pub struct L2Block {
     body: L2BlockBody,
 }
 
+impl L2Block {
+    pub fn header(&self) -> &L2BlockHeader {
+        &self.header
+    }
+
+    pub fn l1_segment(&self) -> &L1Segment {
+        &self.body.l1_segment
+    }
+
+    pub fn exec_segment(&self) -> &ExecSegment {
+        &self.body.exec_segment
+    }
+}
+
 /// Block header that forms the chain we use to reach consensus.
-#[derive(Clone, Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize)]
 pub struct L2BlockHeader {
     /// Block index, obviously.
     block_idx: u64,
 
+    /// Timestamp the block was (intended to be) published at.
+    timestamp: u64,
+
     /// Hash of the previous block, to form the blockchain.
     prev_block: L2BlockId,
 
-    /// Hash of the L1 payload header.
-    l1_payload_hash: Buf32,
+    /// Hash of the L1 segment.
+    l1_segment_hash: Buf32,
 
-    /// Hash of the exec payload header.
-    exec_payload_hash: Buf32,
+    /// Hash of the exec segment.
+    // TODO ideally this is just the EL header hash, not the hash of the full payload
+    exec_segment_hash: Buf32,
 
     /// State root that commits to the overall state of the rollup, commits to
     /// both the CL state and EL state.
@@ -56,17 +78,55 @@ pub struct L2BlockHeader {
     signature: Buf64,
 }
 
+impl L2BlockHeader {
+    pub fn blockidx(&self) -> u64 {
+        self.block_idx
+    }
+
+    pub fn timestamp(&self) -> u64 {
+        self.timestamp
+    }
+
+    pub fn parent(&self) -> &L2BlockId {
+        &self.prev_block
+    }
+
+    pub fn l1_payload_hash(&self) -> &Buf32 {
+        &self.l1_segment_hash
+    }
+
+    pub fn exec_payload_hash(&self) -> &Buf32 {
+        &self.exec_segment_hash
+    }
+
+    pub fn state_root(&self) -> &Buf32 {
+        &self.state_root
+    }
+
+    pub fn sig(&self) -> &Buf64 {
+        &self.signature
+    }
+
+    /// Computes the blockid with SHA256.
+    // TODO should this be poseidon?
+    pub fn get_blockid(&self) -> L2BlockId {
+        let buf = borsh::to_vec(self).expect("block: compute blkid");
+        let h = <sha2::Sha256 as digest::Digest>::digest(&buf);
+        L2BlockId::from(Buf32::from(<[u8; 32]>::from(h)))
+    }
+}
+
 /// Contains the additional payloads within the L2 block.
 #[derive(Clone, Debug)]
 pub struct L2BlockBody {
-    l1_payload: L1Payload,
-    exec_payload: ExecPayload,
+    l1_segment: L1Segment,
+    exec_segment: ExecSegment,
 }
 
 /// Container for additional messages that we've observed from the L1, if there
 /// are any.
 #[derive(Clone, Debug)]
-pub struct L1Payload {
+pub struct L1Segment {
     /// New headers that we've seen from L1 that we didn't see in the previous
     /// L2 block.
     new_l1_headers: Vec<L1HeaderPayload>,
 
     /// Deposit initiation transactions.
     deposits: Vec<L1Tx>,
 }
 
-/// Information relating to the EL payloads.
+/// Information relating to the EL data.
 #[derive(Clone, Debug)]
-pub struct ExecPayload {
-    /// Commitment to the payload. This might be the EVM EL block header or
-    /// maybe it's the full block.
-    payload_commitment: Buf32,
+pub struct ExecSegment {
+    /// Encoded EL payload data.
+    el_payload: Vec<u8>,
+}
 
-    /// State commitment of the EL state.
-    el_state_root: Buf32,
+impl ExecSegment {
+    pub fn new(el_payload: Vec<u8>) -> Self {
+        Self { el_payload }
+    }
 
-    /// Withdrawals that were initiated from the EL payload.
-    new_el_withdraws: Vec,
+    pub fn payload(&self) -> &[u8] {
+        &self.el_payload
+    }
 }
 
 /// Data emitted by EL exec for a withdraw request.
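// Illustrative check built from the new header accessors: a child extends a
// parent iff it references the parent's computed block ID and increments the
// block index by one.
fn extends(parent: &L2BlockHeader, child: &L2BlockHeader) -> bool {
    *child.parent() == parent.get_blockid() && child.blockidx() == parent.blockidx() + 1
}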
diff --git a/crates/state/src/consensus.rs b/crates/state/src/consensus.rs
index ea57a3f05..033cf6065 100644
--- a/crates/state/src/consensus.rs
+++ b/crates/state/src/consensus.rs
@@ -3,6 +3,7 @@
 //! implement the consensus logic.
 
 use std::collections::*;
+
 use arbitrary::Arbitrary;
 use borsh::{BorshDeserialize, BorshSerialize};
 
@@ -15,34 +16,47 @@ use crate::{block::L2BlockId, l1::L1BlockId};
 /// This is updated when we see a consensus-relevant message. This is L2 blocks
 /// but also L1 blocks being published with interesting things in them, and
 /// various other events.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshSerialize, BorshDeserialize)]
 pub struct ConsensusState {
     /// Important consensus state.
-    chain_state: ConsensusChainState,
+    pub(super) chain_state: ConsensusChainState,
+
+    /// L2 block that's been finalized and proven on L1.
+    pub(super) finalized_tip: L2BlockId,
 
     /// Recent L1 blocks that we might still reorg.
-    recent_l1_blocks: Vec<L1BlockId>,
+    // TODO replace with a tracker that we can reorg
+    pub(super) recent_l1_blocks: Vec<L1BlockId>,
+
+    /// L1 block index we treat as being "buried" and won't reorg.
+    pub(super) buried_l1_height: u64,
 
     /// Blocks we've received that appear to be on the chain tip but have not
     /// fully executed yet.
-    pending_l2_blocks: VecDeque<L2BlockId>,
+    pub(super) pending_l2_blocks: VecDeque<L2BlockId>,
+}
+
+impl ConsensusState {
+    pub fn chain_state(&self) -> &ConsensusChainState {
+        &self.chain_state
+    }
 }
 
 /// L2 blockchain consensus state.
 ///
 /// This is updated when we get a new L2 block and is kept more precisely
 /// synchronized across all nodes.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshSerialize, BorshDeserialize)]
 pub struct ConsensusChainState {
     /// Accepted and valid L2 blocks that we might still reorg. The last of
     /// these is the chain tip.
-    accepted_l2_blocks: Vec<L2BlockId>,
+    pub(super) accepted_l2_blocks: Vec<L2BlockId>,
 
     /// Pending deposits that have been acknowledged in an EL block.
-    pending_deposits: Vec<PendingDeposit>,
+    pub(super) pending_deposits: Vec<PendingDeposit>,
 
     /// Pending withdrawals that have been initiated but haven't been sent out.
-    pending_withdraws: Vec<PendingWithdraw>,
+    pub(super) pending_withdraws: Vec<PendingWithdraw>,
 }
 
 impl ConsensusChainState {
@@ -55,7 +69,7 @@
 }
 
 /// Transfer from L1 into L2.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize)]
 pub struct PendingDeposit {
     /// Deposit index.
     idx: u64,
@@ -68,7 +82,7 @@ pub struct PendingDeposit {
 }
 
 /// Transfer from L2 back to L1.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize)]
 pub struct PendingWithdraw {
     /// Withdraw index.
     idx: u64,
@@ -79,42 +93,3 @@ pub struct PendingWithdraw {
     /// Schnorr pubkey for the taproot output we're going to generate.
     dest: Buf64,
 }
-
-/// Describes possible writes to chain state that we can make. We use this
-/// instead of directly modifying the chain state to reduce the volume of data
-/// that we have to clone and save to disk with each sync event.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
-pub enum ConsensusWrite {
-    /// Completely replace the full state with a new instance.
-    Replace(Box<ConsensusState>),
-
-    /// Replace just the L2 blockchain consensus-layer state with a new
-    /// instance.
-    ReplaceChainState(Box<ConsensusChainState>),
-
-    /// Queue an L2 block for verification.
-    QueueL2Block(L2BlockId),
-    // TODO
-}
-
-/// Applies consensus writes to an existing consensus state instance.
-// FIXME should this be moved to the consensus-logic crate?
-fn compute_new_state(
-    mut state: ConsensusState,
-    writes: impl Iterator<Item = ConsensusWrite>,
-) -> ConsensusState {
-    apply_writes_to_state(&mut state, writes);
-    state
-}
-
-fn apply_writes_to_state(state: &mut ConsensusState, writes: impl Iterator<Item = ConsensusWrite>) {
-    for w in writes {
-        use ConsensusWrite::*;
-        match w {
-            Replace(cs) => *state = *cs,
-            ReplaceChainState(ccs) => state.chain_state = *ccs,
-            QueueL2Block(blkid) => state.pending_l2_blocks.push_back(blkid),
-            // TODO
-        }
-    }
-}
diff --git a/crates/state/src/l1.rs b/crates/state/src/l1.rs
index fa659bdb8..68c8a5be0 100644
--- a/crates/state/src/l1.rs
+++ b/crates/state/src/l1.rs
@@ -6,6 +6,12 @@ use borsh::{BorshDeserialize, BorshSerialize};
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, BorshSerialize, BorshDeserialize, Arbitrary)]
 pub struct L1BlockId(Buf32);
 
+impl From<Buf32> for L1BlockId {
+    fn from(value: Buf32) -> Self {
+        Self(value)
+    }
+}
+
 /// Represents a serialized L1 header.
 #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)]
 pub struct L1HeaderPayload {
diff --git a/crates/state/src/lib.rs b/crates/state/src/lib.rs
index 773fdad8c..fbb7c5e2d 100644
--- a/crates/state/src/lib.rs
+++ b/crates/state/src/lib.rs
@@ -6,4 +6,5 @@
 pub mod block;
 pub mod consensus;
 pub mod l1;
+pub mod operation;
 pub mod sync_event;
diff --git a/crates/state/src/operation.rs b/crates/state/src/operation.rs
new file mode 100644
index 000000000..8e1325d31
--- /dev/null
+++ b/crates/state/src/operation.rs
@@ -0,0 +1,127 @@
+//! Operations that a state transition emits to update the new state and control
+//! the client's high level state.
+
+use arbitrary::Arbitrary;
+use borsh::{BorshDeserialize, BorshSerialize};
+
+use crate::block::L2BlockId;
+use crate::consensus::{ConsensusChainState, ConsensusState};
+use crate::l1::L1BlockId;
+
+/// Output of a consensus state transition. Both the consensus state writes and
+/// sync actions.
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize)]
+pub struct ConsensusOutput {
+    writes: Vec<ConsensusWrite>,
+    actions: Vec<SyncAction>,
+}
+
+impl ConsensusOutput {
+    pub fn new(writes: Vec<ConsensusWrite>, actions: Vec<SyncAction>) -> Self {
+        Self { writes, actions }
+    }
+
+    pub fn into_parts(self) -> (Vec<ConsensusWrite>, Vec<SyncAction>) {
+        (self.writes, self.actions)
+    }
+
+    pub fn writes(&self) -> &[ConsensusWrite] {
+        &self.writes
+    }
+
+    pub fn actions(&self) -> &[SyncAction] {
+        &self.actions
+    }
+
+    // TODO accessors as needed
+}
+
+/// Describes possible writes to chain state that we can make. We use this
+/// instead of directly modifying the chain state to reduce the volume of data
+/// that we have to clone and save to disk with each sync event.
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize)]
+pub enum ConsensusWrite {
+    /// Completely replace the full state with a new instance.
+    Replace(Box<ConsensusState>),
+
+    /// Replace just the L2 blockchain consensus-layer state with a new
+    /// instance.
+    ReplaceChainState(Box<ConsensusChainState>),
+
+    /// Queue an L2 block to be accepted.
+    AcceptL2Block(L2BlockId),
+
+    /// Rolls back L1 blocks to this block ID.
+    RollbackL1BlocksTo(L1BlockId),
+
+    /// Insert L1 blocks into the pending queue.
+    AcceptL1Block(L1BlockId),
+
+    /// Updates the buried block index to a higher index.
+    UpdateBuried(u64),
+}
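// Sketch of the intended dataflow (illustrative): a state transition bundles
// its writes and actions into a `ConsensusOutput`; the client applies the
// writes to its state and then dispatches the actions (see `SyncAction` and
// `apply_writes_to_state` just below).
fn handle_output(state: &mut ConsensusState, out: ConsensusOutput) {
    let (writes, actions) = out.into_parts();
    apply_writes_to_state(state, writes.into_iter());
    for _act in actions {
        // hand each SyncAction off to the rest of the node (engine, IO, peers)
    }
}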
+/// Actions the consensus state machine directs the node to take to update its
+/// own bookkeeping. These should not be able to fail.
+#[derive(Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize)]
+pub enum SyncAction {
+    /// Extends our externally-facing tip to a new block ID. This might trigger
+    /// a reorg of some unfinalized blocks. We probably won't roll this block
+    /// back but we haven't seen it proven on-chain yet. This is also where
+    /// we'd build a new block if it's our turn to.
+    UpdateTip(L2BlockId),
+
+    /// Marks an L2 blockid as invalid and we won't follow any chain that has
+    /// it, and will reject it from our peers.
+    // TODO possibly we should have some way of marking a block invalid through
+    // preliminary checks before writing a sync event we then have to check,
+    // this should be investigated more
+    MarkInvalid(L2BlockId),
+
+    /// Finalizes a block, indicating that it won't be reverted.
+    FinalizeBlock(L2BlockId),
+}
+
+/// Applies consensus writes to the provided consensus state.
+pub fn apply_writes_to_state(
+    state: &mut ConsensusState,
+    writes: impl Iterator<Item = ConsensusWrite>,
+) {
+    for w in writes {
+        use ConsensusWrite::*;
+        match w {
+            Replace(cs) => *state = *cs,
+            ReplaceChainState(ccs) => state.chain_state = *ccs,
+            RollbackL1BlocksTo(l1blkid) => {
+                let pos = state.recent_l1_blocks.iter().position(|b| *b == l1blkid);
+                let Some(pos) = pos else {
+                    // TODO better logging, maybe make this an actual error
+                    panic!("operation: emitted invalid write");
+                };
+                state.recent_l1_blocks.truncate(pos);
+            }
+            AcceptL1Block(l1blkid) => state.recent_l1_blocks.push(l1blkid),
+            AcceptL2Block(blkid) => state.pending_l2_blocks.push_back(blkid),
+            UpdateBuried(new_idx) => {
+                // Check that it's increasing.
+                let old_idx = state.buried_l1_height;
+                if old_idx >= new_idx {
+                    panic!("operation: emitted non-greater buried height");
+                }
+
+                // Check that it's not higher than what we know about.
+                let diff = (new_idx - old_idx) as usize;
+                if diff > state.recent_l1_blocks.len() {
+                    panic!("operation: new buried height above known L1 tip");
+                }
+
+                // If everything checks out we can just remove them.
+                let blocks = state.recent_l1_blocks.drain(..diff).collect::<Vec<_>>();
+                state.buried_l1_height = new_idx;
+
+                // TODO merge these blocks into the L1 MMR in the chain state if
+                // we haven't already
+            }
+        }
+    }
+}
diff --git a/crates/state/src/sync_event.rs b/crates/state/src/sync_event.rs
index e1a1c0391..4019888dc 100644
--- a/crates/state/src/sync_event.rs
+++ b/crates/state/src/sync_event.rs
@@ -1,33 +1,19 @@
 use arbitrary::Arbitrary;
 use borsh::{BorshDeserialize, BorshSerialize};
 
-use crate::block::L2BlockId;
+use crate::{block::L2BlockId, l1::L1BlockId};
 
 /// Sync event that updates our consensus state.
 #[derive(Clone, Debug, PartialEq, Eq, Arbitrary, BorshSerialize, BorshDeserialize)]
 pub enum SyncEvent {
-    /// A new L2 block was posted to L1.
-    L1BlockPosted(Vec<L2BlockId>),
+    /// We've observed a valid L1 block.
+    L1Block(u64, L1BlockId),
 
-    /// Received a new L2 block from somewhere, maybe the p2p network, maybe we
-    /// just made it.
-    L2BlockRecv(L2BlockId),
+    /// New L2 blocks were posted to L1 in a DA batch.
+    L1DABatch(Vec<L2BlockId>),
 
-    /// Finished executing an L2 block with a status.
-    L2BlockExec(L2BlockId, bool),
-}
-
-/// Actions the consensus state machine directs the node to take to update its
-/// own bookkeeping. These should not be able to fail.
-#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
-pub enum SyncAction {
-    /// Directs the EL engine to try to check a block ID.
-    TryCheckBlock(L2BlockId),
-
-    /// Extends our externally-facing tip to a new block ID.
-    ExtendTip(L2BlockId),
-
-    /// Reverts our externally-facing tip to a new block ID, directing the EL
-    /// engine to roll back changes.
-    RevertTip(L2BlockId),
+    /// Chain tip tracker found a new valid chain tip block. At this point
+    /// we've already asked the EL to check if it's valid and know we *could*
+    /// accept it.
+    NewTipBlock(L2BlockId),
 }
diff --git a/crates/state/src/transition.rs b/crates/state/src/transition.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml
index 37be77d17..781920554 100644
--- a/sequencer/Cargo.toml
+++ b/sequencer/Cargo.toml
@@ -9,15 +9,24 @@ path = "src/main.rs"
 
 [dependencies]
 alpen-vertex-common = { workspace = true }
+alpen-vertex-consensus-logic = { workspace = true }
+alpen-vertex-evmctl = { workspace = true }
+alpen-vertex-db = { workspace = true }
 alpen-vertex-primitives = { workspace = true }
 alpen-vertex-rpc-api = { workspace = true }
+alpen-vertex-state = { workspace = true }
 
+anyhow = { workspace = true }
+argh = { workspace = true }
 async-trait = { workspace = true }
 jsonrpsee = { workspace = true, features = ["server", "macros"] }
+parking_lot = { workspace = true }
 reth-ipc = { workspace = true }
 reth-primitives = { workspace = true }
 reth-rpc-api = { workspace = true }
 reth-rpc-types = { workspace = true }
+rockbound = { workspace = true }
+rocksdb = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 thiserror = { workspace = true }
diff --git a/sequencer/src/args.rs b/sequencer/src/args.rs
new file mode 100644
index 000000000..a31750552
--- /dev/null
+++ b/sequencer/src/args.rs
@@ -0,0 +1,20 @@
+use std::path::PathBuf;
+
+use argh::FromArgs;
+
+#[derive(FromArgs)]
+#[argh(description = "Alpen Vertex sequencer")]
+pub struct Args {
+    #[argh(
+        option,
+        short = 'd',
+        description = "datadir path that will contain databases"
+    )]
+    pub datadir: PathBuf,
+
+    #[argh(option, short = 'B', description = "bitcoind connection url")]
+    pub bitcoind: String,
+
+    #[argh(option, short = 'r', description = "JSON-RPC port")]
+    pub rpc_port: u16,
+}
diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs
index 03f598f6f..47b369538 100644
--- a/sequencer/src/main.rs
+++ b/sequencer/src/main.rs
@@ -1,50 +1,134 @@
+use std::io;
 use std::process;
+use std::sync::Arc;
+use std::thread;
+use std::time;
 
-use reth_rpc_api::EthApiServer;
+use alpen_vertex_consensus_logic::chain_tip;
+use alpen_vertex_consensus_logic::unfinalized_tracker;
+use anyhow::Context;
 use thiserror::Error;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot, watch};
 use tracing::*;
 
 use alpen_vertex_common::logging;
+use alpen_vertex_consensus_logic::ctl::CsmController;
+use alpen_vertex_consensus_logic::message::{ChainTipMessage, CsmMessage};
+use alpen_vertex_consensus_logic::worker;
+use alpen_vertex_db::traits::*;
+use alpen_vertex_primitives::{block_credential, params::*};
 use alpen_vertex_rpc_api::AlpenApiServer;
+use alpen_vertex_state::consensus::ConsensusState;
+use alpen_vertex_state::operation;
 
+use crate::args::Args;
+
+mod args;
 mod rpc_server;
 
 #[derive(Debug, Error)]
 pub enum InitError {
+    #[error("io: {0}")]
+    Io(#[from] io::Error),
+
     #[error("{0}")]
     Other(String),
 }
 
 fn main() {
+    let args: Args = argh::from_env();
+    if let Err(e) = main_inner(args) {
+        eprintln!("FATAL ERROR: {e}");
+    }
+}
+
+fn main_inner(args: Args) -> anyhow::Result<()> {
     logging::init();
 
+    // Open the database.
+    let rbdb = open_rocksdb_database(&args)?;
+
+    // Set up block params.
+    let params = Params {
+        rollup: RollupParams {
+            block_time: 1000,
+            cred_rule: block_credential::CredRule::Unchecked,
+        },
+        run: RunParams {
+            l1_follow_distance: 6,
+        },
+    };
+    let params = Arc::new(params);
+
+    // Initialize databases.
+    let l1_db = Arc::new(alpen_vertex_db::L1Db::new(rbdb.clone()));
+    let l2_db = Arc::new(alpen_vertex_db::stubs::l2::StubL2Db::new()); // FIXME stub
+    let sync_ev_db = Arc::new(alpen_vertex_db::SyncEventDb::new(rbdb.clone()));
+    let cs_db = Arc::new(alpen_vertex_db::ConsensusStateDb::new(rbdb.clone()));
+    let database = Arc::new(alpen_vertex_db::database::CommonDatabase::new(
+        l1_db, l2_db, sync_ev_db, cs_db,
+    ));
+
+    // Init the consensus worker state and get the current state from it.
+    let cw_state = worker::WorkerState::open(params.clone(), database.clone())?;
+    let cur_state = cw_state.cur_state().clone();
+    let cur_chain_tip = cur_state.chain_state().chain_tip_blockid();
+
+    // Init the chain tracker from the state we figured out.
+    let chain_tracker = unfinalized_tracker::UnfinalizedBlockTracker::new_empty(cur_chain_tip);
+    let ct_state = chain_tip::ChainTipTrackerState::new(
+        params,
+        database.clone(),
+        cur_state,
+        chain_tracker,
+        cur_chain_tip,
+    );
+    // TODO load unfinalized blocks into block tracker
+
+    // Create dataflow channels.
+    let (ctm_tx, ctm_rx) = mpsc::channel::<ChainTipMessage>(64);
+    let (csm_tx, csm_rx) = mpsc::channel::<CsmMessage>(64);
+    let csm_ctl = Arc::new(CsmController::new(database.clone(), csm_tx));
+    let (cout_tx, cout_rx) = mpsc::channel::<operation::ConsensusOutput>(64);
+    let (cur_state_tx, cur_state_rx) = watch::channel::<Option<Arc<ConsensusState>>>(None);
+    // TODO connect up these other channels
+
+    // Init engine controller.
+    let eng_ctl = alpen_vertex_evmctl::stub::StubController::new(time::Duration::from_millis(100));
+    let eng_ctl = Arc::new(eng_ctl);
+    let eng_ctl_cw = eng_ctl.clone();
+    let eng_ctl_ct = eng_ctl.clone();
+
+    // Start worker threads.
+    // TODO set up watchdog for these things
+    let cw_handle = thread::spawn(|| worker::consensus_worker_task(cw_state, eng_ctl_cw, csm_rx));
+    let ct_handle =
+        thread::spawn(|| chain_tip::tracker_task(ct_state, eng_ctl_ct, ctm_rx, csm_ctl));
+
+    // Start runtime for async IO tasks.
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .thread_name("vertex")
         .build()
         .expect("init: build rt");
 
-    if let Err(e) = rt.block_on(main_task()) {
+    if let Err(e) = rt.block_on(main_task(args)) {
         error!(err = %e, "main task exited");
-        process::exit(0);
+        process::exit(0); // special case exit once we've gotten to this point
     }
 
     info!("exiting");
+    Ok(())
 }
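// Example invocation implied by the argh setup in args.rs above (binary name,
// paths, and ports here are illustrative, not fixed by the patch):
//
//     vertex-sequencer -d /path/to/datadir -B http://localhost:18443 -r 12345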
-async fn main_task() -> Result<(), InitError> {
+async fn main_task(args: Args) -> anyhow::Result<()> {
     let (stop_tx, stop_rx) = oneshot::channel();
 
     // Init RPC methods.
     let alp_rpc = rpc_server::AlpenRpcImpl::new(stop_tx);
-    let eth_rpc = rpc_server::EthRpcImpl::new();
-
-    let mut methods = alp_rpc.into_rpc();
-    methods
-        .merge(eth_rpc.into_rpc())
-        .expect("init: add eth methods");
+    let methods = alp_rpc.into_rpc();
 
-    let rpc_port = 12345; // TODO make configurable
+    let rpc_port = args.rpc_port;
     let rpc_server = jsonrpsee::server::ServerBuilder::new()
         .build(format!("127.0.0.1:{rpc_port}"))
         .await
@@ -63,3 +147,22 @@
 
     Ok(())
 }
+
+fn open_rocksdb_database(args: &Args) -> anyhow::Result<Arc<rockbound::DB>> {
+    let mut database_dir = args.datadir.clone();
+    database_dir.push("rocksdb");
+
+    let dbname = alpen_vertex_db::ROCKSDB_NAME;
+    let cfs = alpen_vertex_db::STORE_COLUMN_FAMILIES;
+    let opts = rocksdb::Options::default();
+
+    let rbdb = rockbound::DB::open(
+        &database_dir,
+        dbname,
+        cfs.iter().map(|s| s.to_string()),
+        &opts,
+    )
+    .context("opening database")?;
+
+    Ok(Arc::new(rbdb))
+}
diff --git a/sequencer/src/rpc_server.rs b/sequencer/src/rpc_server.rs
index c5c9e9231..60e973557 100644
--- a/sequencer/src/rpc_server.rs
+++ b/sequencer/src/rpc_server.rs
@@ -16,10 +16,10 @@ use reth_rpc_types::{
     TransactionRequest, Work,
 };
 use thiserror::Error;
+use tokio::sync::{oneshot, Mutex};
 use tracing::*;
 
-use alpen_vertex_rpc_api::AlpenApiServer;
-use tokio::sync::{oneshot, Mutex};
+use alpen_vertex_rpc_api::{AlpenApiServer, L1Status};
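// Illustrative client-side counterpart to the new `alp_l1status` method,
// assuming the `client` feature of alpen-vertex-rpc-api and jsonrpsee's
// `http-client` feature are enabled (neither is wired up in this patch):
async fn demo_l1_status(port: u16) -> anyhow::Result<()> {
    use alpen_vertex_rpc_api::AlpenApiClient;
    let url = format!("http://127.0.0.1:{port}");
    let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
    let status = client.get_l1_status().await?;
    println!("L1 height {} tip {}", status.cur_height, status.cur_tip_blkid);
    Ok(())
}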
 
 #[derive(Debug, Error)]
 pub enum Error {
@@ -91,285 +91,14 @@ impl AlpenApiServer for AlpenRpcImpl {
         }
         Ok(())
     }
-}
-
-/// Impl for the eth_ JSON-RPC interface.
-///
-/// See: https://github.com/paradigmxyz/reth/blob/main/crates/rpc/rpc-api/src/eth.rs
-pub struct EthRpcImpl {
-    // TODO
-}
-
-impl EthRpcImpl {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-#[async_trait]
-impl EthApiServer for EthRpcImpl {
-    async fn protocol_version(&self) -> RpcResult<U64> {
-        Err(Error::Unimplemented.into())
-    }
-
-    fn syncing(&self) -> RpcResult<SyncStatus> {
-        Ok(SyncStatus::None)
-    }
-
-    async fn author(&self) -> RpcResult<Address> {
-        Err(Error::Unsupported.into())
-    }
-
-    fn accounts(&self) -> RpcResult<Vec<Address>> {
-        Err(Error::Unsupported.into())
-    }
-
-    fn block_number(&self) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn chain_id(&self) -> RpcResult<Option<U64>> {
-        // TODO change this
-        Ok(Some(U64::from(2016)))
-    }
-
-    async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult<Option<RichBlock>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_by_number(
-        &self,
-        number: BlockNumberOrTag,
-        full: bool,
-    ) -> RpcResult<Option<RichBlock>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_transaction_count_by_hash(&self, hash: B256) -> RpcResult<Option<U256>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_transaction_count_by_number(
-        &self,
-        number: BlockNumberOrTag,
-    ) -> RpcResult<Option<U256>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_uncles_count_by_hash(&self, hash: B256) -> RpcResult<Option<U256>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_uncles_count_by_number(
-        &self,
-        number: BlockNumberOrTag,
-    ) -> RpcResult<Option<U256>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn block_receipts(
-        &self,
-        block_id: BlockId,
-    ) -> RpcResult<Option<Vec<TransactionReceipt>>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn uncle_by_block_hash_and_index(
-        &self,
-        hash: B256,
-        index: Index,
-    ) -> RpcResult<Option<RichBlock>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn uncle_by_block_number_and_index(
-        &self,
-        number: BlockNumberOrTag,
-        index: Index,
-    ) -> RpcResult<Option<RichBlock>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn raw_transaction_by_hash(&self, hash: B256) -> RpcResult<Option<Bytes>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn transaction_by_hash(&self, hash: B256) -> RpcResult<Option<Transaction>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn raw_transaction_by_block_hash_and_index(
-        &self,
-        hash: B256,
-        index: Index,
-    ) -> RpcResult<Option<Bytes>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn transaction_by_block_hash_and_index(
-        &self,
-        hash: B256,
-        index: Index,
-    ) -> RpcResult<Option<Transaction>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn raw_transaction_by_block_number_and_index(
-        &self,
-        number: BlockNumberOrTag,
-        index: Index,
-    ) -> RpcResult<Option<Bytes>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn transaction_by_block_number_and_index(
-        &self,
-        number: BlockNumberOrTag,
-        index: Index,
-    ) -> RpcResult<Option<Transaction>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn transaction_receipt(&self, hash: B256) -> RpcResult<Option<TransactionReceipt>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn balance(&self, address: Address, block_number: Option<BlockId>) -> RpcResult<U256> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn storage_at(
-        &self,
-        address: Address,
-        index: JsonStorageKey,
-        block_number: Option<BlockId>,
-    ) -> RpcResult<B256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn transaction_count(
-        &self,
-        address: Address,
-        block_number: Option<BlockId>,
-    ) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn get_code(&self, address: Address, block_number: Option<BlockId>) -> RpcResult<Bytes> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn header_by_number(&self, hash: BlockNumberOrTag) -> RpcResult<Option<Header>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn header_by_hash(&self, hash: B256) -> RpcResult<Option<Header>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn call(
-        &self,
-        request: TransactionRequest,
-        block_number: Option<BlockId>,
-        state_overrides: Option<StateOverride>,
-        block_overrides: Option<Box<BlockOverrides>>,
-    ) -> RpcResult<Bytes> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn call_many(
-        &self,
-        bundle: Bundle,
-        state_context: Option<StateContext>,
-        state_override: Option<StateOverride>,
-    ) -> RpcResult<Vec<EthCallResponse>> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn create_access_list(
-        &self,
-        request: TransactionRequest,
-        block_number: Option<BlockId>,
-    ) -> RpcResult<AccessListWithGasUsed> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn estimate_gas(
-        &self,
-        request: TransactionRequest,
-        block_number: Option<BlockId>,
-        state_override: Option<StateOverride>,
-    ) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn gas_price(&self) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn max_priority_fee_per_gas(&self) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn blob_base_fee(&self) -> RpcResult<U256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn fee_history(
-        &self,
-        block_count: U64HexOrNumber,
-        newest_block: BlockNumberOrTag,
-        reward_percentiles: Option<Vec<f64>>,
-    ) -> RpcResult<FeeHistory> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn is_mining(&self) -> RpcResult<bool> {
-        Ok(false)
-    }
-
-    async fn hashrate(&self) -> RpcResult<U256> {
-        Ok(U256::from(0))
-    }
-
-    async fn get_work(&self) -> RpcResult<Work> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn submit_hashrate(&self, hashrate: U256, id: B256) -> RpcResult<bool> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn submit_work(&self, nonce: B64, pow_hash: B256, mix_digest: B256) -> RpcResult<bool> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn send_transaction(&self, request: TransactionRequest) -> RpcResult<B256> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult<B256> {
-        Err(Error::Unimplemented.into())
-    }
-
-    async fn sign(&self, address: Address, message: Bytes) -> RpcResult<Bytes> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn sign_transaction(&self, transaction: TransactionRequest) -> RpcResult<Bytes> {
-        Err(Error::Unsupported.into())
-    }
-
-    async fn sign_typed_data(&self, address: Address, data: serde_json::Value) -> RpcResult<Bytes> {
-        Err(Error::Unsupported.into())
-    }
 
-    async fn get_proof(
-        &self,
-        address: Address,
-        keys: Vec<JsonStorageKey>,
-        block_number: Option<BlockId>,
-    ) -> RpcResult<EIP1186AccountProofResponse> {
-        Err(Error::Unimplemented.into())
+    async fn get_l1_status(&self) -> RpcResult<L1Status> {
+        // TODO implement this
+        warn!("alp_l1status not yet implemented");
+        Ok(L1Status {
+            cur_height: 0,
+            cur_tip_blkid: String::new(),
+            last_update: 0,
+        })
     }
 }
diff --git a/test/entry.py b/test/entry.py
index bb5c03dd0..80bf4ab9a 100755
--- a/test/entry.py
+++ b/test/entry.py
@@ -3,6 +3,7 @@
 import os
 import sys
 
+from bitcoinlib.services.bitcoind import BitcoindClient
 import flexitest
 
 class BitcoinFactory(flexitest.Factory):
@@ -10,8 +11,31 @@ def __init__(self, datadir_pfx: str, port_range: list[int]):
         super().__init__(datadir_pfx, port_range)
 
     def create_regtest_bitcoin(self) -> flexitest.Service:
-        # TODO
-        raise NotImplementedError()
+        datadir = self.create_datadir("bitcoin")
+        p2p_port = self.next_port()
+        rpc_port = self.next_port()
+        logfile = os.path.join(datadir, "service.log")
+
+        cmd = [
+            "bitcoind", "-regtest",
+            "-printtoconsole",
+            "-datadir=%s" % datadir,
+            "-port=%s" % p2p_port,
+            "-rpcport=%s" % rpc_port,
+            "-rpcuser=alpen",
+            "-rpcpassword=alpen",
+        ]
+
+        base_url = "http://alpen:alpen@localhost:%s" % rpc_port
+
+        # properties exposed to tests (assumed shape)
+        props = {"rpc_port": rpc_port, "p2p_port": p2p_port}
+
+        with open(logfile, "w") as f:
+            svc = flexitest.service.ProcService(props, cmd, stdout=f)
+
+        def _create_rpc():
+            return BitcoindClient(base_url)
+        setattr(svc, "create_rpc", _create_rpc)
+
+        return svc
 
 class VertexFactory(flexitest.Factory):
     def __init__(self, datadir_pfx: str, port_range: list[int]):
diff --git a/test/poetry.lock b/test/poetry.lock
index 6da40cfe9..ec326f224 100644
--- a/test/poetry.lock
+++ b/test/poetry.lock
@@ -1,3 +1,61 @@
+[[package]]
+name = "bitcoinlib"
+version = "0.6.15"
+description = "Bitcoin cryptocurrency Library"
+category = "main"
+optional = false
+python-versions = "*"
+
+[package.dependencies]
+ecdsa = ">=0.17"
+fastecdsa = {version = ">=2.2.1", markers = "platform_system != \"Windows\""}
+numpy = {version = ">=1.22.0", markers = "python_version >= \"3.9\""}
+pycryptodome = ">=3.14.1"
+requests = ">=2.25.0"
+SQLAlchemy = ">=2.0.0"
+
+[package.extras]
+dev = ["sphinx (>=6.0.0)", "coveralls (>=3.0.1)", "psycopg2 (>=2.9.2)", "mysql-connector-python (>=8.0.27)", "mysqlclient (>=2.1.0)", "parameterized (>=0.8.1)", "sphinx-rtd-theme (>=1.0.0)", "Cython (>=3.0.0)", "scrypt (>=0.8.18)", "win-unicode-console"]
+
+[[package]]
+name = "certifi"
+version = "2024.6.2"
+description = "Python package for providing Mozilla's CA Bundle."
+category = "main"
+optional = false
+python-versions = ">=3.6"
+
+[[package]]
+name = "charset-normalizer"
+version = "3.3.2"
+description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
+category = "main"
+optional = false
+python-versions = ">=3.7.0"
+
+[[package]]
+name = "ecdsa"
+version = "0.19.0"
+description = "ECDSA cryptographic signature library (pure python)"
+category = "main"
+optional = false
+python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.6"
+
+[package.dependencies]
+six = ">=1.9.0"
+
+[package.extras]
+gmpy = ["gmpy"]
+gmpy2 = ["gmpy2"]
+
+[[package]]
+name = "fastecdsa"
+version = "2.3.2"
+description = "Fast elliptic curve digital signatures"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
 [[package]]
 name = "flexitest"
 version = "0.1.0"
@@ -13,10 +71,145 @@ url = "https://codeberg.org/treyd/flexitest.git"
 reference = "master"
 resolved_reference = "e083cdbfb0c4dbdad994b6f4fa4f686725073181"
 
+[[package]]
+name = "greenlet"
+version = "3.0.3"
+description = "Lightweight in-process concurrent programming"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.extras]
+docs = ["sphinx", "furo"]
+test = ["objgraph", "psutil"]
+
+[[package]]
+name = "idna"
+version = "3.7"
+description = "Internationalized Domain Names in Applications (IDNA)"
+category = "main"
+optional = false
+python-versions = ">=3.5"
+
+[[package]]
+name = "numpy"
+version = "1.26.4"
+description = "Fundamental package for array computing in Python"
+category = "main"
+optional = false
+python-versions = ">=3.9"
+
+[[package]]
+name = "pycryptodome"
+version = "3.20.0"
+description = "Cryptographic library for Python"
+category = "main"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
+
+[[package]]
+name = "requests"
+version = "2.32.3"
+description = "Python HTTP for Humans."
+category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +certifi = ">=2017.4.17" +charset-normalizer = ">=2,<4" +idna = ">=2.5,<4" +urllib3 = ">=1.21.1,<3" + +[package.extras] +socks = ["PySocks (>=1.5.6,!=1.5.7)"] +use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "sqlalchemy" +version = "2.0.30" +description = "Database Abstraction Library" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} +typing-extensions = ">=4.6.0" + +[package.extras] +asyncio = ["greenlet (!=0.4.17)"] +mypy = ["mypy (>=0.910)"] +mssql = ["pyodbc"] +mssql-pymssql = ["pymssql"] +mssql-pyodbc = ["pyodbc"] +mysql = ["mysqlclient (>=1.4.0)"] +mysql-connector = ["mysql-connector-python"] +mariadb-connector = ["mariadb (>=1.0.1,!=1.1.2,!=1.1.5)"] +oracle = ["cx_oracle (>=8)"] +oracle-oracledb = ["oracledb (>=1.0.1)"] +postgresql = ["psycopg2 (>=2.7)"] +postgresql-pg8000 = ["pg8000 (>=1.29.1)"] +postgresql-asyncpg = ["greenlet (!=0.4.17)", "asyncpg"] +postgresql-psycopg2binary = ["psycopg2-binary"] +postgresql-psycopg2cffi = ["psycopg2cffi"] +postgresql-psycopg = ["psycopg (>=3.0.7)"] +postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"] +pymysql = ["pymysql"] +aiomysql = ["greenlet (!=0.4.17)", "aiomysql (>=0.2.0)"] +aioodbc = ["greenlet (!=0.4.17)", "aioodbc"] +asyncmy = ["greenlet (!=0.4.17)", "asyncmy (>=0.2.3,!=0.2.4,!=0.2.6)"] +aiosqlite = ["greenlet (!=0.4.17)", "aiosqlite", "typing_extensions (!=3.10.0.1)"] +sqlcipher = ["sqlcipher3-binary"] + +[[package]] +name = "typing-extensions" +version = "4.12.1" +description = "Backported and Experimental Type Hints for Python 3.8+" +category = "main" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "urllib3" +version = "2.2.1" +description = "HTTP library with thread-safe connection pooling, file post, and more." +category = "main" +optional = false +python-versions = ">=3.8" + +[package.extras] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] + [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "f3db78e02633ed1639b97a2cdfdedbcfe0dfbbeb0263d3efdae98f0da9d17cd3" +content-hash = "dfe821f8951dff7291c7b8d4d85a223b6c0d76e3cd2f937c72e522f9c1a34ae0" [metadata.files] +bitcoinlib = [] +certifi = [] +charset-normalizer = [] +ecdsa = [] +fastecdsa = [] flexitest = [] +greenlet = [] +idna = [] +numpy = [] +pycryptodome = [] +requests = [] +six = [] +sqlalchemy = [] +typing-extensions = [] +urllib3 = [] diff --git a/test/pyproject.toml b/test/pyproject.toml index 24c8c0790..c18a3ed06 100644 --- a/test/pyproject.toml +++ b/test/pyproject.toml @@ -7,6 +7,7 @@ authors = [] [tool.poetry.dependencies] python = "^3.10" flexitest = {git = "https://codeberg.org/treyd/flexitest.git"} +bitcoinlib = "^0.6.15" [tool.poetry.dev-dependencies]