From 156fe150488a2a14c122a5dc8b376798033f4e6c Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 2 Jan 2025 10:24:33 +0100 Subject: [PATCH] Add new rocks db policy for opening columns --- crates/fuel-core/src/state/rocks_db.rs | 140 +++++++++++++++++++------ 1 file changed, 106 insertions(+), 34 deletions(-) diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index e95128b7b2..5b9189cb35 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -96,10 +96,21 @@ impl Drop for DropResources { } } +#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)] +/// Defined behaviour for opening the columns of the database. +pub enum ColumnsPolicy { + #[cfg_attr(not(feature = "rocksdb-production"), default)] + // Open a new column only when a database interaction is done with it. + Lazy, + #[cfg_attr(feature = "rocksdb-production", default)] + // Open all columns on creation on the service. + OnCreation, +} + pub struct RocksDb { read_options: ReadOptions, db: Arc, - create_family: Arc>>, + create_family: Option>>>, snapshot: Option>, metrics: Arc, // used for RAII @@ -126,12 +137,17 @@ where Description: DatabaseDescription, { pub fn default_open_temp(capacity: Option) -> DatabaseResult { - Self::default_open_temp_with_params(capacity, 512) + if cfg!(feature = "rocksdb-production") { + Self::default_open_temp_with_params(capacity, 512, ColumnsPolicy::OnCreation) + } else { + Self::default_open_temp_with_params(capacity, 512, ColumnsPolicy::Lazy) + } } pub fn default_open_temp_with_params( capacity: Option, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult { let tmp_dir = TempDir::new().unwrap(); let path = tmp_dir.path(); @@ -140,6 +156,7 @@ where enum_iterator::all::().collect::>(), capacity, max_fds, + columns_policy, ); let mut db = result?; @@ -160,12 +177,14 @@ where path: P, capacity: Option, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult { Self::open( path, enum_iterator::all::().collect::>(), capacity, max_fds, + columns_policy, ) } @@ -181,8 +200,16 @@ where columns: Vec, capacity: Option, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult { - Self::open_with(DB::open_cf_descriptors, path, columns, capacity, max_fds) + Self::open_with( + DB::open_cf_descriptors, + path, + columns, + capacity, + max_fds, + columns_policy, + ) } pub fn open_read_only>( @@ -191,6 +218,7 @@ where capacity: Option, error_if_log_file_exist: bool, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult { Self::open_with( |options, primary_path, cfs| { @@ -205,6 +233,7 @@ where columns, capacity, max_fds, + columns_policy, ) } @@ -214,6 +243,7 @@ where columns: Vec, capacity: Option, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult where PrimaryPath: AsRef, @@ -232,6 +262,7 @@ where columns, capacity, max_fds, + columns_policy, ) } @@ -241,6 +272,7 @@ where columns: Vec, capacity: Option, max_fds: i32, + columns_policy: ColumnsPolicy, ) -> DatabaseResult where F: Fn( @@ -315,7 +347,10 @@ where } } - if cf_descriptors_to_open.is_empty() { + if columns_policy == ColumnsPolicy::OnCreation + || (columns_policy == ColumnsPolicy::Lazy + && cf_descriptors_to_open.is_empty()) + { opts.create_if_missing(true); } @@ -356,8 +391,17 @@ where } .map_err(|e| DatabaseError::Other(e.into()))?; + let create_family = match columns_policy { + ColumnsPolicy::OnCreation => { + for (name, opt) in cf_descriptors_to_create { + db.create_cf(name, &opt) + .map_err(|e| DatabaseError::Other(e.into()))?; + } + None + } + ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))), + }; let db = Arc::new(db); - let create_family = Arc::new(Mutex::new(cf_descriptors_to_create)); let rocks_db = RocksDb { read_options: Self::generate_read_options(&None), @@ -431,26 +475,29 @@ where match family { None => { - let mut lock = self - .create_family - .lock() - .expect("The create family lock should be available"); - - let name = Self::col_name(column); - let Some(family) = lock.remove(&name) else { - return self - .db - .cf_handle(&Self::col_name(column)) - .expect("No column family found"); - }; - - self.db - .create_cf(&name, &family) - .expect("Couldn't create column family"); - - let family = self.db.cf_handle(&name).expect("invalid column state"); - - family + if let Some(create_family) = &self.create_family { + let mut lock = create_family + .lock() + .expect("The create family lock should be available"); + + let name = Self::col_name(column); + let Some(family) = lock.remove(&name) else { + return self + .db + .cf_handle(&Self::col_name(column)) + .expect("No column family found"); + }; + + self.db + .create_cf(&name, &family) + .expect("Couldn't create column family"); + + let family = self.db.cf_handle(&name).expect("invalid column state"); + + family + } else { + panic!("invalid column state"); + } } Some(family) => family, } @@ -933,7 +980,8 @@ mod tests { fn create_db() -> (RocksDb, TempDir) { let tmp_dir = TempDir::new().unwrap(); ( - RocksDb::default_open(tmp_dir.path(), None, 512).unwrap(), + RocksDb::default_open(tmp_dir.path(), None, 512, ColumnsPolicy::Lazy) + .unwrap(), tmp_dir, ) } @@ -945,17 +993,28 @@ mod tests { // Given let old_columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes]; - let database_with_old_columns = - RocksDb::::open(tmp_dir.path(), old_columns.clone(), None, 512) - .expect("Failed to open database with old columns"); + let database_with_old_columns = RocksDb::::open( + tmp_dir.path(), + old_columns.clone(), + None, + 512, + ColumnsPolicy::Lazy, + ) + .expect("Failed to open database with old columns"); drop(database_with_old_columns); // When let mut new_columns = old_columns; new_columns.push(Column::ContractsAssets); new_columns.push(Column::Metadata); - let database_with_new_columns = - RocksDb::::open(tmp_dir.path(), new_columns, None, 512).map(|_| ()); + let database_with_new_columns = RocksDb::::open( + tmp_dir.path(), + new_columns, + None, + 512, + ColumnsPolicy::Lazy, + ) + .map(|_| ()); // Then assert_eq!(Ok(()), database_with_new_columns); @@ -1124,7 +1183,13 @@ mod tests { // When let columns = enum_iterator::all::<::Column>() .collect::>(); - let result = RocksDb::::open(tmp_dir.path(), columns, None, 512); + let result = RocksDb::::open( + tmp_dir.path(), + columns, + None, + 512, + ColumnsPolicy::Lazy, + ); // Then assert!(result.is_err()); @@ -1144,6 +1209,7 @@ mod tests { None, false, 512, + ColumnsPolicy::Lazy, ) .map(|_| ()); @@ -1166,6 +1232,7 @@ mod tests { old_columns.clone(), None, 512, + ColumnsPolicy::Lazy, ) .map(|_| ()); @@ -1257,8 +1324,13 @@ mod tests { enum_iterator::all::<::Column>() .skip(1) .collect::>(); - let open_with_part_of_columns = - RocksDb::::open(tmp_dir.path(), part_of_columns, None, 512); + let open_with_part_of_columns = RocksDb::::open( + tmp_dir.path(), + part_of_columns, + None, + 512, + ColumnsPolicy::Lazy, + ); // Then let _ = open_with_part_of_columns