From 4d7646809aa3bd850b0f9349a6f2b144c050bcdb Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Thu, 5 Dec 2024 00:50:29 +0000 Subject: [PATCH] [Storage] Open db in parallel when storage sharding is enabled. --- storage/aptosdb/src/db_options.rs | 11 +- storage/aptosdb/src/ledger_db/mod.rs | 134 ++++++++++++++++--------- storage/aptosdb/src/state_kv_db.rs | 38 ++++--- storage/aptosdb/src/state_merkle_db.rs | 23 +++-- 4 files changed, 121 insertions(+), 85 deletions(-) diff --git a/storage/aptosdb/src/db_options.rs b/storage/aptosdb/src/db_options.rs index e41de403b30b33..bf07444f63670d 100644 --- a/storage/aptosdb/src/db_options.rs +++ b/storage/aptosdb/src/db_options.rs @@ -221,15 +221,8 @@ pub(super) fn gen_state_merkle_cfds(rocksdb_config: &RocksdbConfig) -> Vec Vec { - let cfs = if enable_sharding { - state_kv_db_new_key_column_families() - } else { - state_kv_db_column_families() - }; +pub(super) fn gen_state_kv_cfds(rocksdb_config: &RocksdbConfig) -> Vec { + let cfs = state_kv_db_new_key_column_families(); gen_cfds(rocksdb_config, cfs, with_state_key_extractor_processor) } diff --git a/storage/aptosdb/src/ledger_db/mod.rs b/storage/aptosdb/src/ledger_db/mod.rs index da5d8377fbb26b..96d886360bd3bd 100644 --- a/storage/aptosdb/src/ledger_db/mod.rs +++ b/storage/aptosdb/src/ledger_db/mod.rs @@ -24,6 +24,7 @@ use crate::{ schema::db_metadata::{DbMetadataKey, DbMetadataSchema}, }; use aptos_config::config::{RocksdbConfig, RocksdbConfigs}; +use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_logger::prelude::info; use aptos_rocksdb_options::gen_rocksdb_options; use aptos_schemadb::{ColumnFamilyDescriptor, ColumnFamilyName, SchemaBatch, DB}; @@ -155,60 +156,95 @@ impl LedgerDb { let ledger_db_folder = db_root_path.as_ref().join(LEDGER_DB_FOLDER_NAME); - let event_db_raw = Arc::new(Self::open_rocksdb( - ledger_db_folder.join(EVENT_DB_NAME), - EVENT_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?); - let event_db = EventDb::new(event_db_raw.clone(), EventStore::new(event_db_raw)); - - let transaction_accumulator_db = - TransactionAccumulatorDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME), - TRANSACTION_ACCUMULATOR_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let transaction_auxiliary_data_db = - TransactionAuxiliaryDataDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME), - TRANSACTION_AUXILIARY_DATA_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - let transaction_db = TransactionDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_DB_NAME), - TRANSACTION_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let transaction_info_db = TransactionInfoDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_INFO_DB_NAME), - TRANSACTION_INFO_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let write_set_db = WriteSetDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(WRITE_SET_DB_NAME), - WRITE_SET_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); + let mut event_db = None; + let mut transaction_accumulator_db = None; + let mut transaction_auxiliary_data_db = None; + let mut transaction_db = None; + let mut transaction_info_db = None; + let mut write_set_db = None; + THREAD_MANAGER.get_non_exe_cpu_pool().scope(|s| { + s.spawn(|_| { + let event_db_raw = Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(EVENT_DB_NAME), + EVENT_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ); + event_db = Some(EventDb::new( + event_db_raw.clone(), + EventStore::new(event_db_raw), + )); + }); + s.spawn(|_| { + transaction_accumulator_db = Some(TransactionAccumulatorDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME), + TRANSACTION_ACCUMULATOR_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + transaction_auxiliary_data_db = Some(TransactionAuxiliaryDataDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME), + TRANSACTION_AUXILIARY_DATA_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))) + }); + s.spawn(|_| { + transaction_db = Some(TransactionDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_DB_NAME), + TRANSACTION_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + transaction_info_db = Some(TransactionInfoDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_INFO_DB_NAME), + TRANSACTION_INFO_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + write_set_db = Some(WriteSetDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(WRITE_SET_DB_NAME), + WRITE_SET_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + }); // TODO(grao): Handle data inconsistency. Ok(Self { ledger_metadata_db: LedgerMetadataDb::new(ledger_metadata_db), - event_db, - transaction_accumulator_db, - transaction_auxiliary_data_db, - transaction_db, - transaction_info_db, - write_set_db, + event_db: event_db.unwrap(), + transaction_accumulator_db: transaction_accumulator_db.unwrap(), + transaction_auxiliary_data_db: transaction_auxiliary_data_db.unwrap(), + transaction_db: transaction_db.unwrap(), + transaction_info_db: transaction_info_db.unwrap(), + write_set_db: write_set_db.unwrap(), enable_storage_sharding: true, }) } diff --git a/storage/aptosdb/src/state_kv_db.rs b/storage/aptosdb/src/state_kv_db.rs index 35c1d01e3bbdaa..2ae8f1dfc9b982 100644 --- a/storage/aptosdb/src/state_kv_db.rs +++ b/storage/aptosdb/src/state_kv_db.rs @@ -25,6 +25,7 @@ use aptos_types::{ transaction::Version, }; use arr_macro::arr; +use rayon::prelude::*; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -56,19 +57,13 @@ impl StateKvDb { }); } - Self::open( - db_paths, - rocksdb_configs.state_kv_db_config, - readonly, - sharding, - ) + Self::open(db_paths, rocksdb_configs.state_kv_db_config, readonly) } pub(crate) fn open( db_paths: &StorageDirPaths, state_kv_db_config: RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { let state_kv_metadata_db_path = Self::metadata_db_path(db_paths.state_kv_db_metadata_root_path()); @@ -78,7 +73,6 @@ impl StateKvDb { STATE_KV_METADATA_DB_NAME, &state_kv_db_config, readonly, - enable_sharding, )?); info!( @@ -86,15 +80,22 @@ impl StateKvDb { "Opened state kv metadata db!" ); - let mut shard_id: usize = 0; - let state_kv_db_shards = { - arr![{ + let state_kv_db_shards = (0..NUM_STATE_SHARDS) + .into_par_iter() + .map(|shard_id| { let shard_root_path = db_paths.state_kv_db_shard_root_path(shard_id as u8); - let db = Self::open_shard(shard_root_path, shard_id as u8, &state_kv_db_config, readonly, enable_sharding)?; - shard_id += 1; + let db = Self::open_shard( + shard_root_path, + shard_id as u8, + &state_kv_db_config, + readonly, + ) + .expect(&format!("Failed to open state kv db shard {shard_id}.")); Arc::new(db) - }; 16] - }; + }) + .collect::>() + .try_into() + .unwrap(); let state_kv_db = Self { state_kv_metadata_db, @@ -247,7 +248,6 @@ impl StateKvDb { shard_id: u8, state_kv_db_config: &RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { let db_name = format!("state_kv_db_shard_{}", shard_id); Self::open_db( @@ -255,7 +255,6 @@ impl StateKvDb { &db_name, state_kv_db_config, readonly, - enable_sharding, ) } @@ -264,21 +263,20 @@ impl StateKvDb { name: &str, state_kv_db_config: &RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { Ok(if readonly { DB::open_cf_readonly( &gen_rocksdb_options(state_kv_db_config, true), path, name, - gen_state_kv_cfds(state_kv_db_config, enable_sharding), + gen_state_kv_cfds(state_kv_db_config), )? } else { DB::open_cf( &gen_rocksdb_options(state_kv_db_config, false), path, name, - gen_state_kv_cfds(state_kv_db_config, enable_sharding), + gen_state_kv_cfds(state_kv_db_config), )? }) } diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index 7d7ad649d0431c..df8266c0380d7d 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -583,13 +583,22 @@ impl StateMerkleDb { "Opened state merkle metadata db!" ); - let mut shard_id: usize = 0; - let state_merkle_db_shards = arr![{ - let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8); - let db = Self::open_shard(shard_root_path, shard_id as u8, &state_merkle_db_config, readonly)?; - shard_id += 1; - Arc::new(db) - }; 16]; + let state_merkle_db_shards = (0..NUM_STATE_SHARDS) + .into_par_iter() + .map(|shard_id| { + let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8); + let db = Self::open_shard( + shard_root_path, + shard_id as u8, + &state_merkle_db_config, + readonly, + ) + .expect(&format!("Failed to open state merkle db shard {shard_id}.")); + Arc::new(db) + }) + .collect::>() + .try_into() + .unwrap(); let state_merkle_db = Self { state_merkle_metadata_db,