Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Open db in parallel when storage sharding is enabled. #15504

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db_debugger/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub fn verify_state_kvs(
) -> Result<()> {
println!("Validating db statekeys");
let storage_dir = StorageDirPaths::from_path(db_root_path);
let state_kv_db = StateKvDb::open(&storage_dir, RocksdbConfig::default(), false, true)?;
let state_kv_db = StateKvDb::open_sharded(&storage_dir, RocksdbConfig::default(), false)?;

//read all statekeys from internal db and store them in mem
let mut all_internal_keys = HashSet::new();
Expand Down
9 changes: 2 additions & 7 deletions storage/aptosdb/src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,10 @@ pub(super) fn gen_state_merkle_cfds(rocksdb_config: &RocksdbConfig) -> Vec<Colum
gen_cfds(rocksdb_config, cfs, |_, _| {})
}

pub(super) fn gen_state_kv_cfds(
pub(super) fn gen_state_kv_shard_cfds(
rocksdb_config: &RocksdbConfig,
enable_sharding: bool,
) -> Vec<ColumnFamilyDescriptor> {
let cfs = if enable_sharding {
state_kv_db_new_key_column_families()
} else {
state_kv_db_column_families()
};
let cfs = state_kv_db_new_key_column_families();
gen_cfds(rocksdb_config, cfs, with_state_key_extractor_processor)
}

Expand Down
134 changes: 85 additions & 49 deletions storage/aptosdb/src/ledger_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
schema::db_metadata::{DbMetadataKey, DbMetadataSchema},
};
use aptos_config::config::{RocksdbConfig, RocksdbConfigs};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_logger::prelude::info;
use aptos_rocksdb_options::gen_rocksdb_options;
use aptos_schemadb::{ColumnFamilyDescriptor, ColumnFamilyName, SchemaBatch, DB};
Expand Down Expand Up @@ -155,60 +156,95 @@ impl LedgerDb {

let ledger_db_folder = db_root_path.as_ref().join(LEDGER_DB_FOLDER_NAME);

let event_db_raw = Arc::new(Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?);
let event_db = EventDb::new(event_db_raw.clone(), EventStore::new(event_db_raw));

let transaction_accumulator_db =
TransactionAccumulatorDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_auxiliary_data_db =
TransactionAuxiliaryDataDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let transaction_db = TransactionDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_info_db = TransactionInfoDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let write_set_db = WriteSetDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let mut event_db = None;
let mut transaction_accumulator_db = None;
let mut transaction_auxiliary_data_db = None;
let mut transaction_db = None;
let mut transaction_info_db = None;
let mut write_set_db = None;
THREAD_MANAGER.get_non_exe_cpu_pool().scope(|s| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems no particular reason to use this pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Just need some threads here.

s.spawn(|_| {
let event_db_raw = Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
);
event_db = Some(EventDb::new(
event_db_raw.clone(),
EventStore::new(event_db_raw),
));
});
s.spawn(|_| {
transaction_accumulator_db = Some(TransactionAccumulatorDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_auxiliary_data_db = Some(TransactionAuxiliaryDataDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)))
});
s.spawn(|_| {
transaction_db = Some(TransactionDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_info_db = Some(TransactionInfoDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
write_set_db = Some(WriteSetDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
});

// TODO(grao): Handle data inconsistency.

Ok(Self {
ledger_metadata_db: LedgerMetadataDb::new(ledger_metadata_db),
event_db,
transaction_accumulator_db,
transaction_auxiliary_data_db,
transaction_db,
transaction_info_db,
write_set_db,
event_db: event_db.unwrap(),
transaction_accumulator_db: transaction_accumulator_db.unwrap(),
transaction_auxiliary_data_db: transaction_auxiliary_data_db.unwrap(),
transaction_db: transaction_db.unwrap(),
transaction_info_db: transaction_info_db.unwrap(),
write_set_db: write_set_db.unwrap(),
enable_storage_sharding: true,
})
}
Expand Down
45 changes: 21 additions & 24 deletions storage/aptosdb/src/state_kv_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![forbid(unsafe_code)]

use crate::{
db_options::gen_state_kv_cfds,
db_options::gen_state_kv_shard_cfds,
metrics::OTHER_TIMERS_SECONDS,
schema::{
db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
Expand All @@ -25,6 +25,7 @@ use aptos_types::{
transaction::Version,
};
use arr_macro::arr;
use rayon::prelude::*;
use std::{
path::{Path, PathBuf},
sync::Arc,
Expand Down Expand Up @@ -56,19 +57,13 @@ impl StateKvDb {
});
}

Self::open(
db_paths,
rocksdb_configs.state_kv_db_config,
readonly,
sharding,
)
Self::open_sharded(db_paths, rocksdb_configs.state_kv_db_config, readonly)
}

pub(crate) fn open(
pub(crate) fn open_sharded(
db_paths: &StorageDirPaths,
state_kv_db_config: RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<Self> {
let state_kv_metadata_db_path =
Self::metadata_db_path(db_paths.state_kv_db_metadata_root_path());
Expand All @@ -78,23 +73,29 @@ impl StateKvDb {
STATE_KV_METADATA_DB_NAME,
&state_kv_db_config,
readonly,
enable_sharding,
)?);

info!(
state_kv_metadata_db_path = state_kv_metadata_db_path,
"Opened state kv metadata db!"
);

let mut shard_id: usize = 0;
let state_kv_db_shards = {
arr![{
let state_kv_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_kv_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_kv_db_config, readonly, enable_sharding)?;
shard_id += 1;
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_kv_db_config,
readonly,
)
.unwrap_or_else(|e| panic!("Failed to open state kv db shard {shard_id}: {e:?}."));
Arc::new(db)
}; 16]
};
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_kv_db = Self {
state_kv_metadata_db,
Expand Down Expand Up @@ -171,11 +172,10 @@ impl StateKvDb {
cp_root_path: impl AsRef<Path>,
) -> Result<()> {
// TODO(grao): Support path override here.
let state_kv_db = Self::open(
let state_kv_db = Self::open_sharded(
&StorageDirPaths::from_path(db_root_path),
RocksdbConfig::default(),
false,
true,
)?;
let cp_state_kv_db_path = cp_root_path.as_ref().join(STATE_KV_DB_FOLDER_NAME);

Expand Down Expand Up @@ -247,15 +247,13 @@ impl StateKvDb {
shard_id: u8,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
let db_name = format!("state_kv_db_shard_{}", shard_id);
Self::open_db(
Self::db_shard_path(db_root_path, shard_id),
&db_name,
state_kv_db_config,
readonly,
enable_sharding,
)
}

Expand All @@ -264,21 +262,20 @@ impl StateKvDb {
name: &str,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
Ok(if readonly {
DB::open_cf_readonly(
&gen_rocksdb_options(state_kv_db_config, true),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_shard_cfds(state_kv_db_config),
)?
} else {
DB::open_cf(
&gen_rocksdb_options(state_kv_db_config, false),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_shard_cfds(state_kv_db_config),
)?
})
}
Expand Down
25 changes: 18 additions & 7 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,13 +583,24 @@ impl StateMerkleDb {
"Opened state merkle metadata db!"
);

let mut shard_id: usize = 0;
let state_merkle_db_shards = arr![{
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_merkle_db_config, readonly)?;
shard_id += 1;
Arc::new(db)
}; 16];
let state_merkle_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_merkle_db_config,
readonly,
)
.unwrap_or_else(|e| {
panic!("Failed to open state merkle db shard {shard_id}: {e:?}.")
});
Arc::new(db)
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_merkle_db = Self {
state_merkle_metadata_db,
Expand Down
Loading