Skip to content

Commit

Permalink
Add new rocks db policy for opening columns
Browse files Browse the repository at this point in the history
  • Loading branch information
AurelienFT committed Jan 2, 2025
1 parent b4c20a0 commit 156fe15
Showing 1 changed file with 106 additions and 34 deletions.
140 changes: 106 additions & 34 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,21 @@ impl Drop for DropResources {
}
}

#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
/// Defined behaviour for opening the columns of the database.
pub enum ColumnsPolicy {
#[cfg_attr(not(feature = "rocksdb-production"), default)]
// Open a new column only when a database interaction is done with it.
Lazy,
#[cfg_attr(feature = "rocksdb-production", default)]
// Open all columns on creation on the service.
OnCreation,
}

pub struct RocksDb<Description> {
read_options: ReadOptions,
db: Arc<DB>,
create_family: Arc<Mutex<BTreeMap<String, Options>>>,
create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
metrics: Arc<DatabaseMetrics>,
// used for RAII
Expand All @@ -126,12 +137,17 @@ where
Description: DatabaseDescription,
{
pub fn default_open_temp(capacity: Option<usize>) -> DatabaseResult<Self> {
Self::default_open_temp_with_params(capacity, 512)
if cfg!(feature = "rocksdb-production") {
Self::default_open_temp_with_params(capacity, 512, ColumnsPolicy::OnCreation)
} else {
Self::default_open_temp_with_params(capacity, 512, ColumnsPolicy::Lazy)
}
}

pub fn default_open_temp_with_params(
capacity: Option<usize>,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self> {
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path();
Expand All @@ -140,6 +156,7 @@ where
enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
capacity,
max_fds,
columns_policy,
);
let mut db = result?;

Expand All @@ -160,12 +177,14 @@ where
path: P,
capacity: Option<usize>,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self> {
Self::open(
path,
enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
capacity,
max_fds,
columns_policy,
)
}

Expand All @@ -181,8 +200,16 @@ where
columns: Vec<Description::Column>,
capacity: Option<usize>,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self> {
Self::open_with(DB::open_cf_descriptors, path, columns, capacity, max_fds)
Self::open_with(
DB::open_cf_descriptors,
path,
columns,
capacity,
max_fds,
columns_policy,
)
}

pub fn open_read_only<P: AsRef<Path>>(
Expand All @@ -191,6 +218,7 @@ where
capacity: Option<usize>,
error_if_log_file_exist: bool,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self> {
Self::open_with(
|options, primary_path, cfs| {
Expand All @@ -205,6 +233,7 @@ where
columns,
capacity,
max_fds,
columns_policy,
)
}

Expand All @@ -214,6 +243,7 @@ where
columns: Vec<Description::Column>,
capacity: Option<usize>,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self>
where
PrimaryPath: AsRef<Path>,
Expand All @@ -232,6 +262,7 @@ where
columns,
capacity,
max_fds,
columns_policy,
)
}

Expand All @@ -241,6 +272,7 @@ where
columns: Vec<Description::Column>,
capacity: Option<usize>,
max_fds: i32,
columns_policy: ColumnsPolicy,
) -> DatabaseResult<Self>
where
F: Fn(
Expand Down Expand Up @@ -315,7 +347,10 @@ where
}
}

if cf_descriptors_to_open.is_empty() {
if columns_policy == ColumnsPolicy::OnCreation
|| (columns_policy == ColumnsPolicy::Lazy
&& cf_descriptors_to_open.is_empty())
{
opts.create_if_missing(true);
}

Expand Down Expand Up @@ -356,8 +391,17 @@ where
}
.map_err(|e| DatabaseError::Other(e.into()))?;

let create_family = match columns_policy {
ColumnsPolicy::OnCreation => {
for (name, opt) in cf_descriptors_to_create {
db.create_cf(name, &opt)
.map_err(|e| DatabaseError::Other(e.into()))?;
}
None
}
ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
};
let db = Arc::new(db);
let create_family = Arc::new(Mutex::new(cf_descriptors_to_create));

let rocks_db = RocksDb {
read_options: Self::generate_read_options(&None),
Expand Down Expand Up @@ -431,26 +475,29 @@ where

match family {
None => {
let mut lock = self
.create_family
.lock()
.expect("The create family lock should be available");

let name = Self::col_name(column);
let Some(family) = lock.remove(&name) else {
return self
.db
.cf_handle(&Self::col_name(column))
.expect("No column family found");
};

self.db
.create_cf(&name, &family)
.expect("Couldn't create column family");

let family = self.db.cf_handle(&name).expect("invalid column state");

family
if let Some(create_family) = &self.create_family {
let mut lock = create_family
.lock()
.expect("The create family lock should be available");

let name = Self::col_name(column);
let Some(family) = lock.remove(&name) else {
return self
.db
.cf_handle(&Self::col_name(column))
.expect("No column family found");
};

self.db
.create_cf(&name, &family)
.expect("Couldn't create column family");

let family = self.db.cf_handle(&name).expect("invalid column state");

family
} else {
panic!("invalid column state");
}
}
Some(family) => family,
}
Expand Down Expand Up @@ -933,7 +980,8 @@ mod tests {
fn create_db() -> (RocksDb<OnChain>, TempDir) {
let tmp_dir = TempDir::new().unwrap();
(
RocksDb::default_open(tmp_dir.path(), None, 512).unwrap(),
RocksDb::default_open(tmp_dir.path(), None, 512, ColumnsPolicy::Lazy)
.unwrap(),
tmp_dir,
)
}
Expand All @@ -945,17 +993,28 @@ mod tests {
// Given
let old_columns =
vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
let database_with_old_columns =
RocksDb::<OnChain>::open(tmp_dir.path(), old_columns.clone(), None, 512)
.expect("Failed to open database with old columns");
let database_with_old_columns = RocksDb::<OnChain>::open(
tmp_dir.path(),
old_columns.clone(),
None,
512,
ColumnsPolicy::Lazy,
)
.expect("Failed to open database with old columns");
drop(database_with_old_columns);

// When
let mut new_columns = old_columns;
new_columns.push(Column::ContractsAssets);
new_columns.push(Column::Metadata);
let database_with_new_columns =
RocksDb::<OnChain>::open(tmp_dir.path(), new_columns, None, 512).map(|_| ());
let database_with_new_columns = RocksDb::<OnChain>::open(
tmp_dir.path(),
new_columns,
None,
512,
ColumnsPolicy::Lazy,
)
.map(|_| ());

// Then
assert_eq!(Ok(()), database_with_new_columns);
Expand Down Expand Up @@ -1124,7 +1183,13 @@ mod tests {
// When
let columns = enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
.collect::<Vec<_>>();
let result = RocksDb::<OnChain>::open(tmp_dir.path(), columns, None, 512);
let result = RocksDb::<OnChain>::open(
tmp_dir.path(),
columns,
None,
512,
ColumnsPolicy::Lazy,
);

// Then
assert!(result.is_err());
Expand All @@ -1144,6 +1209,7 @@ mod tests {
None,
false,
512,
ColumnsPolicy::Lazy,
)
.map(|_| ());

Expand All @@ -1166,6 +1232,7 @@ mod tests {
old_columns.clone(),
None,
512,
ColumnsPolicy::Lazy,
)
.map(|_| ());

Expand Down Expand Up @@ -1257,8 +1324,13 @@ mod tests {
enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
.skip(1)
.collect::<Vec<_>>();
let open_with_part_of_columns =
RocksDb::<OnChain>::open(tmp_dir.path(), part_of_columns, None, 512);
let open_with_part_of_columns = RocksDb::<OnChain>::open(
tmp_dir.path(),
part_of_columns,
None,
512,
ColumnsPolicy::Lazy,
);

// Then
let _ = open_with_part_of_columns
Expand Down

0 comments on commit 156fe15

Please sign in to comment.