Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collation-generation: Avoid using para_backing_state if runtime is ancient #4070

Merged
merged 8 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 47 additions & 26 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,12 @@ async fn handle_new_activations<Context>(
// follow the procedure from the guide:
// https://paritytech.github.io/polkadot-sdk/book/node/collators/collation-generation.html

// If there is no collation function provided, bail out early.
// Important: Lookahead collator and slot based collator do not use `CollatorFn`.
if config.collator.is_none() {
sandreim marked this conversation as resolved.
Show resolved Hide resolved
return Ok(())
}

let para_id = config.para_id;

let _overall_timer = metrics.time_new_activations();
Expand All @@ -232,9 +235,14 @@ async fn handle_new_activations<Context>(
// The loop below will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();

// This assumption refers to all cores of the parachain, taking elastic scaling
// into account.
let mut para_assumption = None;
for (core_idx, core) in availability_cores.into_iter().enumerate() {
let scheduled_core = match core {
CoreState::Scheduled(scheduled_core) => scheduled_core,
// This nested assumption refers only to the core being iterated.
let (core_assumption, scheduled_core) = match core {
CoreState::Scheduled(scheduled_core) =>
(OccupiedCoreAssumption::Free, scheduled_core),
CoreState::Occupied(occupied_core) => match async_backing_params {
Some(params) if params.max_candidate_depth >= 1 => {
// maximum candidate depth when building on top of a block
Expand All @@ -257,7 +265,7 @@ async fn handle_new_activations<Context>(
};

match res {
Some(res) => res,
Some(res) => (OccupiedCoreAssumption::Included, res),
None => continue,
}
},
Expand Down Expand Up @@ -291,6 +299,10 @@ async fn handle_new_activations<Context>(
"core is not assigned to our para. Keep going.",
);
} else {
// This does not work for elastic scaling, but it should be enough for single
// core parachains. If async backing runtime is available we later override
// the assumption based on the `para_backing_state` API response.
para_assumption = Some(core_assumption);
// Accumulate cores for building collation(s) outside the loop.
cores_to_build_on.push(CoreIndex(core_idx as u32));
}
Expand All @@ -301,34 +313,43 @@ async fn handle_new_activations<Context>(
continue
}

let para_backing_state =
request_para_backing_state(relay_parent, config.para_id, ctx.sender())
.await
.await??
.ok_or(crate::error::Error::MissingParaBackingState)?;

// We are being very optimistic here, but one of the cores could stay pending availability
// for some more blocks, or even time out.
// The collator can't really know about the timeout case because it doesn't receive bitfield
// gossip.
let assumption = if para_backing_state.pending_availability.is_empty() {
OccupiedCoreAssumption::Free
} else {
OccupiedCoreAssumption::Included
};
// If at least one core is assigned to us, `para_assumption` is `Some`.
let Some(mut para_assumption) = para_assumption else { continue };

// If `async_backing_params` is `None`, neither async backing nor elastic scaling (which
// depends on it) is supported. In that case we use the `para_assumption` we got from
// iterating cores.
if async_backing_params.is_some() {
// We are being very optimistic here, but one of the cores could stay pending availability
// for some more blocks, or even time out.
// The collator can't really know about the timeout case because it doesn't receive
// bitfield gossip.
let para_backing_state =
request_para_backing_state(relay_parent, config.para_id, ctx.sender())
.await
.await??
.ok_or(crate::error::Error::MissingParaBackingState)?;

// Override the assumption about the para's assigned cores.
para_assumption = if para_backing_state.pending_availability.is_empty() {
OccupiedCoreAssumption::Free
} else {
OccupiedCoreAssumption::Included
}
}

gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
?assumption,
our_para = %para_id,
?para_assumption,
"Occupied core(s) assumption",
);

let mut validation_data = match request_persisted_validation_data(
relay_parent,
config.para_id,
assumption,
para_id,
para_assumption,
ctx.sender(),
)
.await
Expand All @@ -339,7 +360,7 @@ async fn handle_new_activations<Context>(
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
our_para = %para_id,
"validation data is not available",
);
continue
Expand All @@ -348,8 +369,8 @@ async fn handle_new_activations<Context>(

let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent,
config.para_id,
assumption,
para_id,
para_assumption,
ctx.sender(),
)
.await?
Expand All @@ -359,7 +380,7 @@ async fn handle_new_activations<Context>(
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
our_para = %para_id,
"validation code hash is not found.",
);
continue
Expand Down
90 changes: 77 additions & 13 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,56 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
OccupiedCoreAssumption::Included,
1,
pending_availability,
runtime_version,
)
.await;

virtual_overseer
});
}

#[test]
fn distribute_collation_for_occupied_core_pre_async_backing() {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
let total_cores = 3;

// Use runtime version before async backing
let runtime_version = RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT - 1;

let cores = (0..total_cores)
.into_iter()
.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
.collect::<Vec<_>>();

let claim_queue = cores
.iter()
.enumerate()
.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
.collect::<BTreeMap<_, _>>();

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
runtime_version,
claim_queue,
)
.await;

helpers::handle_cores_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
// `CoreState` is `Scheduled` => `OccupiedCoreAssumption` is `Free`
OccupiedCoreAssumption::Free,
total_cores,
vec![],
runtime_version,
)
.await;

Expand All @@ -826,6 +876,8 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
// Using latest runtime with the fancy claim queue exposed.
let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;

let cores = (0..3)
.into_iter()
Expand Down Expand Up @@ -863,8 +915,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
// Using latest runtime with the fancy claim queue exposed.
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
runtime_version,
claim_queue,
)
.await;
Expand All @@ -882,6 +933,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
},
total_cores,
pending_availability,
runtime_version,
)
.await;

Expand All @@ -901,6 +953,8 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
// Using latest runtime with the fancy claim queue exposed.
let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;

let cores = (0..total_cores)
.into_iter()
Expand All @@ -921,8 +975,7 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
activated_hash,
AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
cores,
// Using latest runtime with the fancy claim queue exposed.
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
runtime_version,
claim_queue,
)
.await;
Expand All @@ -935,6 +988,7 @@ fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_sc
OccupiedCoreAssumption::Free,
total_cores,
vec![],
runtime_version,
)
.await;

Expand Down Expand Up @@ -1074,6 +1128,13 @@ mod helpers {
}
);

let async_backing_response =
if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
Ok(async_backing_params)
} else {
Err(RuntimeApiError::NotSupported { runtime_api_name: "async_backing_params" })
};

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
Expand All @@ -1083,7 +1144,7 @@ mod helpers {
),
)) => {
assert_eq!(hash, activated_hash);
let _ = tx.send(Ok(async_backing_params));
let _ = tx.send(async_backing_response);
}
);

Expand Down Expand Up @@ -1121,6 +1182,7 @@ mod helpers {
expected_occupied_core_assumption: OccupiedCoreAssumption,
cores_assigned: usize,
pending_availability: Vec<CandidatePendingAvailability>,
runtime_version: u32,
) {
// Expect no messages if no cores are assigned to the para
if cores_assigned == 0 {
Expand All @@ -1138,14 +1200,16 @@ mod helpers {
max_pov_size: 1024,
};

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
) if parent == activated_hash && p_id == para_id => {
tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
}
);
if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
) if parent == activated_hash && p_id == para_id => {
tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
}
);
}

assert_matches!(
overseer_recv(virtual_overseer).await,
Expand Down
12 changes: 12 additions & 0 deletions prdoc/pr_4070.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Avoid using `para_backing_state` if runtime doesn't support async backing

doc:
- audience: Node Operator
description: |
Fixes https://github.com/paritytech/polkadot-sdk/issues/4067, which prevents collators from
upgrading to the latest release (v1.10.0).

crates: [ ]
Loading