Skip to content

Commit

Permalink
[Storage] Open db in parallel when storage sharding is enabled.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Dec 5, 2024
1 parent a75189b commit 8d01152
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 85 deletions.
11 changes: 2 additions & 9 deletions storage/aptosdb/src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,8 @@ pub(super) fn gen_state_merkle_cfds(rocksdb_config: &RocksdbConfig) -> Vec<Colum
gen_cfds(rocksdb_config, cfs, |_, _| {})
}

pub(super) fn gen_state_kv_cfds(
rocksdb_config: &RocksdbConfig,
enable_sharding: bool,
) -> Vec<ColumnFamilyDescriptor> {
let cfs = if enable_sharding {
state_kv_db_new_key_column_families()
} else {
state_kv_db_column_families()
};
pub(super) fn gen_state_kv_cfds(rocksdb_config: &RocksdbConfig) -> Vec<ColumnFamilyDescriptor> {
let cfs = state_kv_db_new_key_column_families();
gen_cfds(rocksdb_config, cfs, with_state_key_extractor_processor)
}

Expand Down
134 changes: 85 additions & 49 deletions storage/aptosdb/src/ledger_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
schema::db_metadata::{DbMetadataKey, DbMetadataSchema},
};
use aptos_config::config::{RocksdbConfig, RocksdbConfigs};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_logger::prelude::info;
use aptos_rocksdb_options::gen_rocksdb_options;
use aptos_schemadb::{ColumnFamilyDescriptor, ColumnFamilyName, SchemaBatch, DB};
Expand Down Expand Up @@ -155,60 +156,95 @@ impl LedgerDb {

let ledger_db_folder = db_root_path.as_ref().join(LEDGER_DB_FOLDER_NAME);

let event_db_raw = Arc::new(Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?);
let event_db = EventDb::new(event_db_raw.clone(), EventStore::new(event_db_raw));

let transaction_accumulator_db =
TransactionAccumulatorDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_auxiliary_data_db =
TransactionAuxiliaryDataDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let transaction_db = TransactionDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let transaction_info_db = TransactionInfoDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));

let write_set_db = WriteSetDb::new(Arc::new(Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)?));
let mut event_db = None;
let mut transaction_accumulator_db = None;
let mut transaction_auxiliary_data_db = None;
let mut transaction_db = None;
let mut transaction_info_db = None;
let mut write_set_db = None;
THREAD_MANAGER.get_non_exe_cpu_pool().scope(|s| {
s.spawn(|_| {
let event_db_raw = Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(EVENT_DB_NAME),
EVENT_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
);
event_db = Some(EventDb::new(
event_db_raw.clone(),
EventStore::new(event_db_raw),
));
});
s.spawn(|_| {
transaction_accumulator_db = Some(TransactionAccumulatorDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME),
TRANSACTION_ACCUMULATOR_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_auxiliary_data_db = Some(TransactionAuxiliaryDataDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME),
TRANSACTION_AUXILIARY_DATA_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)))
});
s.spawn(|_| {
transaction_db = Some(TransactionDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_DB_NAME),
TRANSACTION_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
transaction_info_db = Some(TransactionInfoDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(TRANSACTION_INFO_DB_NAME),
TRANSACTION_INFO_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
s.spawn(|_| {
write_set_db = Some(WriteSetDb::new(Arc::new(
Self::open_rocksdb(
ledger_db_folder.join(WRITE_SET_DB_NAME),
WRITE_SET_DB_NAME,
&rocksdb_configs.ledger_db_config,
readonly,
)
.unwrap(),
)));
});
});

// TODO(grao): Handle data inconsistency.

Ok(Self {
ledger_metadata_db: LedgerMetadataDb::new(ledger_metadata_db),
event_db,
transaction_accumulator_db,
transaction_auxiliary_data_db,
transaction_db,
transaction_info_db,
write_set_db,
event_db: event_db.unwrap(),
transaction_accumulator_db: transaction_accumulator_db.unwrap(),
transaction_auxiliary_data_db: transaction_auxiliary_data_db.unwrap(),
transaction_db: transaction_db.unwrap(),
transaction_info_db: transaction_info_db.unwrap(),
write_set_db: write_set_db.unwrap(),
enable_storage_sharding: true,
})
}
Expand Down
38 changes: 18 additions & 20 deletions storage/aptosdb/src/state_kv_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use aptos_types::{
transaction::Version,
};
use arr_macro::arr;
use rayon::prelude::*;
use std::{
path::{Path, PathBuf},
sync::Arc,
Expand Down Expand Up @@ -56,19 +57,13 @@ impl StateKvDb {
});
}

Self::open(
db_paths,
rocksdb_configs.state_kv_db_config,
readonly,
sharding,
)
Self::open(db_paths, rocksdb_configs.state_kv_db_config, readonly)
}

pub(crate) fn open(
db_paths: &StorageDirPaths,
state_kv_db_config: RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<Self> {
let state_kv_metadata_db_path =
Self::metadata_db_path(db_paths.state_kv_db_metadata_root_path());
Expand All @@ -78,23 +73,29 @@ impl StateKvDb {
STATE_KV_METADATA_DB_NAME,
&state_kv_db_config,
readonly,
enable_sharding,
)?);

info!(
state_kv_metadata_db_path = state_kv_metadata_db_path,
"Opened state kv metadata db!"
);

let mut shard_id: usize = 0;
let state_kv_db_shards = {
arr![{
let state_kv_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_kv_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_kv_db_config, readonly, enable_sharding)?;
shard_id += 1;
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_kv_db_config,
readonly,
)
.expect("Failed to open state kv db shard {shard_id}.");
Arc::new(db)
}; 16]
};
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_kv_db = Self {
state_kv_metadata_db,
Expand Down Expand Up @@ -247,15 +248,13 @@ impl StateKvDb {
shard_id: u8,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
let db_name = format!("state_kv_db_shard_{}", shard_id);
Self::open_db(
Self::db_shard_path(db_root_path, shard_id),
&db_name,
state_kv_db_config,
readonly,
enable_sharding,
)
}

Expand All @@ -264,21 +263,20 @@ impl StateKvDb {
name: &str,
state_kv_db_config: &RocksdbConfig,
readonly: bool,
enable_sharding: bool,
) -> Result<DB> {
Ok(if readonly {
DB::open_cf_readonly(
&gen_rocksdb_options(state_kv_db_config, true),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_cfds(state_kv_db_config),
)?
} else {
DB::open_cf(
&gen_rocksdb_options(state_kv_db_config, false),
path,
name,
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
gen_state_kv_cfds(state_kv_db_config),
)?
})
}
Expand Down
23 changes: 16 additions & 7 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,13 +583,22 @@ impl StateMerkleDb {
"Opened state merkle metadata db!"
);

let mut shard_id: usize = 0;
let state_merkle_db_shards = arr![{
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(shard_root_path, shard_id as u8, &state_merkle_db_config, readonly)?;
shard_id += 1;
Arc::new(db)
}; 16];
let state_merkle_db_shards = (0..NUM_STATE_SHARDS)
.into_par_iter()
.map(|shard_id| {
let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8);
let db = Self::open_shard(
shard_root_path,
shard_id as u8,
&state_merkle_db_config,
readonly,
)
.expect("Failed to open state merkle db shard {shard_id}.");
Arc::new(db)
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let state_merkle_db = Self {
state_merkle_metadata_db,
Expand Down

0 comments on commit 8d01152

Please sign in to comment.