Skip to content

Commit

Permalink
Use specific ordered collection with parity-db.
Browse files Browse the repository at this point in the history
  • Loading branch information
cheme committed Feb 2, 2022
1 parent 013f7ad commit 8b66d0a
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 40 deletions.
21 changes: 12 additions & 9 deletions in node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ fn delete_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) {

fn delete_unfinalized_height(tx: &mut DBTransaction, config: &Config, block_number: BlockNumber) {
let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode();
tx.delete_prefix(config.col_meta, &prefix);
tx.delete_prefix(config.col_meta_ordered, &prefix);
}

fn delete_unfinalized_inclusion(
Expand All @@ -266,7 +266,7 @@ fn delete_unfinalized_inclusion(
let key =
(UNFINALIZED_PREFIX, BEBlockNumber(block_number), block_hash, candidate_hash).encode();

tx.delete(config.col_meta, &key[..]);
tx.delete(config.col_meta_ordered, &key[..]);
}

fn delete_pruning_key(
Expand All @@ -276,7 +276,7 @@ fn delete_pruning_key(
h: &CandidateHash,
) {
let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode();
tx.delete(config.col_meta, &key);
tx.delete(config.col_meta_ordered, &key);
}

fn write_pruning_key(
Expand All @@ -287,7 +287,7 @@ fn write_pruning_key(
) {
let t = t.into();
let key = (PRUNE_BY_TIME_PREFIX, t, h).encode();
tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
tx.put(config.col_meta_ordered, &key, TOMBSTONE_VALUE);
}

fn finalized_block_range(finalized: BlockNumber) -> (Vec<u8>, Vec<u8>) {
Expand All @@ -306,7 +306,7 @@ fn write_unfinalized_block_contains(
ch: &CandidateHash,
) {
let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode();
tx.put(config.col_meta, &key, TOMBSTONE_VALUE);
tx.put(config.col_meta_ordered, &key, TOMBSTONE_VALUE);
}

fn pruning_range(now: impl Into<BETimestamp>) -> (Vec<u8>, Vec<u8>) {
Expand Down Expand Up @@ -424,6 +424,9 @@ pub struct Config {
pub col_data: u32,
/// The column family for availability store meta information.
pub col_meta: u32,
/// Second column family for availability store meta information,
/// content must be ordered.
pub col_meta_ordered: u32,
}

trait Clock: Send + Sync {
Expand Down Expand Up @@ -833,7 +836,7 @@ where
let batch_num = {
let mut iter = subsystem
.db
.iter_with_prefix(subsystem.config.col_meta, &start_prefix)
.iter_with_prefix(subsystem.config.col_meta_ordered, &start_prefix)
.take_while(|(k, _)| &k[..] < &end_prefix[..])
.peekable();

Expand Down Expand Up @@ -881,7 +884,7 @@ where

let iter = subsystem
.db
.iter_with_prefix(subsystem.config.col_meta, &start_prefix)
.iter_with_prefix(subsystem.config.col_meta_ordered, &start_prefix)
.take_while(|(k, _)| &k[..] < &end_prefix[..])
.peekable();

Expand Down Expand Up @@ -1228,11 +1231,11 @@ fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Resu

let mut tx = DBTransaction::new();
let iter = db
.iter_with_prefix(config.col_meta, &range_start[..])
.iter_with_prefix(config.col_meta_ordered, &range_start[..])
.take_while(|(k, _)| &k[..] < &range_end[..]);

for (k, _v) in iter {
tx.delete(config.col_meta, &k[..]);
tx.delete(config.col_meta_ordered, &k[..]);

let (_, candidate_hash) = match decode_pruning_key(&k[..]) {
Ok(m) => m,
Expand Down
15 changes: 11 additions & 4 deletions in node/core/av-store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ use sp_keyring::Sr25519Keyring;
mod columns {
pub const DATA: u32 = 0;
pub const META: u32 = 1;
pub const NUM_COLUMNS: u32 = 2;
pub const META_ORDERED: u32 = 2;
pub const NUM_COLUMNS: u32 = 3;
}

const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META };
const TEST_CONFIG: Config = Config {
col_data: columns::DATA,
col_meta: columns::META,
col_meta_ordered: columns::META_ORDERED,
};

type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>;

Expand Down Expand Up @@ -198,8 +203,10 @@ fn candidate_included(receipt: CandidateReceipt) -> CandidateEvent {
#[cfg(test)]
fn test_store() -> Arc<dyn Database> {
let db = kvdb_memorydb::create(columns::NUM_COLUMNS);
let db =
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[columns::META]);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
db,
&[columns::META_ORDERED],
);
Arc::new(db)
}

Expand Down
47 changes: 28 additions & 19 deletions in node/core/chain-selection/src/db_backend/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ impl From<BlockEntry> for crate::BlockEntry {
pub struct Config {
/// The column where block metadata is stored.
pub col_data: u32,
/// The column where block metadata is stored,
/// ordered.
pub col_data_ordered: u32,
}

/// The database backend.
Expand Down Expand Up @@ -220,7 +223,7 @@ impl Backend for DbBackend {
fn load_stagnant_at(&self, timestamp: crate::Timestamp) -> Result<Vec<Hash>, Error> {
load_decode::<Vec<Hash>>(
&*self.inner,
self.config.col_data,
self.config.col_data_ordered,
&stagnant_at_key(timestamp.into()),
)
.map(|o| o.unwrap_or_default())
Expand All @@ -230,8 +233,9 @@ impl Backend for DbBackend {
&self,
up_to: crate::Timestamp,
) -> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error> {
let stagnant_at_iter =
self.inner.iter_with_prefix(self.config.col_data, &STAGNANT_AT_PREFIX[..]);
let stagnant_at_iter = self
.inner
.iter_with_prefix(self.config.col_data_ordered, &STAGNANT_AT_PREFIX[..]);

let val = stagnant_at_iter
.filter_map(|(k, v)| {
Expand All @@ -247,8 +251,9 @@ impl Backend for DbBackend {
}

fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
let blocks_at_height_iter =
self.inner.iter_with_prefix(self.config.col_data, &BLOCK_HEIGHT_PREFIX[..]);
let blocks_at_height_iter = self
.inner
.iter_with_prefix(self.config.col_data_ordered, &BLOCK_HEIGHT_PREFIX[..]);

let val = blocks_at_height_iter
.filter_map(|(k, _)| decode_block_height_key(&k[..]))
Expand All @@ -258,8 +263,12 @@ impl Backend for DbBackend {
}

fn load_blocks_by_number(&self, number: BlockNumber) -> Result<Vec<Hash>, Error> {
load_decode::<Vec<Hash>>(&*self.inner, self.config.col_data, &block_height_key(number))
.map(|o| o.unwrap_or_default())
load_decode::<Vec<Hash>>(
&*self.inner,
self.config.col_data_ordered,
&block_height_key(number),
)
.map(|o| o.unwrap_or_default())
}

/// Atomically write the list of operations, with later operations taking precedence over prior.
Expand All @@ -280,10 +289,10 @@ impl Backend for DbBackend {
},
BackendWriteOp::WriteBlocksByNumber(block_number, v) =>
if v.is_empty() {
tx.delete(self.config.col_data, &block_height_key(block_number));
tx.delete(self.config.col_data_ordered, &block_height_key(block_number));
} else {
tx.put_vec(
self.config.col_data,
self.config.col_data_ordered,
&block_height_key(block_number),
v.encode(),
);
Expand All @@ -299,24 +308,24 @@ impl Backend for DbBackend {
BackendWriteOp::WriteStagnantAt(timestamp, stagnant_at) => {
let timestamp: Timestamp = timestamp.into();
if stagnant_at.is_empty() {
tx.delete(self.config.col_data, &stagnant_at_key(timestamp));
tx.delete(self.config.col_data_ordered, &stagnant_at_key(timestamp));
} else {
tx.put_vec(
self.config.col_data,
self.config.col_data_ordered,
&stagnant_at_key(timestamp),
stagnant_at.encode(),
);
}
},
BackendWriteOp::DeleteBlocksByNumber(block_number) => {
tx.delete(self.config.col_data, &block_height_key(block_number));
tx.delete(self.config.col_data_ordered, &block_height_key(block_number));
},
BackendWriteOp::DeleteBlockEntry(hash) => {
tx.delete(self.config.col_data, &block_entry_key(&hash));
},
BackendWriteOp::DeleteStagnantAt(timestamp) => {
let timestamp: Timestamp = timestamp.into();
tx.delete(self.config.col_data, &stagnant_at_key(timestamp));
tx.delete(self.config.col_data_ordered, &stagnant_at_key(timestamp));
},
}
}
Expand Down Expand Up @@ -389,7 +398,7 @@ mod tests {

#[cfg(test)]
fn test_db() -> Arc<dyn Database> {
let db = kvdb_memorydb::create(1);
let db = kvdb_memorydb::create(2);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[1]);
Arc::new(db)
}
Expand Down Expand Up @@ -433,7 +442,7 @@ mod tests {
#[test]
fn write_read_block_entry() {
let db = test_db();
let config = Config { col_data: 0 };
let config = Config { col_data: 0, col_data_ordered: 1 };

let mut backend = DbBackend::new(db, config);

Expand Down Expand Up @@ -463,7 +472,7 @@ mod tests {
#[test]
fn delete_block_entry() {
let db = test_db();
let config = Config { col_data: 0 };
let config = Config { col_data: 0, col_data_ordered: 1 };

let mut backend = DbBackend::new(db, config);

Expand Down Expand Up @@ -494,7 +503,7 @@ mod tests {
#[test]
fn earliest_block_number() {
let db = test_db();
let config = Config { col_data: 0 };
let config = Config { col_data: 0, col_data_ordered: 1 };

let mut backend = DbBackend::new(db, config);

Expand Down Expand Up @@ -523,7 +532,7 @@ mod tests {
#[test]
fn stagnant_at_up_to() {
let db = test_db();
let config = Config { col_data: 0 };
let config = Config { col_data: 0, col_data_ordered: 1 };

let mut backend = DbBackend::new(db, config);

Expand Down Expand Up @@ -579,7 +588,7 @@ mod tests {
#[test]
fn write_read_blocks_at_height() {
let db = test_db();
let config = Config { col_data: 0 };
let config = Config { col_data: 0, col_data_ordered: 1 };

let mut backend = DbBackend::new(db, config);

Expand Down
8 changes: 7 additions & 1 deletion in node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ impl StagnantCheckInterval {
pub struct Config {
/// The column in the database that the storage should use.
pub col_data: u32,
/// The column in the database that the storage should use,
/// ordered.
pub col_data_ordered: u32,
/// How often to check for stagnant blocks.
pub stagnant_check_interval: StagnantCheckInterval,
}
Expand All @@ -325,7 +328,10 @@ where
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = crate::db_backend::v1::DbBackend::new(
self.db,
crate::db_backend::v1::Config { col_data: self.config.col_data },
crate::db_backend::v1::Config {
col_data: self.config.col_data,
col_data_ordered: self.config.col_data_ordered,
},
);

SpawnedSubsystem {
Expand Down
17 changes: 17 additions & 0 deletions in node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,9 +877,17 @@ where
},
};

let col_meta_ordered = if parachains_db
.is_indexed_column(crate::parachains_db::REAL_COLUMNS.col_availability_meta)
{
crate::parachains_db::REAL_COLUMNS.col_availability_meta
} else {
crate::parachains_db::REAL_COLUMNS.col_availability_meta_ordered
};
let availability_config = AvailabilityConfig {
col_data: crate::parachains_db::REAL_COLUMNS.col_availability_data,
col_meta: crate::parachains_db::REAL_COLUMNS.col_availability_meta,
col_meta_ordered,
};

let approval_voting_config = ApprovalVotingConfig {
Expand All @@ -899,8 +907,17 @@ where
},
};

let col_data_ordered = if parachains_db
.is_indexed_column(crate::parachains_db::REAL_COLUMNS.col_chain_selection_data)
{
crate::parachains_db::REAL_COLUMNS.col_chain_selection_data
} else {
crate::parachains_db::REAL_COLUMNS.col_chain_selection_data_ordered
};

let chain_selection_config = ChainSelectionConfig {
col_data: crate::parachains_db::REAL_COLUMNS.col_chain_selection_data,
col_data_ordered,
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
};

Expand Down
Loading

0 comments on commit 8b66d0a

Please sign in to comment.