feat: avoid frequent write stall (#843)
* feat: avoid frequent write stall

* feat: add option to control mutable limit

* fix: allow preflush_write_buffer_size_ratio to equal 0
ShiKaiWi authored Apr 17, 2023
1 parent f5b0bc9 commit f909e6b
Showing 7 changed files with 93 additions and 15 deletions.
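In short, the commit does two things: the preflush threshold, previously hard-coded as 7/8 of the table's write buffer size, becomes the configurable preflush_write_buffer_size_ratio (default 0.75), and a table no longer triggers another preflush while a flush is already running. Below is a minimal standalone sketch of the threshold arithmetic, mirroring the compute_mutable_limit function added in data.rs; it is a simplified copy for illustration, not the engine code itself:

    /// Simplified copy of the compute_mutable_limit added by this commit.
    fn compute_mutable_limit(write_buffer_size: u32, ratio: f32) -> u32 {
        assert!((0.0..=1.0).contains(&ratio));
        // ratio <= 1.0, so the product never exceeds write_buffer_size.
        (write_buffer_size as f32 * ratio) as u32
    }

    fn main() {
        // With the default ratio of 0.75, a 32 MiB write buffer starts
        // preflushing once mutable memtables reach 24 MiB.
        let limit = compute_mutable_limit(32 * 1024 * 1024, 0.75);
        assert_eq!(limit, 24 * 1024 * 1024);
    }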
1 change: 1 addition & 0 deletions analytic_engine/src/instance/create.rs
@@ -60,6 +60,7 @@ impl Instance {
request,
table_opts,
&self.file_purger,
self.preflush_write_buffer_size_ratio,
space.mem_usage_collector.clone(),
)
.context(CreateTableData {
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/mod.rs
@@ -174,6 +174,8 @@ pub struct Instance {
pub(crate) db_write_buffer_size: usize,
/// Space write buffer size
pub(crate) space_write_buffer_size: usize,
/// The ratio of table's write buffer size to trigger preflush
pub(crate) preflush_write_buffer_size_ratio: f32,
/// Replay wal batch size
pub(crate) replay_batch_size: usize,
/// Write sst max buffer size
6 changes: 4 additions & 2 deletions analytic_engine/src/instance/open.rs
@@ -97,6 +97,7 @@ impl Instance {
mem_usage_collector: Arc::new(MemUsageCollector::default()),
db_write_buffer_size: ctx.config.db_write_buffer_size,
space_write_buffer_size: ctx.config.space_write_buffer_size,
preflush_write_buffer_size_ratio: ctx.config.preflush_write_buffer_size_ratio,
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
max_bytes_per_write_batch: ctx
@@ -237,8 +238,9 @@ impl Instance {
TableData::recover_from_add(
table_meta,
&self.file_purger,
space.mem_usage_collector.clone(),
request.shard_id,
self.preflush_write_buffer_size_ratio,
space.mem_usage_collector.clone(),
)
.context(RecoverTableData {
space_id: space.id,
@@ -384,7 +386,7 @@ impl Instance {
})?;

// Flush the table if necessary.
if table_data.should_flush_table() {
if table_data.should_flush_table(serial_exec) {
let opts = TableFlushOptions {
res_sender: None,
compact_after_flush: None,
5 changes: 5 additions & 0 deletions analytic_engine/src/instance/serial_executor.rs
@@ -91,6 +91,11 @@ impl TableOpSerialExecutor {
}

impl TableFlushScheduler {
pub fn is_in_flush(&self) -> bool {
let state = self.schedule_sync.state.lock().unwrap();
matches!(&*state, FlushState::Flushing)
}

/// Control the flush procedure and ensure that multiple flush
/// procedures are sequential.
///
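The new is_in_flush only peeks at the scheduler's shared state under its lock. A minimal sketch of the pattern with assumed type definitions (FlushState and its Ready variant are assumptions; only Flushing appears in this diff):

    use std::sync::Mutex;

    // Assumed shape of the flush state machine; the diff only shows Flushing.
    enum FlushState {
        Ready,
        Flushing,
    }

    struct ScheduleSync {
        state: Mutex<FlushState>,
    }

    impl ScheduleSync {
        fn is_in_flush(&self) -> bool {
            // Hold the lock only long enough to inspect the current state.
            let state = self.state.lock().unwrap();
            matches!(&*state, FlushState::Flushing)
        }
    }

    fn main() {
        let sync = ScheduleSync {
            state: Mutex::new(FlushState::Flushing),
        };
        assert!(sync.is_in_flush());
        *sync.state.lock().unwrap() = FlushState::Ready;
        assert!(!sync.is_in_flush());
    }

The write path consults this flag through should_flush_table (see the write.rs and data.rs hunks below), so a write no longer schedules a preflush while one is already underway; that repeated triggering is the frequent write stall the commit title refers to.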
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/write.rs
@@ -535,7 +535,7 @@ impl<'a> Writer<'a> {
}
}

if self.table_data.should_flush_table() {
if self.table_data.should_flush_table(self.serial_exec) {
let table_data = self.table_data.clone();
let _timer = table_data.metrics.start_table_write_flush_wait_timer();
self.handle_memtable_flush(&table_data).await?;
6 changes: 4 additions & 2 deletions analytic_engine/src/lib.rs
@@ -60,12 +60,13 @@ pub struct Config {
/// Manifest options
pub manifest: ManifestOptions,

// Global write buffer options:
/// The maximum write buffer size used for single space.
pub space_write_buffer_size: usize,
/// The maximum size of all Write Buffers across all spaces.
pub db_write_buffer_size: usize,
/// End of global write buffer options.
/// The ratio of table's write buffer size to trigger preflush, and it
/// should be in the range [0, 1].
pub preflush_write_buffer_size_ratio: f32,

// Iterator scanning options
/// Batch size for iterator.
@@ -112,6 +113,7 @@ impl Default for Config {
/// Zero means disabling this param; give a positive value to enable
/// it.
db_write_buffer_size: 0,
preflush_write_buffer_size_ratio: 0.75,
scan_batch_size: None,
sst_background_read_parallelism: 8,
scan_max_record_batches_in_flight: 1024,
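With the field and its default in place, the knob is set like any other engine option. A hypothetical construction (assumes only the Default impl shown above; the 0.5 value is an arbitrary example):

    fn example_config() -> Config {
        Config {
            // Must lie in [0, 1]; compute_mutable_limit asserts this.
            // The shipped default is 0.75.
            preflush_write_buffer_size_ratio: 0.5,
            ..Default::default()
        }
    }

A larger ratio buffers more mutable data before a preflush is triggered; a smaller one flushes earlier and keeps memtables smaller.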
86 changes: 76 additions & 10 deletions analytic_engine/src/table/data.rs
@@ -99,6 +99,9 @@ pub struct TableData {

/// Mutable memtable memory size limitation
mutable_limit: AtomicU32,
/// Ratio of the write buffer size used to compute the mutable limit.
mutable_limit_write_buffer_ratio: f32,

/// Options of this table
///
/// Most modification to `opts` can be done by replacing the old options
@@ -174,8 +177,18 @@ impl Drop for TableData {
}

#[inline]
fn get_mutable_limit(opts: &TableOptions) -> u32 {
opts.write_buffer_size / 8 * 7
fn compute_mutable_limit(
write_buffer_size: u32,
mutable_limit_write_buffer_size_ratio: f32,
) -> u32 {
assert!(
mutable_limit_write_buffer_size_ratio >= 0.0
&& mutable_limit_write_buffer_size_ratio <= 1.0
);

let limit = write_buffer_size as f32 * mutable_limit_write_buffer_size_ratio;
// This is safe because the limit won't be larger than the write_buffer_size.
limit as u32
}

impl TableData {
@@ -188,6 +201,7 @@ impl TableData {
request: CreateTableRequest,
table_opts: TableOptions,
purger: &FilePurger,
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
) -> Result<Self> {
// FIXME(yingwen): Validate TableOptions, such as bucket_duration >=
@@ -197,13 +211,18 @@
let purge_queue = purger.create_purge_queue(space_id, request.table_id);
let current_version = TableVersion::new(purge_queue);
let metrics = Metrics::default();
let mutable_limit = AtomicU32::new(compute_mutable_limit(
table_opts.write_buffer_size,
preflush_write_buffer_size_ratio,
));

Ok(Self {
id: request.table_id,
name: request.table_name,
schema: Mutex::new(request.table_schema),
space_id,
mutable_limit: AtomicU32::new(get_mutable_limit(&table_opts)),
mutable_limit,
mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio,
opts: ArcSwap::new(Arc::new(table_opts)),
memtable_factory,
mem_usage_collector,
@@ -225,20 +244,26 @@
pub fn recover_from_add(
add_meta: AddTableMeta,
purger: &FilePurger,
mem_usage_collector: CollectorRef,
shard_id: ShardId,
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
) -> Result<Self> {
let memtable_factory = Arc::new(SkiplistMemTableFactory);
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
let current_version = TableVersion::new(purge_queue);
let metrics = Metrics::default();
let mutable_limit = AtomicU32::new(compute_mutable_limit(
add_meta.opts.write_buffer_size,
preflush_write_buffer_size_ratio,
));

Ok(Self {
id: add_meta.table_id,
name: add_meta.table_name,
schema: Mutex::new(add_meta.schema),
space_id: add_meta.space_id,
mutable_limit: AtomicU32::new(get_mutable_limit(&add_meta.opts)),
mutable_limit,
mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio,
opts: ArcSwap::new(Arc::new(add_meta.opts)),
memtable_factory,
mem_usage_collector,
@@ -307,8 +332,11 @@ impl TableData {
/// Update table options.
#[inline]
pub fn set_table_options(&self, opts: TableOptions) {
self.mutable_limit
.store(get_mutable_limit(&opts), Ordering::Relaxed);
let mutable_limit = compute_mutable_limit(
opts.write_buffer_size,
self.mutable_limit_write_buffer_ratio,
);
self.mutable_limit.store(mutable_limit, Ordering::Relaxed);
self.opts.store(Arc::new(opts))
}

@@ -399,7 +427,7 @@ impl TableData {
/// Returns true if the memory usage of this table reaches flush threshold
///
/// REQUIRE: Do in write worker
pub fn should_flush_table(&self) -> bool {
pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor) -> bool {
// Fall back to usize::MAX if converting arena_block_size into usize
// fails (overflow)
let max_write_buffer_size = self
@@ -416,8 +444,9 @@ impl TableData {
let mutable_usage = self.current_version.mutable_memory_usage();
let total_usage = self.current_version.total_memory_usage();

let in_flush = serial_exec.flush_scheduler().is_in_flush();
// Inspired by https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
if mutable_usage > mutable_limit {
if mutable_usage > mutable_limit && !in_flush {
info!(
"TableData should flush, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}",
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size
@@ -660,7 +689,15 @@ pub mod tests {
let purger = FilePurgerMocker::mock();
let collector = Arc::new(NoopCollector);

TableData::new(space_id, create_request, table_opts, &purger, collector).unwrap()
TableData::new(
space_id,
create_request,
table_opts,
&purger,
0.75,
collector,
)
.unwrap()
}
}

@@ -734,4 +771,33 @@ pub mod tests {
TimeRange::bucket_of(now_ts, table_options::DEFAULT_SEGMENT_DURATION).unwrap();
assert_eq!(time_range, mem_state.time_range);
}

#[test]
fn test_compute_mutable_limit() {
// Build the cases for compute_mutable_limit.
let cases = vec![
(80, 0.8, 64),
(80, 0.5, 40),
(80, 0.1, 8),
(80, 0.0, 0),
(80, 1.0, 80),
(0, 0.8, 0),
(0, 0.5, 0),
(0, 0.1, 0),
(0, 0.0, 0),
(0, 1.0, 0),
];

for (write_buffer_size, ratio, expected) in cases {
let limit = compute_mutable_limit(write_buffer_size, ratio);
assert_eq!(expected, limit);
}
}

#[should_panic]
#[test]
fn test_compute_mutable_limit_panic_on_too_large_ratio() {
compute_mutable_limit(80, 1.1);
}

#[should_panic]
#[test]
fn test_compute_mutable_limit_panic_on_negative_ratio() {
// Split into its own test: the panic above would otherwise mask this case.
compute_mutable_limit(80, -0.1);
}
}
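One consequence of storing the ratio on TableData is that set_table_options keeps the preflush threshold proportional when write_buffer_size changes at runtime. A rough sketch of that recomputation with hypothetical standalone types (the engine stores the limit in an AtomicU32 with Ordering::Relaxed, as the hunk above shows):

    use std::sync::atomic::{AtomicU32, Ordering};

    struct TableLimits {
        mutable_limit: AtomicU32,
        ratio: f32,
    }

    impl TableLimits {
        // Mirrors the recomputation performed by set_table_options.
        fn on_write_buffer_resized(&self, write_buffer_size: u32) {
            let limit = (write_buffer_size as f32 * self.ratio) as u32;
            self.mutable_limit.store(limit, Ordering::Relaxed);
        }
    }

    fn main() {
        let t = TableLimits {
            mutable_limit: AtomicU32::new(0),
            ratio: 0.75,
        };
        t.on_write_buffer_resized(80);
        // 80 * 0.75 = 60: the threshold tracks the new buffer size.
        assert_eq!(t.mutable_limit.load(Ordering::Relaxed), 60);
    }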
