Skip to content

Commit

Permalink
[Storage] Open db in parallel when storage sharding is enabled. (#15504)
Browse files — browse the repository at this point in the history
  • Loading branch information
grao1991 authored and danielxiangzl committed Dec 12, 2024
1 parent ab23d1c commit b4ef3d6
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 88 deletions.
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db_debugger/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub fn verify_state_kvs(
) -> Result<()> {
println!("Validating db statekeys");
let storage_dir = StorageDirPaths::from_path(db_root_path);
let state_kv_db = StateKvDb::open(&storage_dir, RocksdbConfig::default(), false, true)?;
let state_kv_db = StateKvDb::open_sharded(&storage_dir, RocksdbConfig::default(), false)?;

//read all statekeys from internal db and store them in mem
let mut all_internal_keys = HashSet::new();
Expand Down
9 changes: 2 additions & 7 deletions storage/aptosdb/src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,10 @@ pub(super) fn gen_state_merkle_cfds(rocksdb_config: &RocksdbConfig) -> Vec<Colum
gen_cfds(rocksdb_config, cfs, |_, _| {})
}

pub(super) fn gen_state_kv_cfds(
pub(super) fn gen_state_kv_shard_cfds(
rocksdb_config: &RocksdbConfig,
enable_sharding: bool,
) -> Vec<ColumnFamilyDescriptor> {
let cfs = if enable_sharding {
state_kv_db_new_key_column_families()
} else {
state_kv_db_column_families()
};
let cfs = state_kv_db_new_key_column_families();
gen_cfds(rocksdb_config, cfs, with_state_key_extractor_processor)
}

Expand Down
134 changes: 85 additions & 49 deletions storage/aptosdb/src/ledger_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
schema::db_metadata::{DbMetadataKey, DbMetadataSchema},
};
use aptos_config::config::{RocksdbConfig, RocksdbConfigs};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_logger::prelude::info;
use aptos_rocksdb_options::gen_rocksdb_options;
use aptos_schemadb::{ColumnFamilyDescriptor, ColumnFamilyName, SchemaBatch, DB};
Expand Down Expand Up @@ -155,60 +156,95 @@ impl LedgerDb {

let ledger_db_folder = db_root_path.as_ref().join(LEDGER_DB_FOLDER_NAME);

let event_db_raw = Arc::new(Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?);
let event_db = EventDb::new(event_db_raw.clone(), EventStore::new(event_db_raw));

let transaction_accumulator_db =
TransactionAccumulatorDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_auxiliary_data_db =
TransactionAuxiliaryDataDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let transaction_db = TransactionDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_info_db = TransactionInfoDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let write_set_db = WriteSetDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let mut event_db = None;
let mut transaction_accumulator_db = None;
let mut transaction_auxiliary_data_db = None;
let mut transaction_db = None;
let mut transaction_info_db = None;
let mut write_set_db = None;
THREAD_MANAGER.get_non_exe_cpu_pool().scope(|s| {
s.spawn(|_| {
let event_db_raw = Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
);
event_db = Some(EventDb::new(
event_db_raw.clone(),
EventStore::new(event_db_raw),
));
});
s.spawn(|_| {
transaction_accumulator_db = Some(TransactionAccumulatorDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_auxiliary_data_db = Some(TransactionAuxiliaryDataDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)))
});
s.spawn(|_| {
transaction_db = Some(TransactionDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_info_db = Some(TransactionInfoDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
write_set_db = Some(WriteSetDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
});

// TODO(grao): Handle data inconsistency.

Ok(Self {
ledger_metadata_db: LedgerMetadataDb::new(ledger_metadata_db),
event_db,
transaction_accumulator_db,
transaction_auxiliary_data_db,
transaction_db,
transaction_info_db,
write_set_db,
event_db: event_db.unwrap(),
transaction_accumulator_db: transaction_accumulator_db.unwrap(),
transaction_auxiliary_data_db: transaction_auxiliary_data_db.unwrap(),
transaction_db: transaction_db.unwrap(),
transaction_info_db: transaction_info_db.unwrap(),
write_set_db: write_set_db.unwrap(),
enable_storage_sharding: true,
})
}
Expand Down
45 changes: 21 additions & 24 deletions storage/aptosdb/src/state_kv_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![forbid(unsafe_code)]

use crate::{
db_options::gen_state_kv_cfds,
db_options::gen_state_kv_shard_cfds,
metrics::OTHER_TIMERS_SECONDS,
schema::{
db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
Expand All @@ -25,6 +25,7 @@ use aptos_types::{
transaction::Version,
};
use arr_macro::arr;
use rayon::prelude::*;
use std::{
path::{Path, PathBuf},
sync::Arc,
Expand Down Expand Up @@ -56,19 +57,13 @@ impl StateKvDb {
});
}

Self::open(
db_paths,
rocksdb_configs.state_kv_db_config,
readonly,
sharding,
)
Self::open_sharded(db_paths, rocksdb_configs.state_kv_db_config, readonly)
}

pub(crate) fn open(
pub(crate) fn open_sharded(
db_paths: &StorageDirPaths,
state_kv_db_config: RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<Self> {
let state_kv_metadata_db_path =
Self::metadata_db_path(db_paths.state_kv_db_metadata_root_path());
Expand All @@ -78,23 +73,29 @@ impl StateKvDb {
STATE_KV_METADATA_DB_NAME,
&state_kv_db_config,
readonly,
enable_sharding,
)?);

info!(
state_kv_metadata_db_path = state_kv_metadata_db_path,
"Opened state kv metadata db!"
);

let mut shard_id: usize = 0;
let state_kv_db_shards = {
arr![{
let state_kv_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_kv_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_kv_db_config, readonly, enable_sharding)?;
shard_id += 1;
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_kv_db_config,
readonly,
)
.unwrap_or_else(|e| panic!("Failed to open state kv db shard {shard_id}: {e:?}."));
Arc::new(db)
}; 16]
};
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_kv_db = Self {
state_kv_metadata_db,
Expand Down Expand Up @@ -171,11 +172,10 @@ impl StateKvDb {
cp_root_path: impl AsRef<Path>,
) -> Result<()> {
// TODO(grao): Support path override here.
let state_kv_db = Self::open(
let state_kv_db = Self::open_sharded(
&StorageDirPaths::from_path(db_root_path),
RocksdbConfig::default(),
false,
true,
)?;
let cp_state_kv_db_path = cp_root_path.as_ref().join(STATE_KV_DB_FOLDER_NAME);

Expand Down Expand Up @@ -247,15 +247,13 @@ impl StateKvDb {
shard_id: u8,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
let db_name = format!("state_kv_db_shard_{}", shard_id);
Self::open_db(
Self::db_shard_path(db_root_path, shard_id),
&db_name,
state_kv_db_config,
readonly,
enable_sharding,
)
}

Expand All @@ -264,21 +262,20 @@ impl StateKvDb {
name: &str,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
Ok(if readonly {
DB::open_cf_readonly(
&gen_rocksdb_options(state_kv_db_config, true),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_shard_cfds(state_kv_db_config),
)?
} else {
DB::open_cf(
&gen_rocksdb_options(state_kv_db_config, false),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_shard_cfds(state_kv_db_config),
)?
})
}
Expand Down
25 changes: 18 additions & 7 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,13 +583,24 @@ impl StateMerkleDb {
"Opened state merkle metadata db!"
);

let mut shard_id: usize = 0;
let state_merkle_db_shards = arr![{
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_merkle_db_config, readonly)?;
shard_id += 1;
Arc::new(db)
}; 16];
let state_merkle_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_merkle_db_config,
readonly,
)
.unwrap_or_else(|e| {
panic!("Failed to open state merkle db shard {shard_id}: {e:?}.")
});
Arc::new(db)
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_merkle_db = Self {
state_merkle_metadata_db,
Expand Down

0 comments on commit b4ef3d6

Please sign in to comment.