exposed more rocksdb options, increased max files (#1481)
* exposed more rocksdb options, increased max files

* default to lz4 compression
LesnyRumcajs authored Mar 18, 2022
1 parent d52d4f7 commit 41e13c9
Showing 8 changed files with 151 additions and 37 deletions.
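This commit widens the `RocksDbConfig` surface (background jobs, compaction style, compression type, statistics), raises the default `max_open_files` from 200 to 1024, and defaults compression to lz4. A minimal sketch of how the new knobs might be driven from code, using only the API shown in the diffs below; the database path and the override values are illustrative, not taken from the commit:

```rust
use forest_db::rocks::RocksDb;
use forest_db::rocks_config::RocksDbConfig;

fn open_tuned_db() -> RocksDb {
    // Start from the new defaults (lz4 compression, 1024 open files) and
    // override a few of the newly exposed options; the values are examples only.
    let config = RocksDbConfig {
        max_open_files: 2048,
        max_background_jobs: Some(4),
        compaction_style: Some("level".into()),
        compression_type: Some("zstd".into()),
        ..RocksDbConfig::default()
    };
    // `RocksDb::open` now borrows the config instead of consuming it.
    RocksDb::open("forest_data/db", &config).expect("Opening RocksDB must succeed")
}
```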
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions forest/src/cli/config.rs
@@ -26,7 +26,7 @@ pub struct Config {
pub sync: SyncConfig,
pub encrypt_keystore: bool,
pub metrics_port: u16,
pub rocks_db: db::rocks::RocksDbConfig,
pub rocks_db: db::rocks_config::RocksDbConfig,
}

impl Default for Config {
@@ -44,7 +44,7 @@ impl Default for Config {
sync: SyncConfig::default(),
encrypt_keystore: true,
metrics_port: 6116,
rocks_db: db::rocks::RocksDbConfig::default(),
rocks_db: db::rocks_config::RocksDbConfig::default(),
}
}
}
23 changes: 15 additions & 8 deletions forest/src/daemon.rs
@@ -22,9 +22,11 @@ use libp2p::identity::{ed25519, Keypair};
use log::{debug, info, trace, warn};
use rpassword::read_password;

use db::rocks::RocksDb;
use std::io::prelude::*;
use std::path::PathBuf;
use std::sync::Arc;
use std::time;

/// Starts daemon process
pub(super) async fn start(config: Config) {
@@ -117,7 +119,7 @@ pub(super) async fn start(config: Config) {
.expect("Opening SledDB must succeed");

#[cfg(feature = "rocksdb")]
let db = db::rocks::RocksDb::open(PathBuf::from(&config.data_dir).join("db"), config.rocks_db)
let db = db::rocks::RocksDb::open(PathBuf::from(&config.data_dir).join("db"), &config.rocks_db)
.expect("Opening RocksDB must succeed");

let db = Arc::new(db);
@@ -144,13 +146,7 @@ pub(super) async fn start(config: Config) {

info!("Using network :: {}", network_name);

let validate_height = if config.snapshot { None } else { Some(0) };
// Sync from snapshot
if let Some(path) = &config.snapshot_path {
import_chain::<FullVerifier, _>(&state_manager, path, validate_height, config.skip_load)
.await
.unwrap();
}
sync_from_snapshot(&config, &state_manager).await;

// Fetch and ensure verification keys are downloaded
get_params_default(SectorSizeOpt::Keys, false)
@@ -261,6 +257,17 @@ pub(super) async fn start(config: Config) {
info!("Forest finish shutdown");
}

async fn sync_from_snapshot(config: &Config, state_manager: &Arc<StateManager<RocksDb>>) {
if let Some(path) = &config.snapshot_path {
let stopwatch = time::Instant::now();
let validate_height = if config.snapshot { None } else { Some(0) };
import_chain::<FullVerifier, _>(state_manager, path, validate_height, config.skip_load)
.await
.expect("Failed miserably while importing chain from snapshot");
debug!("Imported snapshot in: {}s", stopwatch.elapsed().as_secs());
}
}

#[cfg(test)]
#[cfg(not(any(feature = "interopnet", feature = "devnet")))]
mod test {
1 change: 1 addition & 0 deletions node/db/Cargo.toml
@@ -15,6 +15,7 @@ encoding = { package = "forest_encoding", version = "0.2" }
thiserror = "1.0"
num_cpus = "1.13"
serde = { version = "1.0", features = ["derive"] }
anyhow = "1"

[dev-dependencies]
tempfile = "3.3"
3 changes: 3 additions & 0 deletions node/db/src/lib.rs
@@ -7,6 +7,9 @@ mod memory;
#[cfg(feature = "rocksdb")]
pub mod rocks;

#[cfg(feature = "rocksdb")]
pub mod rocks_config;

#[cfg(feature = "sled")]
pub mod sled;

43 changes: 18 additions & 25 deletions node/db/src/rocks.rs
@@ -3,31 +3,10 @@

use super::errors::Error;
use super::Store;
use num_cpus;
use crate::rocks_config::{compaction_style_from_str, compression_type_from_str, RocksDbConfig};
pub use rocksdb::{Options, WriteBatch, DB};
use serde::Deserialize;
use std::path::Path;

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct RocksDbConfig {
create_if_missing: bool,
parallelism: i32,
write_buffer_size: usize,
max_open_files: i32,
}

impl Default for RocksDbConfig {
fn default() -> Self {
Self {
create_if_missing: true,
parallelism: num_cpus::get() as i32,
write_buffer_size: 256 * 1024 * 1024,
max_open_files: 200,
}
}
}

/// RocksDB instance that satisfies the [Store] interface.
#[derive(Debug)]
pub struct RocksDb {
@@ -38,12 +17,13 @@ pub struct RocksDb {
///
/// Usage:
/// ```no_run
/// use forest_db::rocks::{RocksDb, RocksDbConfig};
/// use forest_db::rocks::RocksDb;
/// use forest_db::rocks_config::RocksDbConfig;
///
/// let mut db = RocksDb::open("test_db", RocksDbConfig::default()).unwrap();
/// let mut db = RocksDb::open("test_db", &RocksDbConfig::default()).unwrap();
/// ```
impl RocksDb {
pub fn open<P>(path: P, config: RocksDbConfig) -> Result<Self, Error>
pub fn open<P>(path: P, config: &RocksDbConfig) -> Result<Self, Error>
where
P: AsRef<Path>,
{
@@ -52,6 +32,19 @@ impl RocksDb {
db_opts.increase_parallelism(config.parallelism);
db_opts.set_write_buffer_size(config.write_buffer_size);
db_opts.set_max_open_files(config.max_open_files);

if let Some(max_background_jobs) = config.max_background_jobs {
db_opts.set_max_background_jobs(max_background_jobs);
}
if let Some(compaction_style) = &config.compaction_style {
db_opts.set_compaction_style(compaction_style_from_str(compaction_style).unwrap());
}
if let Some(compression_type) = &config.compression_type {
db_opts.set_compression_type(compression_type_from_str(compression_type).unwrap());
}
if config.enable_statistics {
db_opts.enable_statistics();
};
Ok(Self {
db: DB::open(&db_opts, path)?,
})
108 changes: 108 additions & 0 deletions node/db/src/rocks_config.rs
@@ -0,0 +1,108 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
use anyhow::anyhow;
use num_cpus;
use rocksdb::{DBCompactionStyle, DBCompressionType};
use serde::Deserialize;

/// RocksDB configuration exposed in Forest.
/// Only a subset of the possible options is implemented; add missing ones as needed.
/// For a description of the different options, please refer to the `rocksdb` crate documentation.
/// <https://docs.rs/rocksdb/latest/rocksdb/>
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct RocksDbConfig {
pub create_if_missing: bool,
pub parallelism: i32,
pub write_buffer_size: usize,
pub max_open_files: i32,
pub max_background_jobs: Option<i32>,
pub compression_type: Option<String>,
pub compaction_style: Option<String>,
pub enable_statistics: bool,
}

impl Default for RocksDbConfig {
fn default() -> Self {
Self {
create_if_missing: true,
parallelism: num_cpus::get() as i32,
write_buffer_size: 256 * 1024 * 1024,
max_open_files: 1024,
max_background_jobs: None,
compaction_style: None,
compression_type: Some("lz4".into()),
enable_statistics: false,
}
}
}

/// Converts string to a compaction style RocksDB variant.
pub(crate) fn compaction_style_from_str(s: &str) -> anyhow::Result<DBCompactionStyle> {
match s.to_lowercase().as_str() {
"level" => Ok(DBCompactionStyle::Level),
"universal" => Ok(DBCompactionStyle::Universal),
"fifo" => Ok(DBCompactionStyle::Fifo),
_ => Err(anyhow!("invalid compaction option")),
}
}

/// Converts string to a compression type RocksDB variant.
pub(crate) fn compression_type_from_str(s: &str) -> anyhow::Result<DBCompressionType> {
match s.to_lowercase().as_str() {
"bz2" => Ok(DBCompressionType::Bz2),
"lz4" => Ok(DBCompressionType::Lz4),
"lz4hc" => Ok(DBCompressionType::Lz4hc),
"snappy" => Ok(DBCompressionType::Snappy),
"zlib" => Ok(DBCompressionType::Zlib),
"zstd" => Ok(DBCompressionType::Zstd),
"none" => Ok(DBCompressionType::None),
_ => Err(anyhow!("invalid compression option")),
}
}

#[cfg(test)]
mod test {
use super::*;
use rocksdb::DBCompactionStyle;

#[test]
fn compaction_style_from_str_test() {
let test_cases = vec![
("Level", Ok(DBCompactionStyle::Level)),
("UNIVERSAL", Ok(DBCompactionStyle::Universal)),
("fifo", Ok(DBCompactionStyle::Fifo)),
("cthulhu", Err(anyhow!("some error message"))),
];
for (input, expected) in test_cases {
let actual = compaction_style_from_str(input);
if let Ok(compaction_style) = actual {
assert_eq!(expected.unwrap(), compaction_style);
} else {
assert!(expected.is_err());
}
}
}

#[test]
fn compression_style_from_str_test() {
let test_cases = vec![
("bz2", Ok(DBCompressionType::Bz2)),
("lz4", Ok(DBCompressionType::Lz4)),
("lz4HC", Ok(DBCompressionType::Lz4hc)),
("SNAPPY", Ok(DBCompressionType::Snappy)),
("zlib", Ok(DBCompressionType::Zlib)),
("ZSTD", Ok(DBCompressionType::Zstd)),
("none", Ok(DBCompressionType::None)),
("cthulhu", Err(anyhow!("some error message"))),
];
for (input, expected) in test_cases {
let actual = compression_type_from_str(input);
if let Ok(compression_type) = actual {
assert_eq!(expected.unwrap(), compression_type);
} else {
assert!(expected.is_err());
}
}
}
}
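Because `RocksDbConfig` above is annotated with `#[serde(default)]`, operators only need to spell out the options they want to change. A small sketch of that behaviour, assuming the `toml` crate parses the configuration (the diff itself only shows the serde derive):

```rust
use forest_db::rocks_config::RocksDbConfig;

fn parse_overrides() -> RocksDbConfig {
    // Keys that are omitted fall back to `RocksDbConfig::default()`.
    let cfg: RocksDbConfig = toml::from_str(
        r#"
        max_open_files = 4096
        compression_type = "zstd"
        enable_statistics = true
        "#,
    )
    .expect("valid RocksDB config");
    assert!(cfg.create_if_missing); // untouched field keeps its default
    assert!(cfg.max_background_jobs.is_none()); // optional fields stay unset
    cfg
}
```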
5 changes: 3 additions & 2 deletions node/db/tests/db_utils/mod.rs
@@ -3,7 +3,8 @@

#![cfg(feature = "rocksdb")]

use forest_db::rocks::{RocksDb, RocksDbConfig};
use forest_db::rocks::RocksDb;
use forest_db::rocks_config::RocksDbConfig;
use std::ops::Deref;

/// Temporary, self-cleaning RocksDB
@@ -22,7 +23,7 @@ impl TempRocksDB {
let path = dir.path().join("db");

TempRocksDB {
db: RocksDb::open(&path, RocksDbConfig::default()).unwrap(),
db: RocksDb::open(&path, &RocksDbConfig::default()).unwrap(),
_dir: dir,
}
}
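For completeness, a sketch of how the updated helper might be exercised in a test. The constructor name and the `write`/`read` calls are assumed from the crate's `Store` trait and the surrounding helper code, neither of which is shown in this diff:

```rust
use forest_db::Store;

#[test]
fn temp_rocksdb_roundtrip() {
    // TempRocksDB derefs to RocksDb, so Store methods can be called on it directly.
    let db = TempRocksDB::new();
    db.write(b"key", b"value").unwrap();
    assert_eq!(db.read(b"key").unwrap(), Some(b"value".to_vec()));
}
```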
