From 85894349ecee63a65058886a9da07f7940076c04 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 28 May 2024 12:43:33 +0100 Subject: [PATCH] Minor config tuning --- .../bifrost/src/loglets/local_loglet/keys.rs | 7 +++--- .../src/loglets/local_loglet/log_store.rs | 17 ++++++++------ crates/core/src/task_center.rs | 4 +--- crates/rocksdb/src/db_manager.rs | 2 ++ crates/types/src/config/common.rs | 23 ++++++++++--------- crates/types/src/config/rocksdb.rs | 2 ++ crates/types/src/config/worker.rs | 2 +- 7 files changed, 32 insertions(+), 25 deletions(-) diff --git a/crates/bifrost/src/loglets/local_loglet/keys.rs b/crates/bifrost/src/loglets/local_loglet/keys.rs index c903cdb90b..d758b41ba2 100644 --- a/crates/bifrost/src/loglets/local_loglet/keys.rs +++ b/crates/bifrost/src/loglets/local_loglet/keys.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt::Write; use std::mem::size_of; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -17,6 +16,8 @@ use restate_types::logs::SequenceNumber; use crate::loglet::LogletOffset; +pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::<u8>() + size_of::<u64>(); + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct RecordKey { pub log_id: u64, @@ -37,7 +38,7 @@ impl RecordKey { pub fn to_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1); - buf.write_char('d').expect("enough key buffer"); + buf.put_u8(b'd'); buf.put_u64(self.log_id); buf.put_u64(self.offset.into()); buf.freeze() @@ -75,7 +76,7 @@ impl MetadataKey { pub fn to_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(size_of::<Self>() + 1); // m for metadata - buf.write_char('m').expect("enough key buffer"); + buf.put_u8(b'm'); buf.put_u64(self.log_id); buf.put_u8(self.kind as u8); buf.freeze() diff --git a/crates/bifrost/src/loglets/local_loglet/log_store.rs b/crates/bifrost/src/loglets/local_loglet/log_store.rs index 75819e425f..c67ff6b6c0 100644 --- 
a/crates/bifrost/src/loglets/local_loglet/log_store.rs +++ b/crates/bifrost/src/loglets/local_loglet/log_store.rs @@ -16,9 +16,9 @@ use restate_rocksdb::{ use restate_types::arc_util::Updateable; use restate_types::config::{LocalLogletOptions, RocksDbOptions}; use restate_types::storage::{StorageDecodeError, StorageEncodeError}; -use rocksdb::{BoundColumnFamily, DBCompressionType, DB}; +use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB}; -use super::keys::{MetadataKey, MetadataKind}; +use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH}; use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState}; use super::log_store_writer::LogStoreWriter; @@ -138,12 +138,15 @@ fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options { opts.set_compression_per_level(&[ DBCompressionType::None, DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, - DBCompressionType::Snappy, + DBCompressionType::Zstd, + DBCompressionType::Zstd, + DBCompressionType::Zstd, + DBCompressionType::Zstd, DBCompressionType::Zstd, ]); + + opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH)); + opts.set_memtable_prefix_bloom_ratio(0.2); // most reads are sequential opts.set_advise_random_on_open(false); // @@ -158,7 +161,7 @@ fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options { opts.set_num_levels(3); opts.set_compression_per_level(&[ DBCompressionType::None, - DBCompressionType::None, + DBCompressionType::Snappy, DBCompressionType::Zstd, ]); opts.set_max_write_buffer_number(2); diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index fe08aaac13..af127cd991 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -127,9 +127,7 @@ fn tokio_builder(common_opts: &CommonOptions) -> tokio::runtime::Builder { format!("rs:worker-{}", id) }); - if let Some(worker_threads) = 
common_opts.default_thread_pool_size { - builder.worker_threads(worker_threads); - } + builder.worker_threads(common_opts.default_thread_pool_size()); builder } diff --git a/crates/rocksdb/src/db_manager.rs b/crates/rocksdb/src/db_manager.rs index a0664c8e45..e343803954 100644 --- a/crates/rocksdb/src/db_manager.rs +++ b/crates/rocksdb/src/db_manager.rs @@ -328,6 +328,8 @@ impl RocksDbManager { // https://github.com/facebook/rocksdb/blob/f059c7d9b96300091e07429a60f4ad55dac84859/include/rocksdb/table.h#L275 block_opts.set_format_version(5); block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_block_cache(&self.cache); cf_options.set_block_based_table_factory(&block_opts); diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 3942c4acb0..baa13508ce 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -87,9 +87,9 @@ pub struct CommonOptions { /// # Default async runtime thread pool /// /// Size of the default thread pool used to perform internal tasks. - /// If not set, it defaults to the number of CPU cores. + /// If not set, it defaults to twice the number of CPU cores. #[builder(setter(strip_option))] - pub default_thread_pool_size: Option<usize>, + default_thread_pool_size: Option<usize>, /// # Tracing Endpoint /// @@ -246,19 +246,20 @@ impl CommonOptions { } pub fn storage_high_priority_bg_threads(&self) -> NonZeroUsize { - self.storage_high_priority_bg_threads.unwrap_or( + NonZeroUsize::new(4).unwrap() + } + + pub fn default_thread_pool_size(&self) -> usize { + 2 * self.default_thread_pool_size.unwrap_or( std::thread::available_parallelism() // Shouldn't really fail, but just in case. 
- .unwrap_or(NonZeroUsize::new(4).unwrap()), + .unwrap_or(NonZeroUsize::new(4).unwrap()) + .get(), ) } pub fn storage_low_priority_bg_threads(&self) -> NonZeroUsize { - self.storage_low_priority_bg_threads.unwrap_or( - std::thread::available_parallelism() - // Shouldn't really fail, but just in case. - .unwrap_or(NonZeroUsize::new(4).unwrap()), - ) + NonZeroUsize::new(4).unwrap() } pub fn rocksdb_bg_threads(&self) -> NonZeroU32 { @@ -301,8 +302,8 @@ impl Default for CommonOptions { default_thread_pool_size: None, storage_high_priority_bg_threads: None, storage_low_priority_bg_threads: None, - rocksdb_total_memtables_ratio: 0.5, // (50% of rocksdb-total-memory-size) - rocksdb_total_memory_size: NonZeroUsize::new(4_000_000_000).unwrap(), // 4GB + rocksdb_total_memtables_ratio: 0.6, // (60% of rocksdb-total-memory-size) + rocksdb_total_memory_size: NonZeroUsize::new(6_000_000_000).unwrap(), // 6GB rocksdb_bg_threads: None, rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(), rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(), diff --git a/crates/types/src/config/rocksdb.rs b/crates/types/src/config/rocksdb.rs index ba56b355d8..d06df8ee92 100644 --- a/crates/types/src/config/rocksdb.rs +++ b/crates/types/src/config/rocksdb.rs @@ -123,6 +123,8 @@ impl RocksDbOptions { // Assuming 256MB for bifrost's data cf (2 memtables * 128MB default write buffer size) // Assuming 256MB for bifrost's metadata cf (2 memtables * 128MB default write buffer size) let buffer_size = (all_memtables - 512_000_000) / (num_partitions * 3) as usize; + // reduce the buffer_size by 10% for safety + let buffer_size = (buffer_size as f64 * 0.9) as usize; NonZeroUsize::new(buffer_size).unwrap() }) } diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index d6658d5031..5cba334f4d 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -68,7 +68,7 @@ impl WorkerOptions { impl Default for WorkerOptions { fn 
default() -> Self { Self { - internal_queue_length: NonZeroUsize::new(64).unwrap(), + internal_queue_length: NonZeroUsize::new(6400).unwrap(), num_timers_in_memory_limit: None, storage: StorageOptions::default(), invoker: Default::default(),