Skip to content

Commit

Permalink
paras-scheduler: Fix migration to V1 (#1969)
Browse files Browse the repository at this point in the history
The migration failed to migrate `AvailabilityCores`. Without this data being
migrated, all parachains in the availability phase would stall until the
next session started. This pull request fixes the issue by migrating this
data. It also applies some cosmetic cleanups.
  • Loading branch information
bkchr authored Oct 23, 2023
1 parent 38c3c62 commit f678b61
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 50 deletions.
3 changes: 2 additions & 1 deletion polkadot/runtime/parachains/src/assigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl<T: Config> Pallet<T> {
/// Returns `true` if `core_idx` is a bulk (parachain) core.
///
/// A core counts as a bulk core when its index is below the number of cores
/// reported by the parachain assignment provider for the current session.
fn is_bulk_core(core_idx: &CoreIndex) -> bool {
    let parachain_cores =
        <ParachainAssigner<T> as AssignmentProvider<BlockNumberFor<T>>>::session_core_count();

    // Equivalent to `(0..parachain_cores).contains(&core_idx.0)`, but a plain
    // comparison states the intent directly.
    core_idx.0 < parachain_cores
}
}

Expand Down
4 changes: 2 additions & 2 deletions polkadot/runtime/parachains/src/assigner_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod pallet {

impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
/// Number of cores this provider serves in the current session: one per
/// registered parachain.
fn session_core_count() -> u32 {
    // `decode_len` reads only the length of the stored `Parachains` vector
    // instead of decoding every element; `None` (no value stored) means no
    // parachains are registered.
    paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
}

fn pop_assignment_for_core(
Expand All @@ -62,7 +62,7 @@ impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
max_availability_timeouts: 0,
// The next assignment already goes to the same [`ParaId`], this can be any number
// that's high enough to clear the time it takes to clear backing/availability.
ttl: BlockNumberFor::<T>::from(10u32),
ttl: 10u32.into(),
}
}
}
16 changes: 6 additions & 10 deletions polkadot/runtime/parachains/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,10 @@ impl<T: Config> Pallet<T> {
/// Moves all elements in the claimqueue forward.
fn move_claimqueue_forward() {
let mut cq = ClaimQueue::<T>::get();
for (_, core_queue) in cq.iter_mut() {
for core_queue in cq.values_mut() {
// First pop the finished claims from the front.
match core_queue.front() {
None => {},
Some(None) => {
core_queue.pop_front();
},
Some(_) => {},
if let Some(None) = core_queue.front() {
core_queue.pop_front();
}
}

Expand All @@ -628,9 +624,10 @@ impl<T: Config> Pallet<T> {

// This can only happen on new sessions at which we move all assignments back to the
// provider. Hence, there's nothing we need to do here.
if ValidatorGroups::<T>::get().is_empty() {
if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
return
}

let n_lookahead = Self::claimqueue_lookahead();
let n_session_cores = T::AssignmentProvider::session_core_count();
let cq = ClaimQueue::<T>::get();
Expand Down Expand Up @@ -686,8 +683,7 @@ impl<T: Config> Pallet<T> {

/// Appends the entry `pe` to the claim queue of core `core_idx`.
///
/// The per-core queue is created on first use via `or_default`.
fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntry<BlockNumberFor<T>>) {
    ClaimQueue::<T>::mutate(|la| {
        // `or_default` inserts an empty `VecDeque` if this core has no queue
        // yet, then the claim is pushed onto the back.
        la.entry(core_idx).or_default().push_back(Some(pe));
    });
}

Expand Down
111 changes: 74 additions & 37 deletions polkadot/runtime/parachains/src/scheduler/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,45 @@ use frame_support::{
mod v0 {
use super::*;

use primitives::CollatorId;
use primitives::{CollatorId, Id};

#[storage_alias]
pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;

#[derive(Encode, Decode)]
pub struct QueuedParathread {
claim: primitives::ParathreadEntry,
core_offset: u32,
}
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadClaim(pub Id, pub CollatorId);

#[derive(Encode, Decode, Default)]
pub struct ParathreadClaimQueue {
queue: Vec<QueuedParathread>,
next_core_offset: u32,
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadEntry {
/// The claim.
pub claim: ParathreadClaim,
/// Number of retries.
pub retries: u32,
}

// Only here to facilitate the migration.
impl ParathreadClaimQueue {
pub fn len(self) -> usize {
self.queue.len()
}
/// What is occupying a specific availability core.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum CoreOccupied {
/// A parathread.
Parathread(ParathreadEntry),
/// A parachain.
Parachain,
}

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadQueue<T: Config> =
StorageValue<Pallet<T>, ParathreadClaimQueue, ValueQuery>;
pub(crate) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> =
StorageValue<Pallet<T>, Vec<ParaId>, ValueQuery>;
pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

/// The assignment type.
#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
Expand Down Expand Up @@ -108,30 +117,36 @@ pub mod v1 {

#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
    // Count everything that must survive the migration: entries in the old
    // `Scheduled` list plus every availability core that is currently
    // occupied (i.e. a para waiting for availability).
    let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
        v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;

    log::info!(
        target: crate::scheduler::LOG_TARGET,
        "Number of scheduled and waiting for availability before: {n}",
    );

    // SCALE-encode the count; `post_upgrade` decodes it and checks the new
    // `ClaimQueue` + `AvailabilityCores` account for every entry.
    Ok(n.encode())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");

ensure!(
v0::Scheduled::<T>::get().len() == 0,
v0::Scheduled::<T>::get().is_empty(),
"Scheduled should be empty after the migration"
);

let sched_len = u32::from_be_bytes(state.try_into().unwrap());
let expected_len = u32::decode(&mut &state[..]).unwrap();
let availability_cores_waiting = super::AvailabilityCores::<T>::get()
.iter()
.filter(|c| !matches!(c, CoreOccupied::Free))
.count();

ensure!(
Pallet::<T>::claimqueue_len() as u32 == sched_len,
"Scheduled completely moved to ClaimQueue after migration"
Pallet::<T>::claimqueue_len() as u32 + availability_cores_waiting as u32 ==
expected_len,
"ClaimQueue and AvailabilityCores should have the correct length",
);

Ok(())
Expand All @@ -142,11 +157,8 @@ pub mod v1 {
pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
let mut weight: Weight = Weight::zero();

let pq = v0::ParathreadQueue::<T>::take();
let pq_len = pq.len() as u64;

let pci = v0::ParathreadClaimIndex::<T>::take();
let pci_len = pci.len() as u64;
v0::ParathreadQueue::<T>::kill();
v0::ParathreadClaimIndex::<T>::kill();

let now = <frame_system::Pallet<T>>::block_number();
let scheduled = v0::Scheduled::<T>::take();
Expand All @@ -158,10 +170,35 @@ pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
Pallet::<T>::add_to_claimqueue(core_idx, pe);
}

let parachains = paras::Pallet::<T>::parachains();
let availability_cores = v0::AvailabilityCores::<T>::take();
let mut new_availability_cores = Vec::new();

for (core_index, core) in availability_cores.into_iter().enumerate() {
let new_core = if let Some(core) = core {
match core {
v0::CoreOccupied::Parachain => CoreOccupied::Paras(ParasEntry::new(
Assignment::new(parachains[core_index]),
now,
)),
v0::CoreOccupied::Parathread(entry) =>
CoreOccupied::Paras(ParasEntry::new(Assignment::new(entry.claim.0), now)),
}
} else {
CoreOccupied::Free
};

new_availability_cores.push(new_core);
}

super::AvailabilityCores::<T>::set(new_availability_cores);

// 2x as once for Scheduled and once for Claimqueue
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pq_len, pq_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pci_len, pci_len));
// reading parachains + availability_cores, writing AvailabilityCores
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2, 1));
// 2x kill
weight = weight.saturating_add(T::DbWeight::get().writes(2));

weight
}

0 comments on commit f678b61

Please sign in to comment.