Skip to content

Commit

Permalink
feat: expose more rocksdb options (apache#1033)
Browse files Browse the repository at this point in the history
## Rationale
Now rocksdb as the wal, it is easy to become the bottleneck of write.

## Detailed Changes
1. expose more rocksdb options to avoid write stall
2. introduce rocksdb's FIFO compaction style, which makes rocksdb looks
like a message queue.
(FIFO is more suitable for time-series data, maybe it will become the
default option in the future.)
## Test Plan
I will test it in my test env.

---------

Co-authored-by: WEI Xikai <[email protected]>
  • Loading branch information
2 people authored and dust1 committed Aug 9, 2023
1 parent 6c001b0 commit 57689b9
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 2 deletions.
48 changes: 48 additions & 0 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,63 @@ impl WalsOpener for RocksDBWalsOpener {
let data_path = Path::new(&rocksdb_wal_config.data_dir);
let wal_path = data_path.join(WAL_DIR_NAME);
let data_wal = RocksWalBuilder::new(wal_path, write_runtime.clone())
.max_subcompactions(rocksdb_wal_config.data_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.data_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.data_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.data_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.data_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.data_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()
.context(OpenWal)?;

let manifest_path = data_path.join(MANIFEST_DIR_NAME);
let manifest_wal = RocksWalBuilder::new(manifest_path, write_runtime)
.max_subcompactions(rocksdb_wal_config.meta_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.meta_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.meta_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.meta_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.meta_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.meta_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()
.context(OpenManifestWal)?;
let opened_wals = OpenedWals {
Expand Down
21 changes: 21 additions & 0 deletions wal/src/rocks_impl/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,43 @@

//! RocksDB Config
use common_util::config::ReadableSize;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub max_subcompactions: u32,
pub max_background_jobs: i32,
pub enable_statistics: bool,
pub write_buffer_size: ReadableSize,
pub max_write_buffer_number: i32,
// Number of files to trigger level-0 compaction. A value <0 means that level-0 compaction will
// not be triggered by number of files at all.
pub level_zero_file_num_compaction_trigger: i32,
// Soft limit on number of level-0 files. We start slowing down writes at this point. A value
// <0 means that no writing slow down will be triggered by number of files in level-0.
pub level_zero_slowdown_writes_trigger: i32,
// Maximum number of level-0 files. We stop writes at this point.
pub level_zero_stop_writes_trigger: i32,
pub fifo_compaction_max_table_files_size: ReadableSize,
}

impl Default for Config {
fn default() -> Self {
Self {
// Same with rocksdb
// https://github.com/facebook/rocksdb/blob/v6.4.6/include/rocksdb/options.h#L537
max_subcompactions: 1,
max_background_jobs: 2,
enable_statistics: false,
write_buffer_size: ReadableSize::mb(64),
max_write_buffer_number: 2,
level_zero_file_num_compaction_trigger: 4,
level_zero_slowdown_writes_trigger: 20,
level_zero_stop_writes_trigger: 36,
// default is 1G, use 0 to disable fifo
fifo_compaction_max_table_files_size: ReadableSize::gb(0),
}
}
}
90 changes: 88 additions & 2 deletions wal/src/rocks_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use common_types::{
};
use common_util::{error::BoxError, runtime::Runtime};
use log::{debug, info, warn};
use rocksdb::{DBIterator, DBOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB};
use rocksdb::{
rocksdb_options::ColumnFamilyDescriptor, ColumnFamilyOptions, DBCompactionStyle, DBIterator,
DBOptions, FifoCompactionOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB,
};
use snafu::ResultExt;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -525,8 +528,15 @@ impl RocksImpl {
pub struct Builder {
wal_path: String,
runtime: Arc<Runtime>,
max_subcompactions: Option<u32>,
max_background_jobs: Option<i32>,
enable_statistics: Option<bool>,
write_buffer_size: Option<u64>,
max_write_buffer_number: Option<i32>,
level_zero_file_num_compaction_trigger: Option<i32>,
level_zero_slowdown_writes_trigger: Option<i32>,
level_zero_stop_writes_trigger: Option<i32>,
fifo_compaction_max_table_files_size: Option<u64>,
}

impl Builder {
Expand All @@ -535,11 +545,23 @@ impl Builder {
Self {
wal_path: wal_path.to_str().unwrap().to_owned(),
runtime,
max_subcompactions: None,
max_background_jobs: None,
enable_statistics: None,
write_buffer_size: None,
max_write_buffer_number: None,
level_zero_file_num_compaction_trigger: None,
level_zero_slowdown_writes_trigger: None,
level_zero_stop_writes_trigger: None,
fifo_compaction_max_table_files_size: None,
}
}

pub fn max_subcompactions(mut self, v: u32) -> Self {
self.max_subcompactions = Some(v);
self
}

pub fn max_background_jobs(mut self, v: i32) -> Self {
self.max_background_jobs = Some(v);
self
Expand All @@ -550,10 +572,43 @@ impl Builder {
self
}

pub fn write_buffer_size(mut self, v: u64) -> Self {
self.write_buffer_size = Some(v);
self
}

pub fn max_write_buffer_number(mut self, v: i32) -> Self {
self.max_write_buffer_number = Some(v);
self
}

pub fn level_zero_file_num_compaction_trigger(mut self, v: i32) -> Self {
self.level_zero_file_num_compaction_trigger = Some(v);
self
}

pub fn level_zero_slowdown_writes_trigger(mut self, v: i32) -> Self {
self.level_zero_slowdown_writes_trigger = Some(v);
self
}

pub fn level_zero_stop_writes_trigger(mut self, v: i32) -> Self {
self.level_zero_stop_writes_trigger = Some(v);
self
}

pub fn fifo_compaction_max_table_files_size(mut self, v: u64) -> Self {
self.fifo_compaction_max_table_files_size = Some(v);
self
}

pub fn build(self) -> Result<RocksImpl> {
let mut rocksdb_config = DBOptions::default();
rocksdb_config.create_if_missing(true);

if let Some(v) = self.max_subcompactions {
rocksdb_config.set_max_subcompactions(v);
}
if let Some(v) = self.max_background_jobs {
rocksdb_config.set_max_background_jobs(v);
}
Expand All @@ -566,7 +621,38 @@ impl Builder {
None
};

let db = DB::open(rocksdb_config, &self.wal_path)
let mut cf_opts = ColumnFamilyOptions::new();
if let Some(v) = self.write_buffer_size {
cf_opts.set_write_buffer_size(v);
}
if let Some(v) = self.max_write_buffer_number {
cf_opts.set_max_write_buffer_number(v);
}
if let Some(v) = self.level_zero_file_num_compaction_trigger {
cf_opts.set_level_zero_file_num_compaction_trigger(v);
}
if let Some(v) = self.level_zero_slowdown_writes_trigger {
cf_opts.set_level_zero_slowdown_writes_trigger(v);
}
if let Some(v) = self.level_zero_stop_writes_trigger {
cf_opts.set_level_zero_stop_writes_trigger(v);
}

// FIFO compaction strategy let rocksdb looks like a message queue.
if let Some(v) = self.fifo_compaction_max_table_files_size {
if v > 0 {
let mut fifo_opts = FifoCompactionOptions::new();
fifo_opts.set_max_table_files_size(v);
cf_opts.set_fifo_compaction_options(fifo_opts);
cf_opts.set_compaction_style(DBCompactionStyle::Fifo);
}
}

let default_cfd = ColumnFamilyDescriptor {
options: cf_opts,
..ColumnFamilyDescriptor::default()
};
let db = DB::open_cf(rocksdb_config, &self.wal_path, vec![default_cfd])
.map_err(|e| e.into())
.context(Open {
wal_path: self.wal_path.clone(),
Expand Down

0 comments on commit 57689b9

Please sign in to comment.