diff --git a/node/core/approval-voting/src/approval_db/v1/mod.rs b/node/core/approval-voting/src/approval_db/v1/mod.rs
index d2a13ad54550..c31389269d2e 100644
--- a/node/core/approval-voting/src/approval_db/v1/mod.rs
+++ b/node/core/approval-voting/src/approval_db/v1/mod.rs
@@ -15,6 +15,12 @@
// along with Polkadot. If not, see .
//! Version 1 of the DB schema.
+//!
+//! Note that the version here differs from the actual version of the parachains
+//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`).
+//! The code in this module implements the way approval voting works with
+//! its data in the database. Any breaking changes here will still
+//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`).
use parity_scale_codec::{Decode, Encode};
use polkadot_node_primitives::approval::{AssignmentCert, DelayTranche};
@@ -154,8 +160,6 @@ pub type Bitfield = BitVec;
pub struct Config {
/// The column family in the database where data is stored.
pub col_approval_data: u32,
- /// The column of the database where rolling session window data is stored.
- pub col_session_data: u32,
}
/// Details pertaining to our assignment on a block.
diff --git a/node/core/approval-voting/src/approval_db/v1/tests.rs b/node/core/approval-voting/src/approval_db/v1/tests.rs
index 0d30cc8c0cdc..07d8242b772e 100644
--- a/node/core/approval-voting/src/approval_db/v1/tests.rs
+++ b/node/core/approval-voting/src/approval_db/v1/tests.rs
@@ -28,12 +28,10 @@ use std::{collections::HashMap, sync::Arc};
use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash};
const DATA_COL: u32 = 0;
-const SESSION_DATA_COL: u32 = 1;
-const NUM_COLUMNS: u32 = 2;
+const NUM_COLUMNS: u32 = 1;
-const TEST_CONFIG: Config =
- Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
+const TEST_CONFIG: Config = Config { col_approval_data: DATA_COL };
fn make_db() -> (DbBackend, Arc) {
let db = kvdb_memorydb::create(NUM_COLUMNS);
diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs
index 1ea2687a0246..e33caed49c5f 100644
--- a/node/core/approval-voting/src/import.rs
+++ b/node/core/approval-voting/src/import.rs
@@ -609,12 +609,10 @@ pub(crate) mod tests {
use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry};
const DATA_COL: u32 = 0;
- const SESSION_DATA_COL: u32 = 1;
- const NUM_COLUMNS: u32 = 2;
+ const NUM_COLUMNS: u32 = 1;
- const TEST_CONFIG: DatabaseConfig =
- DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
+ const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL };
#[derive(Default)]
struct MockClock;
diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs
index 18b8746ca317..f5e888c7c538 100644
--- a/node/core/approval-voting/src/lib.rs
+++ b/node/core/approval-voting/src/lib.rs
@@ -116,8 +116,6 @@ const LOG_TARGET: &str = "parachain::approval-voting";
pub struct Config {
/// The column family in the DB where approval-voting data is stored.
pub col_approval_data: u32,
- /// The of the DB where rolling session info is stored.
- pub col_session_data: u32,
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
/// divisible by 500.
pub slot_duration_millis: u64,
@@ -357,10 +355,7 @@ impl ApprovalVotingSubsystem {
keystore,
slot_duration_millis: config.slot_duration_millis,
db,
- db_config: DatabaseConfig {
- col_approval_data: config.col_approval_data,
- col_session_data: config.col_session_data,
- },
+ db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
mode: Mode::Syncing(sync_oracle),
metrics,
}
@@ -369,10 +364,8 @@ impl ApprovalVotingSubsystem {
/// Revert to the block corresponding to the specified `hash`.
/// The operation is not allowed for blocks older than the last finalized one.
pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
- let config = approval_db::v1::Config {
- col_approval_data: self.db_config.col_approval_data,
- col_session_data: self.db_config.col_session_data,
- };
+ let config =
+ approval_db::v1::Config { col_approval_data: self.db_config.col_approval_data };
let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config);
let mut overlay = OverlayedBackend::new(&backend);
diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs
index d7e19a8c09f3..f58e60c6a487 100644
--- a/node/core/approval-voting/src/tests.rs
+++ b/node/core/approval-voting/src/tests.rs
@@ -14,8 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use crate::tests::test_constants::TEST_CONFIG;
-
use super::*;
use polkadot_node_primitives::{
approval::{
@@ -115,12 +113,10 @@ fn make_sync_oracle(val: bool) -> (Box, TestSyncOracleHan
pub mod test_constants {
use crate::approval_db::v1::Config as DatabaseConfig;
const DATA_COL: u32 = 0;
- const SESSION_DATA_COL: u32 = 1;
- pub(crate) const NUM_COLUMNS: u32 = 2;
+ pub(crate) const NUM_COLUMNS: u32 = 1;
- pub(crate) const TEST_CONFIG: DatabaseConfig =
- DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
+ pub(crate) const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL };
}
struct MockSupportsParachains;
@@ -493,7 +489,6 @@ fn test_harness>(
Config {
col_approval_data: test_constants::TEST_CONFIG.col_approval_data,
slot_duration_millis: SLOT_DURATION_MILLIS,
- col_session_data: TEST_CONFIG.col_session_data,
},
Arc::new(db),
Arc::new(keystore),
diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs
index aa67781ddd25..2d14f5151003 100644
--- a/node/core/dispute-coordinator/src/db/v1.rs
+++ b/node/core/dispute-coordinator/src/db/v1.rs
@@ -15,6 +15,12 @@
// along with Polkadot. If not, see .
//! `V1` database for the dispute coordinator.
+//!
+//! Note that the version here differs from the actual version of the parachains
+//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`).
+//! The code in this module implements the way dispute coordinator works with
+//! the dispute data in the database. Any breaking changes here will still
+//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`).
use polkadot_node_primitives::DisputeStatus;
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
@@ -206,8 +212,6 @@ fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
pub struct ColumnConfiguration {
/// The column in the key-value DB where data is stored.
pub col_dispute_data: u32,
- /// The column in the key-value DB where session data is stored.
- pub col_session_data: u32,
}
/// Tracked votes on candidates, for the purposes of dispute resolution.
@@ -378,7 +382,7 @@ mod tests {
let db = kvdb_memorydb::create(1);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
let store = Arc::new(db);
- let config = ColumnConfiguration { col_dispute_data: 0, col_session_data: 1 };
+ let config = ColumnConfiguration { col_dispute_data: 0 };
DbBackend::new(store, config, Metrics::default())
}
diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs
index 81134a43a3a0..1b90a9d865e1 100644
--- a/node/core/dispute-coordinator/src/initialized.rs
+++ b/node/core/dispute-coordinator/src/initialized.rs
@@ -305,13 +305,12 @@ impl Initialized {
Ok(session_idx)
if self.gaps_in_cache || session_idx > self.highest_session_seen =>
{
- // If error has occurred during last session caching - fetch the whole window
- // Otherwise - cache only the new sessions
- let lower_bound = if self.gaps_in_cache {
- session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)
- } else {
- self.highest_session_seen + 1
- };
+ // Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps in
+ // cache and we are not missing too many `SessionInfo`s
+ let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
+ if !self.gaps_in_cache && self.highest_session_seen > lower_bound {
+ lower_bound = self.highest_session_seen + 1
+ }
// There is a new session. Perform a dummy fetch to cache it.
for idx in lower_bound..=session_idx {
diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs
index 7379b392f312..02bb6ef9ecda 100644
--- a/node/core/dispute-coordinator/src/lib.rs
+++ b/node/core/dispute-coordinator/src/lib.rs
@@ -127,16 +127,11 @@ pub struct DisputeCoordinatorSubsystem {
pub struct Config {
/// The data column in the store to use for dispute data.
pub col_dispute_data: u32,
- /// The data column in the store to use for session data.
- pub col_session_data: u32,
}
impl Config {
fn column_config(&self) -> db::v1::ColumnConfiguration {
- db::v1::ColumnConfiguration {
- col_dispute_data: self.col_dispute_data,
- col_session_data: self.col_session_data,
- }
+ db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data }
}
}
diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs
index 7d3b87f3c228..ceeac351e8b8 100644
--- a/node/core/dispute-coordinator/src/tests.rs
+++ b/node/core/dispute-coordinator/src/tests.rs
@@ -33,6 +33,7 @@ use polkadot_node_subsystem_util::database::Database;
use polkadot_node_primitives::{
DisputeMessage, DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
+ DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::{
@@ -214,9 +215,9 @@ impl Default for TestState {
make_keystore(vec![Sr25519Keyring::Alice.to_seed()].into_iter()).into();
let db = kvdb_memorydb::create(1);
- let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
+ let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
let db = Arc::new(db);
- let config = Config { col_dispute_data: 0, col_session_data: 1 };
+ let config = Config { col_dispute_data: 0 };
let genesis_header = Header {
parent_hash: Hash::zero(),
@@ -330,9 +331,11 @@ impl TestState {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
+ let first_expected_session = session.saturating_sub(DISPUTE_WINDOW.get() - 1);
+
// Queries for session caching - see `handle_startup`
if self.known_session.is_none() {
- for i in 0..=session {
+ for i in first_expected_session..=session {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -3393,3 +3396,174 @@ fn informs_chain_selection_when_dispute_concluded_against() {
})
});
}
+
+// On startup `SessionInfo` cache should be populated
+#[test]
+fn session_info_caching_on_startup_works() {
+ test_harness(|mut test_state, mut virtual_overseer| {
+ Box::pin(async move {
+ let session = 1;
+
+ test_state.handle_resume_sync(&mut virtual_overseer, session).await;
+
+ test_state
+ })
+ });
+}
+
+// Underflow means that no more than `DISPUTE_WINDOW` sessions should be fetched on startup
+#[test]
+fn session_info_caching_doesnt_underflow() {
+ test_harness(|mut test_state, mut virtual_overseer| {
+ Box::pin(async move {
+ let session = DISPUTE_WINDOW.get() + 1;
+
+ test_state.handle_resume_sync(&mut virtual_overseer, session).await;
+
+ test_state
+ })
+ });
+}
+
+// Cached `SessionInfo` shouldn't be re-requested from the runtime
+#[test]
+fn session_info_is_requested_only_once() {
+ test_harness(|mut test_state, mut virtual_overseer| {
+ Box::pin(async move {
+ let session = 1;
+
+ test_state.handle_resume_sync(&mut virtual_overseer, session).await;
+
+ // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session,
+ 3,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ // This leaf activation should fetch `SessionInfo` because the session is new
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session + 1,
+ 4,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ assert_matches!(
+ virtual_overseer.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _,
+ RuntimeApiRequest::SessionInfo(session_index, tx),
+ )) => {
+ assert_eq!(session_index, 2);
+ let _ = tx.send(Ok(Some(test_state.session_info())));
+ }
+ );
+ test_state
+ })
+ });
+}
+
+// Big jump means the new session we see with a leaf update is at least a `DISPUTE_WINDOW` bigger than
+// the already known one. In this case The whole `DISPUTE_WINDOW` should be fetched.
+#[test]
+fn session_info_big_jump_works() {
+ test_harness(|mut test_state, mut virtual_overseer| {
+ Box::pin(async move {
+ let session_on_startup = 1;
+
+ test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await;
+
+ // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session_on_startup,
+ 3,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() + 10;
+ // This leaf activation should cache all missing `SessionInfo`s
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session_after_jump,
+ 4,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ let first_expected_session =
+ session_after_jump.saturating_sub(DISPUTE_WINDOW.get() - 1);
+ for expected_idx in first_expected_session..=session_after_jump {
+ assert_matches!(
+ virtual_overseer.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _,
+ RuntimeApiRequest::SessionInfo(session_index, tx),
+ )) => {
+ assert_eq!(session_index, expected_idx);
+ let _ = tx.send(Ok(Some(test_state.session_info())));
+ }
+ );
+ }
+ test_state
+ })
+ });
+}
+
+// Small jump means the new session we see with a leaf update is at less than last known one + `DISPUTE_WINDOW`. In this
+// case fetching should start from last known one + 1.
+#[test]
+fn session_info_small_jump_works() {
+ test_harness(|mut test_state, mut virtual_overseer| {
+ Box::pin(async move {
+ let session_on_startup = 1;
+
+ test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await;
+
+ // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session_on_startup,
+ 3,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() - 1;
+ // This leaf activation should cache all missing `SessionInfo`s
+ test_state
+ .activate_leaf_at_session(
+ &mut virtual_overseer,
+ session_after_jump,
+ 4,
+ vec![make_candidate_included_event(make_valid_candidate_receipt())],
+ )
+ .await;
+
+ let first_expected_session = session_on_startup + 1;
+ for expected_idx in first_expected_session..=session_after_jump {
+ assert_matches!(
+ virtual_overseer.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _,
+ RuntimeApiRequest::SessionInfo(session_index, tx),
+ )) => {
+ assert_eq!(session_index, expected_idx);
+ let _ = tx.send(Ok(Some(test_state.session_info())));
+ }
+ );
+ }
+ test_state
+ })
+ });
+}
diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs
index e55f2456160b..c29933732388 100644
--- a/node/service/src/lib.rs
+++ b/node/service/src/lib.rs
@@ -896,7 +896,6 @@ where
let approval_voting_config = ApprovalVotingConfig {
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
- col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
slot_duration_millis: slot_duration.as_millis() as u64,
};
@@ -920,7 +919,6 @@ where
let dispute_coordinator_config = DisputeCoordinatorConfig {
col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data,
- col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
};
let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
@@ -1512,7 +1510,6 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R
fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> {
let config = approval_voting_subsystem::Config {
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
- col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
slot_duration_millis: Default::default(),
};
diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs
index 918aecd25e76..519afbe0ccd1 100644
--- a/node/service/src/parachains_db/mod.rs
+++ b/node/service/src/parachains_db/mod.rs
@@ -36,12 +36,18 @@ pub(crate) mod columns {
pub mod v2 {
pub const NUM_COLUMNS: u32 = 6;
+
+ #[cfg(test)]
+ pub const COL_SESSION_WINDOW_DATA: u32 = 5;
+ }
+
+ pub mod v3 {
+ pub const NUM_COLUMNS: u32 = 5;
pub const COL_AVAILABILITY_DATA: u32 = 0;
pub const COL_AVAILABILITY_META: u32 = 1;
pub const COL_APPROVAL_DATA: u32 = 2;
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
- pub const COL_SESSION_WINDOW_DATA: u32 = 5;
pub const ORDERED_COL: &[u32] =
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
@@ -62,19 +68,16 @@ pub struct ColumnsConfig {
pub col_chain_selection_data: u32,
/// The column used by dispute coordinator for data.
pub col_dispute_coordinator_data: u32,
- /// The column used for session window data.
- pub col_session_window_data: u32,
}
/// The real columns used by the parachains DB.
#[cfg(any(test, feature = "full-node"))]
pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig {
- col_availability_data: columns::v2::COL_AVAILABILITY_DATA,
- col_availability_meta: columns::v2::COL_AVAILABILITY_META,
- col_approval_data: columns::v2::COL_APPROVAL_DATA,
- col_chain_selection_data: columns::v2::COL_CHAIN_SELECTION_DATA,
- col_dispute_coordinator_data: columns::v2::COL_DISPUTE_COORDINATOR_DATA,
- col_session_window_data: columns::v2::COL_SESSION_WINDOW_DATA,
+ col_availability_data: columns::v3::COL_AVAILABILITY_DATA,
+ col_availability_meta: columns::v3::COL_AVAILABILITY_META,
+ col_approval_data: columns::v3::COL_APPROVAL_DATA,
+ col_chain_selection_data: columns::v3::COL_CHAIN_SELECTION_DATA,
+ col_dispute_coordinator_data: columns::v3::COL_DISPUTE_COORDINATOR_DATA,
};
#[derive(PartialEq)]
@@ -122,20 +125,17 @@ pub fn open_creating_rocksdb(
let path = root.join("parachains").join("db");
- let mut db_config = DatabaseConfig::with_columns(columns::v2::NUM_COLUMNS);
+ let mut db_config = DatabaseConfig::with_columns(columns::v3::NUM_COLUMNS);
let _ = db_config
.memory_budget
- .insert(columns::v2::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
- let _ = db_config
- .memory_budget
- .insert(columns::v2::COL_AVAILABILITY_META, cache_sizes.availability_meta);
+ .insert(columns::v3::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
let _ = db_config
.memory_budget
- .insert(columns::v2::COL_APPROVAL_DATA, cache_sizes.approval_data);
+ .insert(columns::v3::COL_AVAILABILITY_META, cache_sizes.availability_meta);
let _ = db_config
.memory_budget
- .insert(columns::v2::COL_SESSION_WINDOW_DATA, cache_sizes.session_data);
+ .insert(columns::v3::COL_APPROVAL_DATA, cache_sizes.approval_data);
let path_str = path
.to_str()
@@ -146,7 +146,7 @@ pub fn open_creating_rocksdb(
let db = Database::open(&db_config, &path_str)?;
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
db,
- columns::v2::ORDERED_COL,
+ columns::v3::ORDERED_COL,
);
Ok(Arc::new(db))
@@ -166,12 +166,12 @@ pub fn open_creating_paritydb(
std::fs::create_dir_all(&path_str)?;
upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?;
- let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_2_config(&path))
+ let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_3_config(&path))
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
db,
- columns::v2::ORDERED_COL,
+ columns::v3::ORDERED_COL,
);
Ok(Arc::new(db))
}
diff --git a/node/service/src/parachains_db/upgrade.rs b/node/service/src/parachains_db/upgrade.rs
index c52bd21c0573..6041a093ef9b 100644
--- a/node/service/src/parachains_db/upgrade.rs
+++ b/node/service/src/parachains_db/upgrade.rs
@@ -28,7 +28,7 @@ type Version = u32;
const VERSION_FILE_NAME: &'static str = "parachain_db_version";
/// Current db version.
-const CURRENT_VERSION: Version = 2;
+const CURRENT_VERSION: Version = 3;
#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -58,6 +58,8 @@ pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<()
Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?,
// 1 -> 2 migration
Some(1) => migrate_from_version_1_to_2(db_path, db_kind)?,
+ // 2 -> 3 migration
+ Some(2) => migrate_from_version_2_to_3(db_path, db_kind)?,
// Already at current version, do nothing.
Some(CURRENT_VERSION) => (),
// This is an arbitrary future version, we don't handle it.
@@ -127,6 +129,18 @@ fn migrate_from_version_1_to_2(path: &Path, db_kind: DatabaseKind) -> Result<(),
})
}
+fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
+ gum::info!(target: LOG_TARGET, "Migrating parachains db from version 2 to version 3 ...");
+ match db_kind {
+ DatabaseKind::ParityDB => paritydb_migrate_from_version_2_to_3(path),
+ DatabaseKind::RocksDB => rocksdb_migrate_from_version_2_to_3(path),
+ }
+ .and_then(|result| {
+ gum::info!(target: LOG_TARGET, "Migration complete! ");
+ Ok(result)
+ })
+}
+
/// Migration from version 0 to version 1:
/// * the number of columns has changed from 3 to 5;
fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
@@ -160,6 +174,20 @@ fn rocksdb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> {
Ok(())
}
+fn rocksdb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> {
+ use kvdb_rocksdb::{Database, DatabaseConfig};
+
+ let db_path = path
+ .to_str()
+ .ok_or_else(|| super::other_io_error("Invalid database path".into()))?;
+ let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS);
+ let mut db = Database::open(&db_cfg, db_path)?;
+
+ db.remove_last_column()?;
+
+ Ok(())
+}
+
// This currently clears columns which had their configs altered between versions.
// The columns to be changed are constrained by the `allowed_columns` vector.
fn paritydb_fix_columns(
@@ -221,7 +249,7 @@ fn paritydb_fix_columns(
pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
let mut options =
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
- for i in columns::v2::ORDERED_COL {
+ for i in columns::v3::ORDERED_COL {
options.columns[*i as usize].btree_index = true;
}
@@ -232,7 +260,18 @@ pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options {
let mut options =
parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8);
- for i in columns::v2::ORDERED_COL {
+ for i in columns::v3::ORDERED_COL {
+ options.columns[*i as usize].btree_index = true;
+ }
+
+ options
+}
+
+/// Database configuration for version 3.
+pub(crate) fn paritydb_version_3_config(path: &Path) -> parity_db::Options {
+ let mut options =
+ parity_db::Options::with_columns(&path, super::columns::v3::NUM_COLUMNS as u8);
+ for i in columns::v3::ORDERED_COL {
options.columns[*i as usize].btree_index = true;
}
@@ -244,8 +283,8 @@ pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options {
pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options {
let mut options =
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
- options.columns[super::columns::v2::COL_AVAILABILITY_META as usize].btree_index = true;
- options.columns[super::columns::v2::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
+ options.columns[super::columns::v3::COL_AVAILABILITY_META as usize].btree_index = true;
+ options.columns[super::columns::v3::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
options
}
@@ -260,7 +299,7 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
paritydb_fix_columns(
path,
paritydb_version_1_config(path),
- vec![super::columns::v2::COL_DISPUTE_COORDINATOR_DATA],
+ vec![super::columns::v3::COL_DISPUTE_COORDINATOR_DATA],
)?;
Ok(())
@@ -278,9 +317,20 @@ fn paritydb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> {
Ok(())
}
+/// Migration from version 2 to version 3:
+/// - drop the column used by `RollingSessionWindow`
+fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> {
+ parity_db::Db::drop_last_column(&mut paritydb_version_2_config(path))
+ .map_err(|e| other_io_error(format!("Error removing COL_SESSION_WINDOW_DATA {:?}", e)))?;
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
- use super::{columns::v2::*, *};
+ use super::{
+ columns::{v2::COL_SESSION_WINDOW_DATA, v3::*},
+ *,
+ };
#[test]
fn test_paritydb_migrate_0_to_1() {
@@ -375,7 +425,7 @@ mod tests {
// We need to properly set db version for upgrade to work.
fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version");
{
- let db = DbAdapter::new(db, columns::v2::ORDERED_COL);
+ let db = DbAdapter::new(db, columns::v3::ORDERED_COL);
db.write(DBTransaction {
ops: vec![DBOp::Insert {
col: COL_DISPUTE_COORDINATOR_DATA,
@@ -393,7 +443,7 @@ mod tests {
assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS);
- let db = DbAdapter::new(db, columns::v2::ORDERED_COL);
+ let db = DbAdapter::new(db, columns::v3::ORDERED_COL);
assert_eq!(
db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(),
@@ -416,4 +466,59 @@ mod tests {
Some("0xdeadb00b".as_bytes().to_vec())
);
}
+
+ #[test]
+ fn test_paritydb_migrate_2_to_3() {
+ use parity_db::Db;
+
+ let db_dir = tempfile::tempdir().unwrap();
+ let path = db_dir.path();
+ let test_key = b"1337";
+
+ // We need to properly set db version for upgrade to work.
+ fs::write(version_file_path(path), "2").expect("Failed to write DB version");
+
+ {
+ let db = Db::open_or_create(&paritydb_version_2_config(&path)).unwrap();
+
+ // Write some dummy data
+ db.commit(vec![(
+ COL_SESSION_WINDOW_DATA as u8,
+ test_key.to_vec(),
+ Some(b"0xdeadb00b".to_vec()),
+ )])
+ .unwrap();
+
+ assert_eq!(db.num_columns(), columns::v2::NUM_COLUMNS as u8);
+ }
+
+ try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap();
+
+ let db = Db::open(&paritydb_version_3_config(&path)).unwrap();
+
+ assert_eq!(db.num_columns(), columns::v3::NUM_COLUMNS as u8);
+ }
+
+ #[test]
+ fn test_rocksdb_migrate_2_to_3() {
+ use kvdb_rocksdb::{Database, DatabaseConfig};
+
+ let db_dir = tempfile::tempdir().unwrap();
+ let db_path = db_dir.path().to_str().unwrap();
+ let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS);
+ {
+ let db = Database::open(&db_cfg, db_path).unwrap();
+ assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS as u32);
+ }
+
+ // We need to properly set db version for upgrade to work.
+ fs::write(version_file_path(db_dir.path()), "2").expect("Failed to write DB version");
+
+ try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB).unwrap();
+
+ let db_cfg = DatabaseConfig::with_columns(super::columns::v3::NUM_COLUMNS);
+ let db = Database::open(&db_cfg, db_path).unwrap();
+
+ assert_eq!(db.num_columns(), super::columns::v3::NUM_COLUMNS);
+ }
}
diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs
index 6c16cf396c40..1444bc0a2bf1 100644
--- a/node/subsystem-util/src/lib.rs
+++ b/node/subsystem-util/src/lib.rs
@@ -65,8 +65,6 @@ pub mod reexports {
pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
}
-/// A rolling session window cache.
-pub mod rolling_session_window;
/// Convenient and efficient runtime info access.
pub mod runtime;
diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs
deleted file mode 100644
index 18364491849a..000000000000
--- a/node/subsystem-util/src/rolling_session_window.rs
+++ /dev/null
@@ -1,1532 +0,0 @@
-// Copyright (C) Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! A rolling window of sessions and cached session info, updated by the state of newly imported blocks.
-//!
-//! This is useful for consensus components which need to stay up-to-date about recent sessions but don't
-//! care about the state of particular blocks.
-
-use super::database::{DBTransaction, Database};
-use kvdb::{DBKey, DBOp};
-
-use parity_scale_codec::{Decode, Encode};
-pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize};
-use polkadot_primitives::{BlockNumber, Hash, SessionIndex, SessionInfo};
-use std::sync::Arc;
-
-use futures::channel::oneshot;
-use polkadot_node_subsystem::{
- errors::{ChainApiError, RuntimeApiError},
- messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest},
- overseer,
-};
-
-// The window size is equal to the `approval-voting` and `dispute-coordinator` constants that
-// have been obsoleted.
-const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6);
-const LOG_TARGET: &str = "parachain::rolling-session-window";
-const STORED_ROLLING_SESSION_WINDOW: &[u8] = b"Rolling_session_window";
-
-/// Sessions unavailable in state to cache.
-#[derive(Debug, Clone, thiserror::Error)]
-pub enum SessionsUnavailableReason {
- /// Runtime API subsystem was unavailable.
- #[error(transparent)]
- RuntimeApiUnavailable(#[from] oneshot::Canceled),
- /// The runtime API itself returned an error.
- #[error(transparent)]
- RuntimeApi(#[from] RuntimeApiError),
- /// The chain API itself returned an error.
- #[error(transparent)]
- ChainApi(#[from] ChainApiError),
- /// Missing session info from runtime API for given `SessionIndex`.
- #[error("Missing session index {0:?}")]
- Missing(SessionIndex),
- /// Missing last finalized block number.
- #[error("Missing last finalized block number")]
- MissingLastFinalizedBlock,
- /// Missing last finalized block hash.
- #[error("Missing last finalized block hash")]
- MissingLastFinalizedBlockHash(BlockNumber),
-}
-
-/// Information about the sessions being fetched.
-#[derive(Debug, Clone)]
-pub struct SessionsUnavailableInfo {
- /// The desired window start.
- pub window_start: SessionIndex,
- /// The desired window end.
- pub window_end: SessionIndex,
- /// The block hash whose state the sessions were meant to be drawn from.
- pub block_hash: Hash,
-}
-
-/// Sessions were unavailable to fetch from the state for some reason.
-#[derive(Debug, thiserror::Error, Clone)]
-#[error("Sessions unavailable: {kind:?}, info: {info:?}")]
-pub struct SessionsUnavailable {
- /// The error kind.
- #[source]
- kind: SessionsUnavailableReason,
- /// The info about the session window, if any.
- info: Option,
-}
-
-/// An indicated update of the rolling session window.
-#[derive(Debug, PartialEq, Clone)]
-pub enum SessionWindowUpdate {
- /// The session window was just advanced from one range to a new one.
- Advanced {
- /// The previous start of the window (inclusive).
- prev_window_start: SessionIndex,
- /// The previous end of the window (inclusive).
- prev_window_end: SessionIndex,
- /// The new start of the window (inclusive).
- new_window_start: SessionIndex,
- /// The new end of the window (inclusive).
- new_window_end: SessionIndex,
- },
- /// The session window was unchanged.
- Unchanged,
-}
-
-/// A structure to store rolling session database parameters.
-#[derive(Clone)]
-pub struct DatabaseParams {
- /// Database reference.
- pub db: Arc,
- /// The column which stores the rolling session info.
- pub db_column: u32,
-}
-/// A rolling window of sessions and cached session info.
-pub struct RollingSessionWindow {
- earliest_session: SessionIndex,
- session_info: Vec,
- window_size: SessionWindowSize,
- // The option is just to enable some approval-voting tests to force feed sessions
- // in the window without dealing with the DB.
- db_params: Option,
-}
-
-/// The rolling session data we persist in the database.
-#[derive(Encode, Decode, Default)]
-struct StoredWindow {
- earliest_session: SessionIndex,
- session_info: Vec,
-}
-
-impl RollingSessionWindow {
- /// Initialize a new session info cache with the given window size.
- /// Invariant: The database always contains the earliest session. Then,
- /// we can always extend the session info vector using chain state.
- pub async fn new(
- mut sender: Sender,
- block_hash: Hash,
- db_params: DatabaseParams,
- ) -> Result
- where
- Sender: overseer::SubsystemSender
- + overseer::SubsystemSender,
- {
- // At first, determine session window start using the chain state.
- let session_index = get_session_index_for_child(&mut sender, block_hash).await?;
- let earliest_non_finalized_block_session =
- Self::earliest_non_finalized_block_session(&mut sender).await?;
-
- // This will increase the session window to cover the full unfinalized chain.
- let on_chain_window_start = std::cmp::min(
- session_index.saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- earliest_non_finalized_block_session,
- );
-
- // Fetch session information from DB.
- let maybe_stored_window = Self::db_load(db_params.clone());
-
- // Get the DB stored sessions and recompute window start based on DB data.
- let (mut window_start, stored_sessions) =
- if let Some(mut stored_window) = maybe_stored_window {
- // Check if DB is ancient.
- if earliest_non_finalized_block_session >
- stored_window.earliest_session + stored_window.session_info.len() as u32
- {
- // If ancient, we scrap it and fetch from chain state.
- stored_window.session_info.clear();
- }
-
- // The session window might extend beyond the last finalized block, but that's fine as we'll prune it at
- // next update.
- let window_start = if stored_window.session_info.len() > 0 {
- // If there is at least one entry in db, we always take the DB as source of truth.
- stored_window.earliest_session
- } else {
- on_chain_window_start
- };
-
- (window_start, stored_window.session_info)
- } else {
- (on_chain_window_start, Vec::new())
- };
-
- // Compute the amount of sessions missing from the window that will be fetched from chain state.
- let sessions_missing_count = session_index
- .saturating_sub(window_start)
- .saturating_add(1)
- .saturating_sub(stored_sessions.len() as u32);
-
- // Extend from chain state.
- let sessions = if sessions_missing_count > 0 {
- match extend_sessions_from_chain_state(
- stored_sessions,
- &mut sender,
- block_hash,
- &mut window_start,
- session_index,
- )
- .await
- {
- Err(kind) => Err(SessionsUnavailable {
- kind,
- info: Some(SessionsUnavailableInfo {
- window_start,
- window_end: session_index,
- block_hash,
- }),
- }),
- Ok(sessions) => Ok(sessions),
- }?
- } else {
- // There are no new sessions to be fetched from chain state.
- stored_sessions
- };
-
- Ok(Self {
- earliest_session: window_start,
- session_info: sessions,
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(db_params),
- })
- }
-
- // Load session information from the parachains db.
- fn db_load(db_params: DatabaseParams) -> Option {
- match db_params.db.get(db_params.db_column, STORED_ROLLING_SESSION_WINDOW).ok()? {
- None => None,
- Some(raw) => {
- let maybe_decoded = StoredWindow::decode(&mut &raw[..]).map(Some);
- match maybe_decoded {
- Ok(decoded) => decoded,
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?err,
- "Failed decoding db entry; will start with onchain session infos and self-heal DB entry on next update."
- );
- None
- },
- }
- },
- }
- }
-
- // Saves/Updates all sessions in the database.
- // TODO: https://github.com/paritytech/polkadot/issues/6144
- fn db_save(&mut self, stored_window: StoredWindow) {
- if let Some(db_params) = self.db_params.as_ref() {
- match db_params.db.write(DBTransaction {
- ops: vec![DBOp::Insert {
- col: db_params.db_column,
- key: DBKey::from_slice(STORED_ROLLING_SESSION_WINDOW),
- value: stored_window.encode(),
- }],
- }) {
- Ok(_) => {},
- Err(err) => {
- gum::warn!(target: LOG_TARGET, ?err, "Failed writing db entry");
- },
- }
- }
- }
-
- /// Initialize a new session info cache with the given window size and
- /// initial data.
- /// This is only used in `approval voting` tests.
- pub fn with_session_info(
- earliest_session: SessionIndex,
- session_info: Vec,
- ) -> Self {
- RollingSessionWindow {
- earliest_session,
- session_info,
- window_size: SESSION_WINDOW_SIZE,
- db_params: None,
- }
- }
-
- /// Access the session info for the given session index, if stored within the window.
- pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
- if index < self.earliest_session {
- None
- } else {
- self.session_info.get((index - self.earliest_session) as usize)
- }
- }
-
- /// Access the index of the earliest session.
- pub fn earliest_session(&self) -> SessionIndex {
- self.earliest_session
- }
-
- /// Access the index of the latest session.
- pub fn latest_session(&self) -> SessionIndex {
- self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1)
- }
-
- /// Returns `true` if `session_index` is contained in the window.
- pub fn contains(&self, session_index: SessionIndex) -> bool {
- session_index >= self.earliest_session() && session_index <= self.latest_session()
- }
-
- async fn earliest_non_finalized_block_session(
- sender: &mut Sender,
- ) -> Result
- where
- Sender: overseer::SubsystemSender
- + overseer::SubsystemSender,
- {
- let last_finalized_height = {
- let (tx, rx) = oneshot::channel();
- sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;
- match rx.await {
- Ok(Ok(number)) => number,
- Ok(Err(e)) =>
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::ChainApi(e),
- info: None,
- }),
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?err,
- "Failed fetching last finalized block number"
- );
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::MissingLastFinalizedBlock,
- info: None,
- })
- },
- }
- };
-
- let (tx, rx) = oneshot::channel();
- // We want to get the session index for the child of the last finalized block.
- sender
- .send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, tx))
- .await;
- let last_finalized_hash_parent = match rx.await {
- Ok(Ok(maybe_hash)) => maybe_hash,
- Ok(Err(e)) =>
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::ChainApi(e),
- info: None,
- }),
- Err(err) => {
- gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash");
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
- last_finalized_height,
- ),
- info: None,
- })
- },
- };
-
- // Get the session in which the last finalized block was authored.
- if let Some(last_finalized_hash_parent) = last_finalized_hash_parent {
- let session =
- match get_session_index_for_child(sender, last_finalized_hash_parent).await {
- Ok(session_index) => session_index,
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?err,
- ?last_finalized_hash_parent,
- "Failed fetching session index"
- );
- return Err(err)
- },
- };
-
- Ok(session)
- } else {
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
- last_finalized_height,
- ),
- info: None,
- })
- }
- }
-
- /// When inspecting a new import notification, updates the session info cache to match
- /// the session of the imported block's child.
- ///
- /// this only needs to be called on heads where we are directly notified about import, as sessions do
- /// not change often and import notifications are expected to be typically increasing in session number.
- ///
- /// some backwards drift in session index is acceptable.
- pub async fn cache_session_info_for_head(
- &mut self,
- sender: &mut Sender,
- block_hash: Hash,
- ) -> Result
- where
- Sender: overseer::SubsystemSender
- + overseer::SubsystemSender,
- {
- let session_index = get_session_index_for_child(sender, block_hash).await?;
- let latest = self.latest_session();
-
- // Either cached or ancient.
- if session_index <= latest {
- return Ok(SessionWindowUpdate::Unchanged)
- }
-
- let earliest_non_finalized_block_session =
- Self::earliest_non_finalized_block_session(sender).await?;
-
- let old_window_start = self.earliest_session;
- let old_window_end = latest;
-
- // Ensure we keep sessions up to last finalized block by adjusting the window start.
- // This will increase the session window to cover the full unfinalized chain.
- let window_start = std::cmp::min(
- session_index.saturating_sub(self.window_size.get() - 1),
- earliest_non_finalized_block_session,
- );
-
- // Never look back past earliest session, since if sessions beyond were not needed or available
- // in the past remains valid for the future (window only advances forward).
- let mut window_start = std::cmp::max(window_start, self.earliest_session);
-
- let mut sessions = self.session_info.clone();
- let sessions_out_of_window = window_start.saturating_sub(old_window_start) as usize;
-
- let sessions = if sessions_out_of_window < sessions.len() {
- // Drop sessions based on how much the window advanced.
- sessions.split_off((window_start as usize).saturating_sub(old_window_start as usize))
- } else {
- // Window has jumped such that we need to fetch all sessions from on chain.
- Vec::new()
- };
-
- match extend_sessions_from_chain_state(
- sessions,
- sender,
- block_hash,
- &mut window_start,
- session_index,
- )
- .await
- {
- Err(kind) => Err(SessionsUnavailable {
- kind,
- info: Some(SessionsUnavailableInfo {
- window_start,
- window_end: session_index,
- block_hash,
- }),
- }),
- Ok(s) => {
- let update = SessionWindowUpdate::Advanced {
- prev_window_start: old_window_start,
- prev_window_end: old_window_end,
- new_window_start: window_start,
- new_window_end: session_index,
- };
-
- self.session_info = s;
-
- // we need to account for this case:
- // window_start ................................... session_index
- // old_window_start ........... latest
- let new_earliest = std::cmp::max(window_start, old_window_start);
- self.earliest_session = new_earliest;
-
- // Update current window in DB.
- self.db_save(StoredWindow {
- earliest_session: self.earliest_session,
- session_info: self.session_info.clone(),
- });
- Ok(update)
- },
- }
- }
-}
-
-// Returns the session index expected at any child of the `parent` block.
-//
-// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's
-// cleaner to just call the runtime API directly without needing to create an instance
-// of `RuntimeInfo`.
-async fn get_session_index_for_child(
- sender: &mut impl overseer::SubsystemSender,
- block_hash: Hash,
-) -> Result {
- let (s_tx, s_rx) = oneshot::channel();
-
- // We're requesting session index of a child to populate the cache in advance.
- sender
- .send_message(RuntimeApiMessage::Request(
- block_hash,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- ))
- .await;
-
- match s_rx.await {
- Ok(Ok(s)) => Ok(s),
- Ok(Err(e)) =>
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::RuntimeApi(e),
- info: None,
- }),
- Err(e) =>
- return Err(SessionsUnavailable {
- kind: SessionsUnavailableReason::RuntimeApiUnavailable(e),
- info: None,
- }),
- }
-}
-
-/// Attempts to extend db stored sessions with sessions missing between `start` and up to `end_inclusive`.
-/// Runtime session info fetching errors are ignored if that doesn't create a gap in the window.
-async fn extend_sessions_from_chain_state(
- stored_sessions: Vec,
- sender: &mut impl overseer::SubsystemSender,
- block_hash: Hash,
- window_start: &mut SessionIndex,
- end_inclusive: SessionIndex,
-) -> Result, SessionsUnavailableReason> {
- // Start from the db sessions.
- let mut sessions = stored_sessions;
- // We allow session fetch failures only if we won't create a gap in the window by doing so.
- // If `allow_failure` is set to true here, fetching errors are ignored until we get a first session.
- let mut allow_failure = sessions.is_empty();
-
- let start = *window_start + sessions.len() as u32;
-
- for i in start..=end_inclusive {
- let (tx, rx) = oneshot::channel();
- sender
- .send_message(RuntimeApiMessage::Request(
- block_hash,
- RuntimeApiRequest::SessionInfo(i, tx),
- ))
- .await;
-
- match rx.await {
- Ok(Ok(Some(session_info))) => {
- // We do not allow failure anymore after having at least 1 session in window.
- allow_failure = false;
- sessions.push(session_info);
- },
- Ok(Ok(None)) if !allow_failure => return Err(SessionsUnavailableReason::Missing(i)),
- Ok(Ok(None)) => {
- // Handle `allow_failure` true.
- // If we didn't get the session, we advance window start.
- *window_start += 1;
- gum::debug!(
- target: LOG_TARGET,
- session = ?i,
- "Session info missing from runtime."
- );
- },
- Ok(Err(e)) if !allow_failure => return Err(SessionsUnavailableReason::RuntimeApi(e)),
- Err(canceled) if !allow_failure =>
- return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)),
- Ok(Err(err)) => {
- // Handle `allow_failure` true.
- // If we didn't get the session, we advance window start.
- *window_start += 1;
- gum::debug!(
- target: LOG_TARGET,
- session = ?i,
- ?err,
- "Error while fetching session information."
- );
- },
- Err(err) => {
- // Handle `allow_failure` true.
- // If we didn't get the session, we advance window start.
- *window_start += 1;
- gum::debug!(
- target: LOG_TARGET,
- session = ?i,
- ?err,
- "Channel error while fetching session information."
- );
- },
- };
- }
-
- Ok(sessions)
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::database::kvdb_impl::DbAdapter;
- use assert_matches::assert_matches;
- use polkadot_node_subsystem::{
- messages::{AllMessages, AvailabilityRecoveryMessage},
- SubsystemContext,
- };
- use polkadot_node_subsystem_test_helpers::make_subsystem_context;
- use polkadot_primitives::Header;
- use sp_core::testing::TaskExecutor;
-
- const SESSION_DATA_COL: u32 = 0;
-
- const NUM_COLUMNS: u32 = 1;
-
- fn dummy_db_params() -> DatabaseParams {
- let db = kvdb_memorydb::create(NUM_COLUMNS);
- let db = DbAdapter::new(db, &[]);
- let db: Arc = Arc::new(db);
- DatabaseParams { db, db_column: SESSION_DATA_COL }
- }
-
- fn dummy_session_info(index: SessionIndex) -> SessionInfo {
- SessionInfo {
- validators: Default::default(),
- discovery_keys: Vec::new(),
- assignment_keys: Vec::new(),
- validator_groups: Default::default(),
- n_cores: index as _,
- zeroth_delay_tranche_width: index as _,
- relay_vrf_modulo_samples: index as _,
- n_delay_tranches: index as _,
- no_show_slots: index as _,
- needed_approvals: index as _,
- active_validator_indices: Vec::new(),
- dispute_period: 6,
- random_seed: [0u8; 32],
- }
- }
-
- fn cache_session_info_test(
- expected_start_session: SessionIndex,
- session: SessionIndex,
- window: Option,
- expect_requests_from: SessionIndex,
- db_params: Option,
- ) -> RollingSessionWindow {
- let db_params = db_params.unwrap_or(dummy_db_params());
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 5,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) =
- make_subsystem_context::(pool.clone());
-
- let hash = header.hash();
-
- let sender = ctx.sender();
-
- let test_fut = {
- Box::pin(async move {
- let window = match window {
- None =>
- RollingSessionWindow::new(sender.clone(), hash, db_params).await.unwrap(),
- Some(mut window) => {
- window.cache_session_info_for_head(sender, hash).await.unwrap();
- window
- },
- };
- assert_eq!(window.earliest_session, expected_start_session);
- assert_eq!(
- window.session_info,
- (expected_start_session..=session).map(dummy_session_info).collect::>(),
- );
-
- window
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header.number);
- let _ = s_tx.send(Ok(Some(finalized_header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header.hash());
- let _ = s_tx.send(Ok(session));
- }
- );
-
- for i in expect_requests_from..=session {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
- let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
- }
- );
- }
- });
-
- let (window, _) = futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- window
- }
-
- #[test]
- fn cache_session_info_start_empty_db() {
- let db_params = dummy_db_params();
-
- let window = cache_session_info_test(
- (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 10,
- None,
- (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- Some(db_params.clone()),
- );
-
- let window = cache_session_info_test(
- (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 11,
- Some(window),
- 11,
- None,
- );
- assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize);
-
- cache_session_info_test(
- (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 12,
- None,
- 12,
- Some(db_params),
- );
- }
-
- #[test]
- fn cache_session_info_first_early() {
- cache_session_info_test(0, 1, None, 0, None);
- }
-
- #[test]
- fn cache_session_info_does_not_underflow() {
- let window = RollingSessionWindow {
- earliest_session: 1,
- session_info: vec![dummy_session_info(1)],
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- cache_session_info_test(1, 2, Some(window), 2, None);
- }
-
- #[test]
- fn cache_session_window_contains() {
- let window = RollingSessionWindow {
- earliest_session: 10,
- session_info: vec![dummy_session_info(1)],
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- assert!(!window.contains(0));
- assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get()));
- assert!(!window.contains(11));
- assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get() - 1));
- }
-
- #[test]
- fn cache_session_info_first_late() {
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 100,
- None,
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- None,
- );
- }
-
- #[test]
- fn cache_session_info_jump() {
- let window = RollingSessionWindow {
- earliest_session: 50,
- session_info: vec![
- dummy_session_info(50),
- dummy_session_info(51),
- dummy_session_info(52),
- ],
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 100,
- Some(window),
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- None,
- );
- }
-
- #[test]
- fn cache_session_info_roll_full() {
- let start = 99 - (SESSION_WINDOW_SIZE.get() - 1);
- let window = RollingSessionWindow {
- earliest_session: start,
- session_info: (start..=99).map(dummy_session_info).collect(),
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 100,
- Some(window),
- 100, // should only make one request.
- None,
- );
- }
-
- #[test]
- fn cache_session_info_roll_many_full_db() {
- let db_params = dummy_db_params();
- let start = 97 - (SESSION_WINDOW_SIZE.get() - 1);
- let window = RollingSessionWindow {
- earliest_session: start,
- session_info: (start..=97).map(dummy_session_info).collect(),
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(db_params.clone()),
- };
-
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 100,
- Some(window),
- 98,
- None,
- );
-
- // We expect the session to be populated from DB, and only fetch 101 from on chain.
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 101,
- None,
- 101,
- Some(db_params.clone()),
- );
-
- // Session warps in the future.
- let window = cache_session_info_test(195, 200, None, 195, Some(db_params));
-
- assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize);
- }
-
- #[test]
- fn cache_session_info_roll_many_full() {
- let start = 97 - (SESSION_WINDOW_SIZE.get() - 1);
- let window = RollingSessionWindow {
- earliest_session: start,
- session_info: (start..=97).map(dummy_session_info).collect(),
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- cache_session_info_test(
- (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
- 100,
- Some(window),
- 98,
- None,
- );
- }
-
- #[test]
- fn cache_session_info_roll_early() {
- let start = 0;
- let window = RollingSessionWindow {
- earliest_session: start,
- session_info: (0..=1).map(dummy_session_info).collect(),
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- cache_session_info_test(
- 0,
- 2,
- Some(window),
- 2, // should only make one request.
- None,
- );
- }
-
- #[test]
- fn cache_session_info_roll_many_early() {
- let start = 0;
- let window = RollingSessionWindow {
- earliest_session: start,
- session_info: (0..=1).map(dummy_session_info).collect(),
- window_size: SESSION_WINDOW_SIZE,
- db_params: Some(dummy_db_params()),
- };
-
- let actual_window_size = window.session_info.len() as u32;
-
- cache_session_info_test(0, 3, Some(window), actual_window_size, None);
- }
-
- #[test]
- fn db_load_works() {
- // Session index of the tip of our fake test chain.
- let session: SessionIndex = 100;
- let genesis_session: SessionIndex = 0;
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 5,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header_clone = finalized_header.clone();
-
- let hash: sp_core::H256 = header.hash();
- let db_params = dummy_db_params();
- let db_params_clone = db_params.clone();
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let test_fut = {
- let sender = ctx.sender().clone();
- Box::pin(async move {
- let mut rsw =
- RollingSessionWindow::new(sender.clone(), hash, db_params_clone).await.unwrap();
-
- let session_info = rsw.session_info.clone();
- let earliest_session = rsw.earliest_session();
-
- assert_eq!(earliest_session, 0);
- assert_eq!(session_info.len(), 101);
-
- rsw.db_save(StoredWindow { earliest_session, session_info });
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header.number);
- let _ = s_tx.send(Ok(Some(finalized_header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header.hash());
- let _ = s_tx.send(Ok(0));
- }
- );
-
- // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch.
- for i in genesis_session..=session {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
- let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
- }
- );
- }
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let test_fut = {
- Box::pin(async move {
- let sender = ctx.sender().clone();
- let res = RollingSessionWindow::new(sender, hash, db_params).await;
- let rsw = res.unwrap();
- assert_eq!(rsw.earliest_session, 0);
- assert_eq!(rsw.session_info.len(), 101);
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header_clone.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header_clone.number);
- let _ = s_tx.send(Ok(Some(finalized_header_clone.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header_clone.hash());
- let _ = s_tx.send(Ok(0));
- }
- );
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- }
-
- #[test]
- fn cache_session_fails_for_gap_in_window() {
- // Session index of the tip of our fake test chain.
- let session: SessionIndex = 100;
- let genesis_session: SessionIndex = 0;
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 5,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let hash = header.hash();
-
- let test_fut = {
- let sender = ctx.sender().clone();
- Box::pin(async move {
- let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
-
- assert!(res.is_err());
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header.number);
- let _ = s_tx.send(Ok(Some(finalized_header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header.hash());
- let _ = s_tx.send(Ok(0));
- }
- );
-
- // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch.
- // First 50 sessions are missing.
- for i in genesis_session..=50 {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
- let _ = s_tx.send(Ok(None));
- }
- );
- }
- // next 10 sessions are present
- for i in 51..=60 {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
- let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
- }
- );
- }
- // gap of 1 session
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(61, j);
- let _ = s_tx.send(Ok(None));
- }
- );
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- }
-
- #[test]
- fn any_session_stretch_with_failure_allowed_for_unfinalized_chain() {
- // Session index of the tip of our fake test chain.
- let session: SessionIndex = 100;
- let genesis_session: SessionIndex = 0;
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 5,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let hash = header.hash();
-
- let test_fut = {
- let sender = ctx.sender().clone();
- Box::pin(async move {
- let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
- assert!(res.is_ok());
- let rsw = res.unwrap();
- // Since first 50 sessions are missing the earliest should be 50.
- assert_eq!(rsw.earliest_session, 50);
- assert_eq!(rsw.session_info.len(), 51);
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header.number);
- let _ = s_tx.send(Ok(Some(finalized_header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header.hash());
- let _ = s_tx.send(Ok(0));
- }
- );
-
- // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch.
- // We also test if failure is allowed for 50 first missing sessions.
- for i in genesis_session..=session {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
-
- let _ = s_tx.send(Ok(if i < 50 {
- None
- } else {
- Some(dummy_session_info(i))
- }));
- }
- );
- }
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- }
-
- #[test]
- fn any_session_unavailable_for_caching_means_no_change() {
- let session: SessionIndex = 6;
- let start_session = session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 5,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let finalized_header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let hash = header.hash();
-
- let test_fut = {
- let sender = ctx.sender().clone();
- Box::pin(async move {
- let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
- assert!(res.is_err());
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(finalized_header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, finalized_header.number);
- let _ = s_tx.send(Ok(Some(finalized_header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, finalized_header.hash());
- let _ = s_tx.send(Ok(session));
- }
- );
-
- for i in start_session..=session {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(j, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(i, j);
-
- let _ = s_tx.send(Ok(if i == session {
- None
- } else {
- Some(dummy_session_info(i))
- }));
- }
- );
- }
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- }
-
- #[test]
- fn request_session_info_for_genesis() {
- let session: SessionIndex = 0;
-
- let header = Header {
- digest: Default::default(),
- extrinsics_root: Default::default(),
- number: 0,
- state_root: Default::default(),
- parent_hash: Default::default(),
- };
-
- let pool = TaskExecutor::new();
- let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
- let hash = header.hash();
-
- let test_fut = {
- Box::pin(async move {
- let sender = ctx.sender().clone();
- let window =
- RollingSessionWindow::new(sender, hash, dummy_db_params()).await.unwrap();
-
- assert_eq!(window.earliest_session, session);
- assert_eq!(window.session_info, vec![dummy_session_info(session)]);
- })
- };
-
- let aux_fut = Box::pin(async move {
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, hash);
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
- s_tx,
- )) => {
- let _ = s_tx.send(Ok(header.number));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
- block_number,
- s_tx,
- )) => {
- assert_eq!(block_number, header.number);
- let _ = s_tx.send(Ok(Some(header.hash())));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionIndexForChild(s_tx),
- )) => {
- assert_eq!(h, header.hash());
- let _ = s_tx.send(Ok(session));
- }
- );
-
- assert_matches!(
- handle.recv().await,
- AllMessages::RuntimeApi(RuntimeApiMessage::Request(
- h,
- RuntimeApiRequest::SessionInfo(s, s_tx),
- )) => {
- assert_eq!(h, hash);
- assert_eq!(s, session);
-
- let _ = s_tx.send(Ok(Some(dummy_session_info(s))));
- }
- );
- });
-
- futures::executor::block_on(futures::future::join(test_fut, aux_fut));
- }
-}