From 0e96d4f1053d8a903140cc9e2de2d63b9d4dde0b Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 2 Aug 2024 16:58:37 +1000
Subject: [PATCH] Store changes to persist data columns (#6073)

* Store changes to persist data columns.

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>

* Update to use `eip7594_fork_epoch` for data column slot in Store.

* Fix formatting.

* Merge branch 'unstable' into data-columns-store

# Conflicts:
#	beacon_node/store/src/lib.rs
#	consensus/types/src/chain_spec.rs

* Minor refactor.

* Merge branch 'unstable' into data-columns-store

# Conflicts:
#	beacon_node/store/src/metrics.rs

* Init data column info at PeerDAS epoch instead of Deneb fork epoch. Address review comments.

* Remove Deneb-related comments
---
 beacon_node/store/src/errors.rs         |   2 +
 beacon_node/store/src/hot_cold_store.rs | 278 ++++++++++++++++++++++--
 beacon_node/store/src/leveldb_store.rs  |   4 +
 beacon_node/store/src/lib.rs            |  33 ++-
 beacon_node/store/src/memory_store.rs   |  14 +-
 beacon_node/store/src/metadata.rs       |  28 +++
 beacon_node/store/src/metrics.rs        |   7 +
 7 files changed, 348 insertions(+), 18 deletions(-)

diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 91e6a920ba3..e3b2d327b0a 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -27,6 +27,8 @@ pub enum Error {
     AnchorInfoConcurrentMutation,
     /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
     BlobInfoConcurrentMutation,
+    /// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
+    DataColumnInfoConcurrentMutation,
     /// The block or state is unavailable due to weak subjectivity sync.
     HistoryUnavailable,
     /// State reconstruction cannot commence because not all historic blocks are known.
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 9c247c983a9..8b144c1be93 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -12,12 +12,13 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
-    BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
-    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
+    ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
+    DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    STATE_UPPER_LIMIT_NO_RETAIN,
 };
-use crate::metrics;
 use crate::state_cache::{PutStateOutcome, StateCache};
+use crate::{get_data_column_key, metrics, parse_data_column_key};
 use crate::{
     get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
     PartialBeaconState, StoreItem, StoreOp,
@@ -35,11 +36,13 @@ use state_processing::{
     SlotProcessingError,
 };
 use std::cmp::min;
+use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::num::NonZeroUsize;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
+use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList};
 use types::*;
 
 /// On-disk database that stores finalized states efficiently.
@@ -57,6 +60,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     anchor_info: RwLock<Option<AnchorInfo>>,
     /// The starting slots for the range of blobs stored in the database.
     blob_info: RwLock<BlobInfo>,
+    /// The starting slots for the range of data columns stored in the database.
+    data_column_info: RwLock<DataColumnInfo>,
     pub(crate) config: StoreConfig,
     /// Cold database containing compact historical data.
     pub cold_db: Cold,
@@ -86,6 +91,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
 struct BlockCache<E: EthSpec> {
     block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
     blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
+    data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
 }
 
 impl<E: EthSpec> BlockCache<E> {
@@ -93,6 +99,7 @@ impl<E: EthSpec> BlockCache<E> {
         Self {
             block_cache: LruCache::new(size),
             blob_cache: LruCache::new(size),
+            data_column_cache: LruCache::new(size),
         }
     }
     pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -101,12 +108,26 @@ impl<E: EthSpec> BlockCache<E> {
     pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
         self.blob_cache.put(block_root, blobs);
     }
+    pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc<DataColumnSidecar<E>>) {
+        self.data_column_cache
+            .get_or_insert_mut(block_root, Default::default)
+            .insert(data_column.index, data_column);
+    }
     pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
         self.block_cache.get(block_root)
     }
     pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
         self.blob_cache.get(block_root)
     }
+    pub fn get_data_column<'a>(
+        &'a mut self,
+        block_root: &Hash256,
+        column_index: &ColumnIndex,
+    ) -> Option<&'a Arc<DataColumnSidecar<E>>> {
+        self.data_column_cache
+            .get(block_root)
+            .and_then(|map| map.get(column_index))
+    }
     pub fn delete_block(&mut self, block_root: &Hash256) {
         let _ = self.block_cache.pop(block_root);
     }
@@ -180,6 +201,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: MemoryStore::open(),
             blobs_db: MemoryStore::open(),
             hot_db: MemoryStore::open(),
@@ -216,6 +238,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: LevelDB::open(cold_path)?,
             blobs_db: LevelDB::open(blobs_db_path)?,
             hot_db: LevelDB::open(hot_path)?,
@@ -294,11 +317,39 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             },
         };
         db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
+
+        let data_column_info = db.load_data_column_info()?;
+        let eip7594_fork_slot = db
+            .spec
+            .eip7594_fork_epoch
+            .map(|epoch| epoch.start_slot(E::slots_per_epoch()));
+        let new_data_column_info = match &data_column_info {
+            Some(data_column_info) => {
+                // Set the oldest data column slot to the fork slot if it is not yet set.
+                let oldest_data_column_slot = data_column_info
+                    .oldest_data_column_slot
+                    .or(eip7594_fork_slot);
+                DataColumnInfo {
+                    oldest_data_column_slot,
+                }
+            }
+            // First start.
+            None => DataColumnInfo {
+                // The fork slot (if known) is the oldest possible data column slot.
+                oldest_data_column_slot: eip7594_fork_slot,
+            },
+        };
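+        // Persist the initial data column info, mirroring the blob info handling above;
+        // the in-memory value is still the default here, so the compare-and-set succeeds.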
+        db.compare_and_set_data_column_info_with_write(
+            <_>::default(),
+            new_data_column_info.clone(),
+        )?;
+
         info!(
             db.log,
             "Blob DB initialized";
             "path" => ?blobs_db_path,
             "oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
+            "oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot,
         );
 
         // Ensure that the schema version of the on-disk database matches the software.
@@ -626,6 +677,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes()));
     }
 
+    pub fn data_columns_as_kv_store_ops(
+        &self,
+        block_root: &Hash256,
+        data_columns: DataColumnSidecarList<E>,
+        ops: &mut Vec<KeyValueStoreOp>,
+    ) {
+        for data_column in data_columns {
+            let db_key = get_key_for_col(
+                DBColumn::BeaconDataColumn.into(),
+                &get_data_column_key(block_root, &data_column.index),
+            );
+            ops.push(KeyValueStoreOp::PutKeyValue(
+                db_key,
+                data_column.as_ssz_bytes(),
+            ));
+        }
+    }
+
     pub fn put_state_summary(
         &self,
         state_root: &Hash256,
@@ -909,6 +978,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
                 }
 
+                StoreOp::PutDataColumns(block_root, data_columns) => {
+                    self.data_columns_as_kv_store_ops(
+                        &block_root,
+                        data_columns,
+                        &mut key_value_batch,
+                    );
+                }
+
                 StoreOp::PutStateSummary(state_root, summary) => {
                     key_value_batch.push(summary.as_kv_store_op(state_root));
                 }
@@ -933,6 +1010,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
                 }
 
+                StoreOp::DeleteDataColumns(block_root, column_indices) => {
+                    for index in column_indices {
+                        let key = get_key_for_col(
+                            DBColumn::BeaconDataColumn.into(),
+                            &get_data_column_key(&block_root, &index),
+                        );
+                        key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
+                    }
+                }
+
                 StoreOp::DeleteState(state_root, slot) => {
                     let state_summary_key =
                         get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
@@ -963,9 +1050,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         batch: Vec<StoreOp<E>>,
     ) -> Result<(), Error> {
         let mut blobs_to_delete = Vec::new();
+        let mut data_columns_to_delete = Vec::new();
         let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
             batch.into_iter().partition(|store_op| match store_op {
-                StoreOp::PutBlobs(_, _) => true,
+                StoreOp::PutBlobs(_, _) | StoreOp::PutDataColumns(_, _) => true,
                 StoreOp::DeleteBlobs(block_root) => {
                     match self.get_blobs(block_root) {
                         Ok(Some(blob_sidecar_list)) => {
@@ -982,6 +1070,31 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     }
                     true
                 }
+                StoreOp::DeleteDataColumns(block_root, indices) => {
+                    match indices
+                        .iter()
+                        .map(|index| self.get_data_column(block_root, index))
+                        .collect::<Result<Vec<_>, _>>()
+                    {
+                        Ok(data_column_sidecar_list_opt) => {
+                            let data_column_sidecar_list = data_column_sidecar_list_opt
+                                .into_iter()
+                                .flatten()
+                                .collect::<Vec<_>>();
+                            // Must push the same number of items as StoreOp::DeleteDataColumns items to
+                            // prevent a `HotColdDBError::Rollback` error below in case of rollback
+                            data_columns_to_delete.push((*block_root, data_column_sidecar_list));
+                        }
+                        Err(e) => {
+                            error!(
+                                self.log, "Error getting data columns";
+                                "block_root" => %block_root,
+                                "error" => ?e
+                            );
+                        }
+                    }
+                    true
+                }
                 StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
                 _ => false,
             });
@@ -1013,10 +1126,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             for op in blob_cache_ops.iter_mut() {
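+                // Invert each blobs-db op so its effects can be rolled back after a
+                // hot-db write failure: puts become deletes, and deletes are re-created
+                // from the sidecars captured while partitioning the batch above.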
                let reverse_op = match op {
                    StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
+                    StoreOp::PutDataColumns(block_root, data_columns) => {
+                        let indices = data_columns.iter().map(|c| c.index).collect();
+                        StoreOp::DeleteDataColumns(*block_root, indices)
+                    }
                    StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
                        Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
                        None => return Err(HotColdDBError::Rollback.into()),
                    },
+                    StoreOp::DeleteDataColumns(_, _) => match data_columns_to_delete.pop() {
+                        Some((block_root, data_columns)) => {
+                            StoreOp::PutDataColumns(block_root, data_columns)
+                        }
+                        None => return Err(HotColdDBError::Rollback.into()),
+                    },
                    _ => return Err(HotColdDBError::Rollback.into()),
                };
                *op = reverse_op;
@@ -1034,6 +1157,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
                 StoreOp::PutBlobs(_, _) => (),
 
+                StoreOp::PutDataColumns(_, _) => (),
+
                 StoreOp::PutState(_, _) => (),
 
                 StoreOp::PutStateSummary(_, _) => (),
@@ -1053,6 +1178,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
                 StoreOp::DeleteBlobs(_) => (),
 
+                StoreOp::DeleteDataColumns(_, _) => (),
+
                 StoreOp::DeleteExecutionPayload(_) => (),
 
                 StoreOp::KeyValueOp(_) => (),
@@ -1552,6 +1679,45 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }
 
+    /// Fetch all keys in the `BeaconDataColumn` column with prefix `block_root`.
+    pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
+        self.blobs_db
+            .iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_bytes())
+            .map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1)))
+            .collect()
+    }
+
+    /// Fetch a single data column for a given block from the store.
+    pub fn get_data_column(
+        &self,
+        block_root: &Hash256,
+        column_index: &ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<E>>>, Error> {
+        // Check the cache.
+        if let Some(data_column) = self
+            .block_cache
+            .lock()
+            .get_data_column(block_root, column_index)
+        {
+            metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
+            return Ok(Some(data_column.clone()));
+        }
+
+        match self.blobs_db.get_bytes(
+            DBColumn::BeaconDataColumn.into(),
+            &get_data_column_key(block_root, column_index),
+        )? {
+            Some(ref data_column_bytes) => {
+                let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
+                self.block_cache
+                    .lock()
+                    .put_data_column(*block_root, data_column.clone());
+                Ok(Some(data_column))
+            }
+            None => Ok(None),
+        }
+    }
+
     /// Get a reference to the `ChainSpec` used by the database.
     pub fn get_chain_spec(&self) -> &ChainSpec {
         &self.spec
@@ -1748,6 +1914,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.blob_info.read_recursive().clone()
     }
 
+    /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
+    pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
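+        // Data columns exist from the PeerDAS (EIP-7594) fork onwards, so the oldest
+        // stored column can be no earlier than the fork slot, and no earlier than the
+        // anchor slot when checkpoint syncing from a later point.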
+        let oldest_data_column_slot = self.spec.eip7594_fork_epoch.map(|fork_epoch| {
+            std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
+        });
+        let data_column_info = DataColumnInfo {
+            oldest_data_column_slot,
+        };
+        self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
+    }
+
+    /// Get a clone of the store's data column info.
+    ///
+    /// To do mutations, use `compare_and_set_data_column_info`.
+    pub fn get_data_column_info(&self) -> DataColumnInfo {
+        self.data_column_info.read_recursive().clone()
+    }
+
     /// Atomically update the blob info from `prev_value` to `new_value`.
     ///
     /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
@@ -1793,6 +1977,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         blob_info.as_kv_store_op(BLOB_INFO_KEY)
     }
 
+    /// Atomically update the data column info from `prev_value` to `new_value`.
+    ///
+    /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
+    /// values.
+    ///
+    /// Return a `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
+    /// is not correct.
+    pub fn compare_and_set_data_column_info(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<KeyValueStoreOp, Error> {
+        let mut data_column_info = self.data_column_info.write();
+        if *data_column_info == prev_value {
+            let kv_op = self.store_data_column_info_in_batch(&new_value);
+            *data_column_info = new_value;
+            Ok(kv_op)
+        } else {
+            Err(Error::DataColumnInfoConcurrentMutation)
+        }
+    }
+
+    /// As for `compare_and_set_data_column_info`, but also writes the data column info to disk immediately.
+    pub fn compare_and_set_data_column_info_with_write(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<(), Error> {
+        let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
+        self.hot_db.do_atomically(vec![kv_store_op])
+    }
+
+    /// Load the data column info from disk, but do not set `self.data_column_info`.
+    fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
+        self.hot_db.get(&DATA_COLUMN_INFO_KEY)
+    }
+
+    /// Store the given `data_column_info` to disk.
+    ///
+    /// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
+    /// with recursive locking.
+    fn store_data_column_info_in_batch(
+        &self,
+        data_column_info: &DataColumnInfo,
+    ) -> KeyValueStoreOp {
+        data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
+    }
+
     /// Return the slot-window describing the available historic states.
     ///
     /// Returns `(lower_limit, upper_limit)`.
@@ -2285,15 +2517,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
             };
 
-            if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? {
-                trace!(
-                    self.log,
-                    "Pruning blobs of block";
-                    "slot" => slot,
-                    "block_root" => ?block_root,
-                );
-                last_pruned_block_root = Some(block_root);
-                ops.push(StoreOp::DeleteBlobs(block_root));
+            if Some(block_root) != last_pruned_block_root {
+                if self
+                    .spec
+                    .is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch()))
+                {
+                    // data columns
+                    let indices = self.get_data_column_keys(block_root)?;
+                    if !indices.is_empty() {
+                        trace!(
+                            self.log,
+                            "Pruning data columns of block";
+                            "slot" => slot,
+                            "block_root" => ?block_root,
+                        );
+                        last_pruned_block_root = Some(block_root);
+                        ops.push(StoreOp::DeleteDataColumns(block_root, indices));
+                    }
+                } else if self.blobs_exist(&block_root)? {
+                    trace!(
+                        self.log,
+                        "Pruning blobs of block";
+                        "slot" => slot,
+                        "block_root" => ?block_root,
+                    );
+                    last_pruned_block_root = Some(block_root);
+                    ops.push(StoreOp::DeleteBlobs(block_root));
+                }
             }
 
             if slot >= end_slot {
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index b28bf689f83..32ff942ddc7 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -270,6 +270,10 @@ impl db_key::Key for BytesKey {
 }
 
 impl BytesKey {
+    pub fn starts_with(&self, prefix: &Self) -> bool {
+        self.key.starts_with(&prefix.key)
+    }
+
     /// Return `true` iff this `BytesKey` was created with the given `column`.
     pub fn matches_column(&self, column: DBColumn) -> bool {
         self.key.starts_with(column.as_bytes())
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 3b6d9ddff63..1f8cc8ca019 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -44,6 +44,8 @@ use std::sync::Arc;
 use strum::{EnumString, IntoStaticStr};
 pub use types::*;
 
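+// A data column key is the 32-byte block root concatenated with the 8-byte
+// little-endian column index; see `get_data_column_key` and `parse_data_column_key`.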
+const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
+
 pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
 pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
@@ -109,9 +111,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
         Box::new(std::iter::empty())
     }
 
-    fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
-        Box::new(std::iter::empty())
-    }
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;
 
     /// Iterate through all keys in a particular column.
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
@@ -150,6 +150,28 @@ pub fn get_col_from_key(key: &[u8]) -> Option<String> {
     String::from_utf8(key[0..3].to_vec()).ok()
 }
 
+pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
+    let mut result = block_root.as_bytes().to_vec();
+    result.extend_from_slice(&column_index.to_le_bytes());
+    result
+}
+
+pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
+    if data.len() != DBColumn::BeaconDataColumn.key_size() {
+        return Err(Error::InvalidKey);
+    }
+    // `split_at` panics if the slice is shorter than 32 bytes, which the length check above rules out.
+    let (block_root_bytes, column_index_bytes) = data.split_at(32);
+    let block_root = Hash256::from_slice(block_root_bytes);
+    // `column_index_bytes` is guaranteed to be exactly 8 bytes by the same length check.
+    let column_index = ColumnIndex::from_le_bytes(
+        column_index_bytes
+            .try_into()
+            .map_err(|_| Error::InvalidKey)?,
+    );
+    Ok((block_root, column_index))
+}
+
 #[must_use]
 #[derive(Clone)]
 pub enum KeyValueStoreOp {
@@ -210,11 +232,13 @@ pub enum StoreOp<'a, E: EthSpec> {
     PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
     PutState(Hash256, &'a BeaconState<E>),
     PutBlobs(Hash256, BlobSidecarList<E>),
+    PutDataColumns(Hash256, DataColumnSidecarList<E>),
     PutStateSummary(Hash256, HotStateSummary),
     PutStateTemporaryFlag(Hash256),
     DeleteStateTemporaryFlag(Hash256),
     DeleteBlock(Hash256),
     DeleteBlobs(Hash256),
+    DeleteDataColumns(Hash256, Vec<ColumnIndex>),
     DeleteState(Hash256, Option<Slot>),
     DeleteExecutionPayload(Hash256),
     KeyValueOp(KeyValueStoreOp),
@@ -230,6 +254,8 @@ pub enum DBColumn {
     BeaconBlock,
     #[strum(serialize = "blb")]
     BeaconBlob,
+    #[strum(serialize = "bdc")]
+    BeaconDataColumn,
     /// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
     #[strum(serialize = "ste")]
     BeaconState,
@@ -317,6 +343,7 @@ impl DBColumn {
             | Self::BeaconHistoricalRoots
             | Self::BeaconHistoricalSummaries
             | Self::BeaconRandaoMixes => 8,
+            Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
         }
     }
 }
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 302d2c2add2..4c7bfdf10ff 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -1,6 +1,6 @@
 use crate::{
     get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
-    ItemStore, Key, KeyValueStore, KeyValueStoreOp,
+    ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter,
 };
 use parking_lot::{Mutex, MutexGuard, RwLock};
 use std::collections::BTreeMap;
@@ -100,6 +100,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
         }))
     }
 
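+    // Emulates LevelDB's prefixed key iteration over the ordered in-memory map:
+    // scan from the first prefixed key and stop once a key no longer matches.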
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
+        let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
+        let keys = self
+            .db
+            .read()
+            .range(start_key.clone()..)
+            .take_while(|(k, _)| k.starts_with(&start_key))
+            .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
+            .collect::<Vec<_>>();
+        Box::new(keys.into_iter().map(Ok))
+    }
+
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
         Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
     }
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index a22dc4aab4c..0c93251fe2e 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
 pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
+pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
 
 /// State upper limit value used to indicate that a node is not storing historic states.
 pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -152,3 +153,30 @@ impl StoreItem for BlobInfo {
         Ok(Self::from_ssz_bytes(bytes)?)
     }
 }
+
+/// Database parameters relevant to data column sync.
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
+pub struct DataColumnInfo {
+    /// The slot after which data columns are or *will be* available (>=).
+    ///
+    /// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
+    /// data columns will be available.
+    ///
+    /// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
+    /// not yet known.
+    pub oldest_data_column_slot: Option<Slot>,
+}
+
+impl StoreItem for DataColumnInfo {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(Self::from_ssz_bytes(bytes)?)
+    }
+}
diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs
index af7b5e93e89..902c440be86 100644
--- a/beacon_node/store/src/metrics.rs
+++ b/beacon_node/store/src/metrics.rs
@@ -151,6 +151,13 @@ pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
         "Number of hits to the store's blob cache",
     )
 });
+pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
+    LazyLock::new(|| {
+        try_create_int_counter(
+            "store_beacon_data_columns_cache_hit_total",
+            "Number of hits to the store's data column cache",
+        )
+    });
 
 /// Updates the global metrics registry with store-related information.
 pub fn scrape_for_metrics(db_path: &Path, freezer_db_path: &Path) {
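
Illustrative sketch (not part of the patch): the two key helpers added to `lib.rs` are inverses of each other, so their round trip can be checked with a few lines of test code. This assumes a test in a crate that depends on the `store` and `types` crates from this repository; the values are arbitrary.

    use store::{get_data_column_key, parse_data_column_key};
    use types::Hash256;

    // A data column key is the 32-byte block root followed by the 8-byte
    // little-endian column index, 40 bytes in total.
    let block_root = Hash256::repeat_byte(0xab);
    let column_index: u64 = 7; // `ColumnIndex` is an alias for `u64`
    let key = get_data_column_key(&block_root, &column_index);
    assert_eq!(key.len(), 32 + 8);

    // Parsing recovers exactly the root and index that were encoded.
    let (root, index) = parse_data_column_key(key).unwrap();
    assert_eq!(root, block_root);
    assert_eq!(index, column_index);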