From e58e854a328aeac9a8876476be21918c21707e64 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 20 Mar 2024 08:55:58 +0200 Subject: [PATCH] Expose `ClaimQueue` via a runtime api and use it in `collation-generation` (#3580) The PR adds two things: 1. Runtime API exposing the whole claim queue 2. Consumes the API in `collation-generation` to fetch the next scheduled `ParaEntry` for an occupied core. Related to https://github.com/paritytech/polkadot-sdk/issues/1797 --- Cargo.lock | 1 + .../src/blockchain_rpc_client.rs | 13 +- .../src/rpc_client.rs | 17 +- polkadot/node/collation-generation/Cargo.toml | 1 + polkadot/node/collation-generation/src/lib.rs | 74 +++- .../node/collation-generation/src/tests.rs | 329 +++++++++++++++++- polkadot/node/core/runtime-api/src/cache.rs | 29 +- polkadot/node/core/runtime-api/src/lib.rs | 11 + polkadot/node/core/runtime-api/src/tests.rs | 18 +- polkadot/node/subsystem-types/src/messages.rs | 10 +- .../subsystem-types/src/runtime_client.rs | 22 +- polkadot/node/subsystem-util/src/lib.rs | 8 +- polkadot/primitives/src/runtime_api.rs | 17 +- .../src/runtime_api_impl/vstaging.rs | 21 +- prdoc/pr_3580.prdoc | 13 + 15 files changed, 532 insertions(+), 52 deletions(-) create mode 100644 prdoc/pr_3580.prdoc diff --git a/Cargo.lock b/Cargo.lock index 5c0066e97283..ee813b602183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12396,6 +12396,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "polkadot-primitives-test-helpers", + "rstest", "sp-core", "sp-keyring", "sp-maybe-compressed-blob", diff --git a/cumulus/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs b/cumulus/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs index ab56b62c4ca5..8d8a2920b4ef 100644 --- a/cumulus/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs +++ b/cumulus/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use std::pin::Pin; +use std::{ + collections::{BTreeMap, VecDeque}, + pin::Pin, +}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use cumulus_relay_chain_rpc_interface::RelayChainRpcClient; @@ -25,6 +28,7 @@ use polkadot_primitives::{ async_backing::{AsyncBackingParams, BackingState}, slashing, vstaging::{ApprovalVotingParams, NodeFeatures}, + CoreIndex, }; use sc_authority_discovery::{AuthorityDiscovery, Error as AuthorityDiscoveryError}; use sc_client_api::AuxStore; @@ -442,6 +446,13 @@ impl RuntimeApiSubsystemClient for BlockChainRpcClient { async fn node_features(&self, at: Hash) -> Result { Ok(self.rpc_client.parachain_host_node_features(at).await?) } + + async fn claim_queue( + &self, + at: Hash, + ) -> Result>, ApiError> { + Ok(self.rpc_client.parachain_host_claim_queue(at).await?) + } } #[async_trait::async_trait] diff --git a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs index 6578210a259c..8cf5ccf0c707 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -24,6 +24,7 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; +use std::collections::VecDeque; use tokio::sync::mpsc::Sender as TokioSender; use parity_scale_codec::{Decode, Encode}; @@ -34,10 +35,10 @@ use cumulus_primitives_core::{ slashing, vstaging::{ApprovalVotingParams, NodeFeatures}, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, - Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption, - PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, + GroupRotationInfo, Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, + OccupiedCoreAssumption, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, + ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -647,6 +648,14 @@ impl RelayChainRpcClient { .await } + pub async fn parachain_host_claim_queue( + &self, + at: RelayHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>) + .await + } + pub async fn validation_code_hash( &self, at: RelayHash, diff --git a/polkadot/node/collation-generation/Cargo.toml b/polkadot/node/collation-generation/Cargo.toml index 8df0c2b1edae..f72af87c15ed 100644 --- a/polkadot/node/collation-generation/Cargo.toml +++ b/polkadot/node/collation-generation/Cargo.toml @@ -26,4 +26,5 @@ parity-scale-codec = { version = "3.6.1", default-features = false, features = [ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../primitives/test-helpers" } assert_matches = "1.4.0" +rstest = "0.18.2" sp-keyring = { path = "../../../substrate/primitives/keyring" } diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index a89351628a08..3b1a8f5ff230 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -38,21 +38,25 @@ use polkadot_node_primitives::{ SubmitCollationParams, }; use polkadot_node_subsystem::{ - messages::{CollationGenerationMessage, CollatorProtocolMessage}, + messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest}, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{ - request_async_backing_params, request_availability_cores, request_persisted_validation_data, - request_validation_code, request_validation_code_hash, request_validators, + has_required_runtime, request_async_backing_params, request_availability_cores, + request_claim_queue, request_persisted_validation_data, request_validation_code, + request_validation_code_hash, request_validators, }; use polkadot_primitives::{ collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt, - CollatorPair, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, - ValidationCodeHash, + CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, + PersistedValidationData, ScheduledCore, ValidationCodeHash, }; use sp_core::crypto::Pair; -use std::sync::Arc; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::Arc, +}; mod error; @@ -223,6 +227,7 @@ async fn handle_new_activations( let availability_cores = availability_cores??; let n_validators = validators??.len(); let async_backing_params = async_backing_params?.ok(); + let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?; for (core_idx, core) in availability_cores.into_iter().enumerate() { let _availability_core_timer = metrics.time_new_activations_availability_core(); @@ -239,10 +244,25 @@ async fn handle_new_activations( // TODO [now]: this assumes that next up == current. // in practice we should only set `OccupiedCoreAssumption::Included` // when the candidate occupying the core is also of the same para. - if let Some(scheduled) = occupied_core.next_up_on_available { - (scheduled, OccupiedCoreAssumption::Included) - } else { - continue + let res = match maybe_claim_queue { + Some(ref claim_queue) => { + // read what's in the claim queue for this core + fetch_next_scheduled_on_core( + claim_queue, + CoreIndex(core_idx as u32), + ) + }, + None => { + // Runtime doesn't support claim queue runtime api. Fallback to + // `next_up_on_available` + occupied_core.next_up_on_available + }, + } + .map(|scheduled| (scheduled, OccupiedCoreAssumption::Included)); + + match res { + Some(res) => res, + None => continue, } }, _ => { @@ -600,3 +620,37 @@ fn erasure_root( let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?; Ok(polkadot_erasure_coding::branches(&chunks).root()) } + +// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)` +// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller. +async fn fetch_claim_queue( + sender: &mut impl overseer::CollationGenerationSenderTrait, + relay_parent: Hash, +) -> crate::error::Result>>> { + if has_required_runtime( + sender, + relay_parent, + RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + ) + .await + { + let res = request_claim_queue(relay_parent, sender).await.await??; + Ok(Some(res)) + } else { + gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`"); + Ok(None) + } +} + +// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`. +// This function is supposed to be used in `handle_new_activations` hence the return type. +fn fetch_next_scheduled_on_core( + claim_queue: &BTreeMap>, + core_idx: CoreIndex, +) -> Option { + claim_queue + .get(&core_idx)? + .front() + .cloned() + .map(|para_id| ScheduledCore { para_id, collator: None }) +} diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index eb0ede6ef6b1..9b16980e6af4 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -25,15 +25,18 @@ use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompr use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, + ActivatedLeaf, }; use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ - CollatorPair, HeadData, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode, + AsyncBackingParams, CollatorPair, HeadData, Id as ParaId, Id, PersistedValidationData, + ScheduledCore, ValidationCode, }; +use rstest::rstest; use sp_keyring::sr25519::Keyring as Sr25519Keyring; use std::pin::Pin; -use test_helpers::{dummy_hash, dummy_head_data, dummy_validator}; +use test_helpers::{dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator}; type VirtualOverseer = TestSubsystemContextHandle; @@ -132,8 +135,10 @@ fn scheduled_core_for>(para_id: Id) -> ScheduledCore { ScheduledCore { para_id: para_id.into(), collator: None } } -#[test] -fn requests_availability_per_relay_parent() { +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn requests_availability_per_relay_parent(#[case] runtime_version: u32) { let activated_hashes: Vec = vec![[1; 32].into(), [4; 32].into(), [9; 32].into(), [16; 32].into()]; @@ -159,6 +164,18 @@ fn requests_availability_per_relay_parent() { ))) => { tx.send(Err(RuntimeApiError::NotSupported { runtime_api_name: "doesnt_matter" })).unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Version(tx), + ))) => { + tx.send(Ok(runtime_version)).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ClaimQueue(tx), + ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { + tx.send(Ok(BTreeMap::new())).unwrap(); + }, Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg), } } @@ -184,8 +201,10 @@ fn requests_availability_per_relay_parent() { assert_eq!(requested_availability_cores, activated_hashes); } -#[test] -fn requests_validation_data_for_scheduled_matches() { +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32) { let activated_hashes: Vec = vec![ Hash::repeat_byte(1), Hash::repeat_byte(4), @@ -242,6 +261,18 @@ fn requests_validation_data_for_scheduled_matches() { })) .unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Version(tx), + ))) => { + tx.send(Ok(runtime_version)).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ClaimQueue(tx), + ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { + tx.send(Ok(BTreeMap::new())).unwrap(); + }, Some(msg) => { panic!("didn't expect any other overseer requests; got {:?}", msg) }, @@ -271,8 +302,10 @@ fn requests_validation_data_for_scheduled_matches() { assert_eq!(requested_validation_data, vec![[4; 32].into()]); } -#[test] -fn sends_distribute_collation_message() { +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn sends_distribute_collation_message(#[case] runtime_version: u32) { let activated_hashes: Vec = vec![ Hash::repeat_byte(1), Hash::repeat_byte(4), @@ -339,6 +372,18 @@ fn sends_distribute_collation_message() { })) .unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Version(tx), + ))) => { + tx.send(Ok(runtime_version)).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ClaimQueue(tx), + ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { + tx.send(Ok(BTreeMap::new())).unwrap(); + }, Some(msg @ AllMessages::CollatorProtocol(_)) => { inner_to_collator_protocol.lock().await.push(msg); }, @@ -423,8 +468,10 @@ fn sends_distribute_collation_message() { } } -#[test] -fn fallback_when_no_validation_code_hash_api() { +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) { // This is a variant of the above test, but with the validation code hash API disabled. let activated_hashes: Vec = vec![ @@ -501,9 +548,22 @@ fn fallback_when_no_validation_code_hash_api() { })) .unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Version(tx), + ))) => { + tx.send(Ok(runtime_version)).unwrap(); + }, Some(msg @ AllMessages::CollatorProtocol(_)) => { inner_to_collator_protocol.lock().await.push(msg); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ClaimQueue(tx), + ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { + let res = BTreeMap::>::new(); + tx.send(Ok(res)).unwrap(); + }, Some(msg) => { panic!("didn't expect any other overseer requests; got {:?}", msg) }, @@ -635,3 +695,252 @@ fn submit_collation_leads_to_distribution() { virtual_overseer }); } + +// There is one core in `Occupied` state and async backing is enabled. On new head activation +// `CollationGeneration` should produce and distribute a new collation. +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] runtime_version: u32) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + + // One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match. + let cores: Vec = vec![CoreState::Occupied(polkadot_primitives::OccupiedCore { + next_up_on_available: Some(ScheduledCore { para_id, collator: None }), + occupied_since: 1, + time_out_at: 10, + next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }), + availability: Default::default(), // doesn't matter + group_responsible: polkadot_primitives::GroupIndex(0), + candidate_hash: Default::default(), + candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), + })]; + let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 }, + cores, + runtime_version, + claim_queue, + ) + .await; + helpers::handle_core_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + // `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included` + OccupiedCoreAssumption::Included, + ) + .await; + + virtual_overseer + }); +} + +// There is one core in `Occupied` state and async backing is disabled. On new head activation +// no new collation should be generated. +#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled( + #[case] runtime_version: u32, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + + // One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match. + let cores: Vec = vec![CoreState::Occupied(polkadot_primitives::OccupiedCore { + next_up_on_available: Some(ScheduledCore { para_id, collator: None }), + occupied_since: 1, + time_out_at: 10, + next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }), + availability: Default::default(), // doesn't matter + group_responsible: polkadot_primitives::GroupIndex(0), + candidate_hash: Default::default(), + candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), + })]; + let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + AsyncBackingParams { max_candidate_depth: 0, allowed_ancestry_len: 0 }, + cores, + runtime_version, + claim_queue, + ) + .await; + + virtual_overseer + }); +} + +mod helpers { + use super::*; + + // Sends `Initialize` with a collator config + pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) { + virtual_overseer + .send(FromOrchestra::Communication { + msg: CollationGenerationMessage::Initialize(test_config(para_id)), + }) + .await; + } + + // Sends `ActiveLeaves` for a single leaf with the specified hash. Block number is hardcoded. + pub async fn activate_new_head(virtual_overseer: &mut VirtualOverseer, activated_hash: Hash) { + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: Some(ActivatedLeaf { + hash: activated_hash, + number: 10, + unpin_handle: polkadot_node_subsystem_test_helpers::mock::dummy_unpin_handle( + activated_hash, + ), + span: Arc::new(overseer::jaeger::Span::Disabled), + }), + ..Default::default() + }))) + .await; + } + + // Handle all runtime calls performed in `handle_new_activations`. Conditionally expects a + // `CLAIM_QUEUE_RUNTIME_REQUIREMENT` call if the passed `runtime_version` is greater or equal to + // `CLAIM_QUEUE_RUNTIME_REQUIREMENT` + pub async fn handle_runtime_calls_on_new_head_activation( + virtual_overseer: &mut VirtualOverseer, + activated_hash: Hash, + async_backing_params: AsyncBackingParams, + cores: Vec, + runtime_version: u32, + claim_queue: BTreeMap>, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx))) => { + assert_eq!(hash, activated_hash); + let _ = tx.send(Ok(cores)); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::Validators(tx))) => { + assert_eq!(hash, activated_hash); + let _ = tx.send(Ok(vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ])); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::AsyncBackingParams( + tx, + ), + )) => { + assert_eq!(hash, activated_hash); + let _ = tx.send(Ok(async_backing_params)); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::Version(tx), + )) => { + assert_eq!(hash, activated_hash); + let _ = tx.send(Ok(runtime_version)); + } + ); + + if runtime_version == RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::ClaimQueue(tx), + )) => { + assert_eq!(hash, activated_hash); + let _ = tx.send(Ok(claim_queue)); + } + ); + } + } + + // Handles all runtime requests performed in `handle_new_activations` for the case when a + // collation should be prepared for the new leaf + pub async fn handle_core_processing_for_a_leaf( + virtual_overseer: &mut VirtualOverseer, + activated_hash: Hash, + para_id: ParaId, + expected_occupied_core_assumption: OccupiedCoreAssumption, + ) { + // Some hardcoded data - if needed, extract to parameters + let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42)); + let parent_head = HeadData::from(vec![1, 2, 3]); + let pvd = PersistedValidationData { + parent_head: parent_head.clone(), + relay_parent_number: 10, + relay_parent_storage_root: Hash::repeat_byte(1), + max_pov_size: 1024, + }; + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => { + assert_eq!(hash, activated_hash); + assert_eq!(id, para_id); + assert_eq!(a, expected_occupied_core_assumption); + + let _ = tx.send(Ok(Some(pvd.clone()))); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::ValidationCodeHash( + id, + assumption, + tx, + ), + )) => { + assert_eq!(hash, activated_hash); + assert_eq!(id, para_id); + assert_eq!(assumption, expected_occupied_core_assumption); + + let _ = tx.send(Ok(Some(validation_code_hash))); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation{ + candidate_receipt, + parent_head_data_hash, + .. + }) => { + assert_eq!(parent_head_data_hash, parent_head.hash()); + assert_eq!(candidate_receipt.descriptor().persisted_validation_data_hash, pvd.hash()); + assert_eq!(candidate_receipt.descriptor().para_head, dummy_head_data().hash()); + assert_eq!(candidate_receipt.descriptor().validation_code_hash, validation_code_hash); + } + ); + } +} diff --git a/polkadot/node/core/runtime-api/src/cache.rs b/polkadot/node/core/runtime-api/src/cache.rs index 5eca551db0a6..9674cda98385 100644 --- a/polkadot/node/core/runtime-api/src/cache.rs +++ b/polkadot/node/core/runtime-api/src/cache.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::btree_map::BTreeMap; +use std::collections::{btree_map::BTreeMap, VecDeque}; use schnellru::{ByLength, LruMap}; use sp_consensus_babe::Epoch; @@ -23,10 +23,11 @@ use polkadot_primitives::{ async_backing, slashing, vstaging::{self, ApprovalVotingParams}, AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash, - Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, + GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }; /// For consistency we have the same capacity for all caches. We use 128 as we'll only need that @@ -70,6 +71,7 @@ pub(crate) struct RequestResultCache { async_backing_params: LruMap, node_features: LruMap, approval_voting_params: LruMap, + claim_queue: LruMap>>, } impl Default for RequestResultCache { @@ -105,6 +107,7 @@ impl Default for RequestResultCache { para_backing_state: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)), async_backing_params: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)), node_features: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)), + claim_queue: LruMap::new(ByLength::new(DEFAULT_CACHE_CAP)), } } } @@ -525,6 +528,21 @@ impl RequestResultCache { ) { self.approval_voting_params.insert(session_index, value); } + + pub(crate) fn claim_queue( + &mut self, + relay_parent: &Hash, + ) -> Option<&BTreeMap>> { + self.claim_queue.get(relay_parent).map(|v| &*v) + } + + pub(crate) fn cache_claim_queue( + &mut self, + relay_parent: Hash, + value: BTreeMap>, + ) { + self.claim_queue.insert(relay_parent, value); + } } pub(crate) enum RequestResult { @@ -577,4 +595,5 @@ pub(crate) enum RequestResult { ParaBackingState(Hash, ParaId, Option), AsyncBackingParams(Hash, async_backing::AsyncBackingParams), NodeFeatures(SessionIndex, vstaging::NodeFeatures), + ClaimQueue(Hash, BTreeMap>), } diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 3ff1a35d0687..2b7f6fc2d609 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -177,6 +177,9 @@ where self.requests_cache.cache_async_backing_params(relay_parent, params), NodeFeatures(session_index, params) => self.requests_cache.cache_node_features(session_index, params), + ClaimQueue(relay_parent, sender) => { + self.requests_cache.cache_claim_queue(relay_parent, sender); + }, } } @@ -329,6 +332,8 @@ where Some(Request::NodeFeatures(index, sender)) } }, + Request::ClaimQueue(sender) => + query!(claim_queue(), sender).map(|sender| Request::ClaimQueue(sender)), } } @@ -626,5 +631,11 @@ where sender, result = (index) ), + Request::ClaimQueue(sender) => query!( + ClaimQueue, + claim_queue(), + ver = Request::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + sender + ), } } diff --git a/polkadot/node/core/runtime-api/src/tests.rs b/polkadot/node/core/runtime-api/src/tests.rs index f91723b3d39e..fefd2d3f8624 100644 --- a/polkadot/node/core/runtime-api/src/tests.rs +++ b/polkadot/node/core/runtime-api/src/tests.rs @@ -23,15 +23,16 @@ use polkadot_primitives::{ async_backing, slashing, vstaging::{ApprovalVotingParams, NodeFeatures}, AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, - Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, - Slot, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, + GroupRotationInfo, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionIndex, SessionInfo, Slot, ValidationCode, ValidationCodeHash, ValidatorId, + ValidatorIndex, ValidatorSignature, }; use sp_api::ApiError; use sp_core::testing::TaskExecutor; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, VecDeque}, sync::{Arc, Mutex}, }; use test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code}; @@ -286,6 +287,13 @@ impl RuntimeApiSubsystemClient for MockSubsystemClient { async fn disabled_validators(&self, _: Hash) -> Result, ApiError> { todo!("Not required for tests") } + + async fn claim_queue( + &self, + _: Hash, + ) -> Result>, ApiError> { + todo!("Not required for tests") + } } #[test] diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 23773f7e3252..5115efa853c1 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -45,7 +45,7 @@ use polkadot_primitives::{ async_backing, slashing, vstaging::{ApprovalVotingParams, NodeFeatures}, AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash, - CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, + CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, HeadData, Header as BlockHeader, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, @@ -55,7 +55,7 @@ use polkadot_primitives::{ }; use polkadot_statement_table::v2::Misbehavior; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, sync::Arc, }; @@ -729,6 +729,9 @@ pub enum RuntimeApiRequest { /// Approval voting params /// `V10` ApprovalVotingParams(SessionIndex, RuntimeApiSender), + /// Fetch the `ClaimQueue` from scheduler pallet + /// `V11` + ClaimQueue(RuntimeApiSender>>), } impl RuntimeApiRequest { @@ -763,6 +766,9 @@ impl RuntimeApiRequest { /// `approval_voting_params` pub const APPROVAL_VOTING_PARAMS_REQUIREMENT: u32 = 10; + + /// `ClaimQueue` + pub const CLAIM_QUEUE_RUNTIME_REQUIREMENT: u32 = 11; } /// A message to the Runtime API subsystem. diff --git a/polkadot/node/subsystem-types/src/runtime_client.rs b/polkadot/node/subsystem-types/src/runtime_client.rs index 4039fc9127da..7474b4120cc9 100644 --- a/polkadot/node/subsystem-types/src/runtime_client.rs +++ b/polkadot/node/subsystem-types/src/runtime_client.rs @@ -21,10 +21,11 @@ use polkadot_primitives::{ slashing, vstaging::{self, ApprovalVotingParams}, Block, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash, - Header, Id, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, + GroupRotationInfo, Hash, Header, Id, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }; use sc_client_api::{AuxStore, HeaderBackend}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; @@ -33,7 +34,10 @@ use sp_authority_discovery::AuthorityDiscoveryApi; use sp_blockchain::{BlockStatus, Info}; use sp_consensus_babe::{BabeApi, Epoch}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::Arc, +}; /// Offers header utilities. /// @@ -329,6 +333,10 @@ pub trait RuntimeApiSubsystemClient { at: Hash, session_index: SessionIndex, ) -> Result; + + // == v11: Claim queue == + /// Fetch the `ClaimQueue` from scheduler pallet + async fn claim_queue(&self, at: Hash) -> Result>, ApiError>; } /// Default implementation of [`RuntimeApiSubsystemClient`] using the client. @@ -594,6 +602,10 @@ where ) -> Result { self.client.runtime_api().approval_voting_params(at) } + + async fn claim_queue(&self, at: Hash) -> Result>, ApiError> { + self.client.runtime_api().claim_queue(at) + } } impl HeaderBackend for DefaultSubsystemClient diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index f13beb3502fc..aaae30db50cd 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, overseer, SubsystemSender, }; -use polkadot_primitives::{slashing, ExecutorParams}; +use polkadot_primitives::{slashing, CoreIndex, ExecutorParams}; pub use overseer::{ gen::{OrchestraError as OverseerError, Timeout}, @@ -53,7 +53,10 @@ pub use rand; use sp_application_crypto::AppCrypto; use sp_core::ByteArray; use sp_keystore::{Error as KeystoreError, KeystorePtr}; -use std::time::Duration; +use std::{ + collections::{BTreeMap, VecDeque}, + time::Duration, +}; use thiserror::Error; use vstaging::get_disabled_validators_with_fallback; @@ -304,6 +307,7 @@ specialize_requests! { fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost; fn request_disabled_validators() -> Vec; DisabledValidators; fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams; + fn request_claim_queue() -> BTreeMap>; ClaimQueue; } /// Requests executor parameters from the runtime effective at given relay-parent. First obtains diff --git a/polkadot/primitives/src/runtime_api.rs b/polkadot/primitives/src/runtime_api.rs index d661005e32ff..6dca33f88234 100644 --- a/polkadot/primitives/src/runtime_api.rs +++ b/polkadot/primitives/src/runtime_api.rs @@ -117,14 +117,18 @@ use crate::{ async_backing, slashing, vstaging::{self, ApprovalVotingParams}, AsyncBackingParams, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash, - OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, - SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorSignature, + CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, + GroupRotationInfo, Hash, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, + ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, + ValidatorSignature, }; use polkadot_core_primitives as pcp; use polkadot_parachain_primitives::primitives as ppp; -use sp_std::{collections::btree_map::BTreeMap, prelude::*}; +use sp_std::{ + collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + prelude::*, +}; sp_api::decl_runtime_apis! { /// The API for querying the state of parachains on-chain. @@ -281,5 +285,10 @@ sp_api::decl_runtime_apis! { /// Approval voting configuration parameters #[api_version(10)] fn approval_voting_params() -> ApprovalVotingParams; + + /***** Added in v11 *****/ + /// Claim queue + #[api_version(11)] + fn claim_queue() -> BTreeMap>; } } diff --git a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs index 1fee1a4097d8..296b872e8d41 100644 --- a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs +++ b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs @@ -16,12 +16,15 @@ //! Put implementations of functions from staging APIs here. -use crate::{configuration, initializer, shared}; +use crate::{configuration, initializer, scheduler, shared}; use primitives::{ vstaging::{ApprovalVotingParams, NodeFeatures}, - ValidatorIndex, + CoreIndex, Id as ParaId, ValidatorIndex, +}; +use sp_std::{ + collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + prelude::Vec, }; -use sp_std::prelude::Vec; /// Implementation for `DisabledValidators` // CAVEAT: this should only be called on the node side @@ -38,8 +41,18 @@ pub fn node_features() -> NodeFeatures { >::config().node_features } -/// Approval voting subsystem configuration parameteres +/// Approval voting subsystem configuration parameters pub fn approval_voting_params() -> ApprovalVotingParams { let config = >::config(); config.approval_voting_params } + +/// Returns the claimqueue from the scheduler +pub fn claim_queue() -> BTreeMap> { + >::claimqueue() + .into_iter() + .map(|(core_index, entries)| { + (core_index, entries.into_iter().map(|e| e.para_id()).collect()) + }) + .collect() +} diff --git a/prdoc/pr_3580.prdoc b/prdoc/pr_3580.prdoc new file mode 100644 index 000000000000..042fcf7a1a84 --- /dev/null +++ b/prdoc/pr_3580.prdoc @@ -0,0 +1,13 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Expose `ClaimQueue` via a runtime api and consume it in `collation-generation` + +doc: + - audience: Node Dev + description: | + Creates a new runtime api exposing the `ClaimQueue` from `scheduler` pallet. Consume the api + in collation generation (if available) by getting what's scheduled on a core from the + `ClaimQueue` instead of from `next_up_on_available` (from `AvailabilityCores` runtime api). + +crates: [ ]