Look further in ClaimQueue during collation generation #4049

Closed
wants to merge 1 commit into from
149 changes: 97 additions & 52 deletions polkadot/node/collation-generation/src/lib.rs
@@ -50,10 +50,10 @@ use polkadot_node_subsystem_util::{
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};

mod error;

@@ -90,10 +90,11 @@ impl CollationGenerationSubsystem {
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
async fn run<Context>(mut self, mut ctx: Context) {
let mut last_spawned_work = HashSet::<CoreIndex>::new();
loop {
select! {
incoming = ctx.recv().fuse() => {
if self.handle_incoming::<Context>(incoming, &mut ctx).await {
if self.handle_incoming::<Context>(incoming, &mut ctx, &mut last_spawned_work).await {
break;
}
},
@@ -110,6 +111,7 @@
&mut self,
incoming: SubsystemResult<FromOrchestra<<Context as SubsystemContext>::Message>>,
ctx: &mut Context,
last_spawned_work: &mut HashSet<CoreIndex>,
) -> bool {
match incoming {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
@@ -124,6 +126,7 @@
activated.into_iter().map(|v| v.hash),
ctx,
metrics,
last_spawned_work,
)
.await
{
@@ -202,6 +205,7 @@ async fn handle_new_activations<Context>(
activated: impl IntoIterator<Item = Hash>,
ctx: &mut Context,
metrics: Metrics,
last_spawned_work: &mut HashSet<CoreIndex>,
) -> crate::error::Result<()> {
// follow the procedure from the guide:
// https://paritytech.github.io/polkadot-sdk/book/node/collators/collation-generation.html
Expand All @@ -228,74 +232,115 @@ async fn handle_new_activations<Context>(
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;
let max_candidate_depth =
async_backing_params.map(|params| params.max_candidate_depth).unwrap_or(0);

// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();

for (core_idx, core) in availability_cores.into_iter().enumerate() {
let scheduled_core = match core {
CoreState::Scheduled(scheduled_core) => scheduled_core,
CoreState::Occupied(occupied_core) => match async_backing_params {
Some(params) if params.max_candidate_depth >= 1 => {
// maximum candidate depth when building on top of a block
// pending availability is necessarily 1 - the depth of the
// pending block is 0 so the child has depth 1.

// Use claim queue if available, or fallback to `next_up_on_available`
let res = match maybe_claim_queue {
Some(ref claim_queue) => {
// read what's in the claim queue for this core at depth 0.
claim_queue
.get_claim_for(CoreIndex(core_idx as u32), 0)
.map(|para_id| ScheduledCore { para_id, collator: None })
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core.next_up_on_available
},
};
// Check if an assignment is for our para id
let our_para_id = |next_scheduled_para_id| {
if next_scheduled_para_id != config.para_id {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
?next_scheduled_para_id,
"core is not assigned to our para id",
);
return None
}

match res {
Some(res) => res,
None => continue,
}
},
_ => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
"core is occupied. Keep going.",
);
continue
},
Some(next_scheduled_para_id)
};

// Get the second scheduled `ParaId` from the claim queue for the core and return true
// if it matches the configured `ParaId`. With async backing enabled (assuming
// max_candidate_depth is bigger than 1) we want to build a candidate for this slot in
// advance as it is the one most likely to be included before timing out. Get it before
// the scheduled one because `core` is not consumed for this operation.
let our_para_scheduled_at_depth_one = match &core {
CoreState::Occupied(_) if max_candidate_depth > 1 =>
maybe_claim_queue.as_ref().and_then(|claim_queue| {
claim_queue.get_claim_for(CoreIndex(core_idx as u32), 1)
}),
CoreState::Scheduled(_) | CoreState::Occupied(_) | CoreState::Free => None,
}
.and_then(our_para_id)
.is_some();

// Then get the first scheduled `ParaId`. We will build a candidate for this slot in two
// cases:
// 1. On startup. In this case we will build two candidates. The one built here will
// probably time out but still we want it prepared for the second attempt.
// 2. When async backing is disabled we don't use the claim queue so this will be the
// only slot we will build a candidate for.
let maybe_scheduled_next = match core {
CoreState::Scheduled(core) => Some(core.para_id),
CoreState::Occupied(occupied_core) if max_candidate_depth > 0 => {
// maximum candidate depth when building on top of a block
// pending availability is necessarily 1 - the depth of the
// pending block is 0 so the child has depth 1.

// Use claim queue if available, or fallback to `next_up_on_available`
match maybe_claim_queue {
Some(ref claim_queue) => {
// read what's in the claim queue for this core at depth 0.
claim_queue.get_claim_for(CoreIndex(core_idx as u32), 0)
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core
.next_up_on_available
.map(|occupied_core| occupied_core.para_id)
},
}
},
CoreState::Occupied(_) => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
"core is occupied",
);
None
},
CoreState::Free => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
"core is not assigned to any para. Keep going.",
"core is not assigned to any para",
);
continue
None
},
};
}
.and_then(our_para_id);

if scheduled_core.para_id != config.para_id {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
our_para = %config.para_id,
their_para = %scheduled_core.para_id,
"core is not assigned to our para. Keep going.",
);
// Check if `maybe_scheduled` was already processed in a previous iteration
let our_para_scheduled_at_depth_zero = if maybe_scheduled_next.is_some() {
last_spawned_work.contains(&CoreIndex(core_idx as u32))
} else {
// Accumulate cores for building collation(s) outside the loop.
false
};

if !our_para_scheduled_at_depth_zero && !our_para_scheduled_at_depth_one {
// nothing to do
continue
}

if our_para_scheduled_at_depth_zero {
cores_to_build_on.push(CoreIndex(core_idx as u32));
}
if our_para_scheduled_at_depth_one {
cores_to_build_on.push(CoreIndex(core_idx as u32));
}
}

// Save the work we are about to spawn for the next run
*last_spawned_work = HashSet::from_iter(cores_to_build_on.iter().cloned());

// Skip to next relay parent if there is no core assigned to us.
if cores_to_build_on.is_empty() {
continue
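For readers skimming the diff, the intent of the new loop can be sketched in isolation. This is a simplified, hypothetical sketch, not the subsystem's code: `Claims`/`claim_at` stand in for the claim queue runtime API, and the deduplication against work spawned on the previous activation (`last_spawned_work` in the diff) is left out.

use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct CoreIndex(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ParaId(u32);

// Hypothetical snapshot of the claim queue: for each core, the paras claimed
// at depth 0, 1, ... (front of the queue first).
struct Claims(BTreeMap<CoreIndex, Vec<ParaId>>);

impl Claims {
    fn claim_at(&self, core: CoreIndex, depth: usize) -> Option<ParaId> {
        self.0.get(&core).and_then(|queue| queue.get(depth)).copied()
    }
}

// Collect the cores `our_para` should collate on. Besides the immediately
// scheduled claim (depth 0), look one entry further into the claim queue
// (depth 1) when async backing allows building on top of a pending block.
fn cores_for_para(claims: &Claims, our_para: ParaId, max_candidate_depth: u32) -> Vec<CoreIndex> {
    claims
        .0
        .keys()
        .copied()
        .filter(|&core| {
            let now = claims.claim_at(core, 0) == Some(our_para);
            let next = max_candidate_depth > 1 && claims.claim_at(core, 1) == Some(our_para);
            now || next
        })
        .collect()
}

fn main() {
    let claims = Claims(BTreeMap::from([
        (CoreIndex(0), vec![ParaId(2000), ParaId(2001)]),
        (CoreIndex(1), vec![ParaId(2001), ParaId(2000)]),
    ]));
    // With the depth-1 lookahead, para 2000 collates on both cores; without it, only on core 0.
    assert_eq!(cores_for_para(&claims, ParaId(2000), 2), vec![CoreIndex(0), CoreIndex(1)]);
    assert_eq!(cores_for_para(&claims, ParaId(2000), 0), vec![CoreIndex(0)]);
}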
@@ -419,7 +419,7 @@ async fn distribute_collation<Context>(

// Determine which core(s) the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_cores, num_cores) =
let (our_cores, num_cores) = //TODO: Use claimqueue here?
match determine_cores(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
(cores, _num_cores) if cores.is_empty() => {
gum::warn!(
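The TODO above asks whether `determine_cores` could consult the claim queue directly instead of scanning the availability cores. A possible shape of that lookup, again purely illustrative (the real `determine_cores` queries the runtime through the subsystem sender and has a different signature):

// Illustrative only: claims[core] lists the para ids claimed on that core,
// front of the queue first. Returns the cores claimed by `our_para` within
// the first `lookahead` entries, plus the total number of cores.
fn determine_cores_from_claims(
    claims: &[Vec<u32>],
    our_para: u32,
    lookahead: usize,
) -> (Vec<usize>, usize) {
    let our_cores = claims
        .iter()
        .enumerate()
        .filter(|(_, queue)| queue.iter().take(lookahead).any(|&p| p == our_para))
        .map(|(core, _)| core)
        .collect();
    (our_cores, claims.len())
}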