diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index ac4bdac7ad15..e722239d890c 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -252,6 +252,17 @@ zombienet-polkadot-functional-0018-shared-core-idle-parachain:
--local-dir="${LOCAL_DIR}/functional"
--test="0018-shared-core-idle-parachain.zndsl"
+zombienet-polkadot-functional-0019-coretime-collation-fetching-fairness:
+ extends:
+ - .zombienet-polkadot-common
+ before_script:
+ - !reference [ .zombienet-polkadot-common, before_script ]
+ - cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/functional
+ script:
+ - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+ --local-dir="${LOCAL_DIR}/functional"
+ --test="0019-coretime-collation-fetching-fairness.zndsl"
+
zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
diff --git a/polkadot/node/network/collator-protocol/src/error.rs b/polkadot/node/network/collator-protocol/src/error.rs
index 598cdcf43900..97fd4076bb8f 100644
--- a/polkadot/node/network/collator-protocol/src/error.rs
+++ b/polkadot/node/network/collator-protocol/src/error.rs
@@ -70,6 +70,9 @@ pub enum Error {
#[error("Response receiver for claim queue request cancelled")]
CancelledClaimQueue(oneshot::Canceled),
+
+ #[error("No state for the relay parent")]
+ RelayParentStateNotFound,
}
/// An error happened on the validator side of the protocol when attempting
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/claim_queue_state.rs b/polkadot/node/network/collator-protocol/src/validator_side/claim_queue_state.rs
new file mode 100644
index 000000000000..3a34cf52fec6
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/validator_side/claim_queue_state.rs
@@ -0,0 +1,1055 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! `ClaimQueueState` tracks the state of the claim queue over a set of relay blocks. Refer to
+//! [`ClaimQueueState`] for more details.
+
+use std::collections::VecDeque;
+
+use crate::LOG_TARGET;
+use polkadot_primitives::{Hash, Id as ParaId};
+
+/// Represents a single claim from the claim queue, mapped to the relay chain block where it could
+/// be backed on-chain.
+#[derive(Debug, PartialEq)]
+struct ClaimInfo {
+ // Hash of the relay chain block. Can be `None` if it is still not known (a future block).
+ hash: Option<Hash>,
+ /// Represents the `ParaId` scheduled for the block. Can be `None` if nothing is scheduled.
+ claim: Option,
+ /// The length of the claim queue at the block. It is used to determine the 'block window'
+ /// where a claim can be made.
+ claim_queue_len: usize,
+ /// A flag that indicates if the slot is claimed or not.
+ claimed: bool,
+}
+
+/// Tracks the state of the claim queue over a set of relay blocks.
+///
+/// Generally the claim queue represents the `ParaId` that should be scheduled at the current block
+/// (the first element of the claim queue) and N other `ParaId`s which are supposed to be scheduled
+/// on the next relay blocks. In other words the claim queue is a rolling window giving a hint what
+/// should be built/fetched/accepted (depending on the context) at each block.
+///
+/// Since the claim queue peeks into the future blocks there is a relation between the claim queue
+/// state between the current block and the future blocks.
+/// Let's see an example with 2 co-scheduled parachains:
+/// - relay parent 1; Claim queue: [A, B, A]
+/// - relay parent 2; Claim queue: [B, A, B]
+/// - relay parent 3; Claim queue: [A, B, A]
+/// - and so on
+///
+/// Note that at rp1 the second element in the claim queue is equal to the first one in rp2. Also
+/// the third element of the claim queue at rp1 is equal to the second one in rp2 and the first one
+/// in rp3.
+///
+/// So if we want to claim the third slot at rp 1 we are also claiming the second at rp2 and first
+/// at rp3. To track this in a simple way we can project the claim queue onto the relay blocks like
+/// this:
+/// [A] [B] [A] -> this is the claim queue at rp3
+/// [B] [A] [B] -> this is the claim queue at rp2
+/// [A] [B] [A] -> this is the claim queue at rp1
+/// [RP 1][RP 2][RP 3][RP X][RP Y] -> relay blocks, RP x and RP Y are future blocks
+///
+/// Note that the claims at each column are the same so we can simplify this by just projecting a
+/// single claim over a block:
+/// [A] [B] [A] [B] [A] -> claims effectively are the same
+/// [RP 1][RP 2][RP 3][RP X][RP Y] -> relay blocks, RP x and RP Y are future blocks
+///
+/// Basically this is how `ClaimQueueState` works. It keeps track of claims at each block by mapping
+/// claims to relay blocks.
+///
+/// How making a claim works?
+/// At each relay block we keep track how long is the claim queue. This is a 'window' where we can
+/// make a claim. So adding a claim just looks for a free spot at this window and claims it.
+///
+/// Note on adding a new leaf.
+/// When a new leaf is added we check if the first element in its claim queue matches with the
+/// projection on the first element in 'future blocks'. If yes - the new relay block inherits this
+/// claim. If not - this means that the claim queue changed for some reason so the claim can't be
+/// inherited. This should not happen under normal circumstances. But if it happens it means that we
+/// have got one claim which won't be satisfied in the worst case scenario.
+pub(crate) struct ClaimQueueState {
+ block_state: VecDeque<ClaimInfo>,
+ future_blocks: VecDeque<ClaimInfo>,
+}
+
+impl ClaimQueueState {
+ pub(crate) fn new() -> Self {
+ Self { block_state: VecDeque::new(), future_blocks: VecDeque::new() }
+ }
+
+ // Appends a new leaf
+ pub(crate) fn add_leaf(&mut self, hash: &Hash, claim_queue: &Vec<ParaId>) {
+ if self.block_state.iter().any(|s| s.hash == Some(*hash)) {
+ return
+ }
+
+ // First check if our view for the future blocks is consistent with the one in the claim
+ // queue of the new block. If not - the claim queue has changed for some reason and we need
+ // to readjust our view.
+ for (idx, expected_claim) in claim_queue.iter().enumerate() {
+ match self.future_blocks.get_mut(idx) {
+ Some(future_block) =>
+ if future_block.claim.as_ref() != Some(expected_claim) {
+ // There is an inconsistency. Update our view with the one from the claim
+ // queue. `claimed` can't be true anymore since the `ParaId` has changed.
+ future_block.claimed = false;
+ future_block.claim = Some(*expected_claim);
+ },
+ None => {
+ self.future_blocks.push_back(ClaimInfo {
+ hash: None,
+ claim: Some(*expected_claim),
+ // For future blocks we don't know the size of the claim queue.
+ // `claim_queue_len` could be an option but there is not much benefit from
+ // the extra boilerplate code to handle it. We set it to one since we
+ // usually know about one claim at each future block but this value is not
+ // used anywhere in the code.
+ claim_queue_len: 1,
+ claimed: false,
+ });
+ },
+ }
+ }
+
+ // Now pop the first future block and add it as a leaf
+ let claim_info = if let Some(new_leaf) = self.future_blocks.pop_front() {
+ ClaimInfo {
+ hash: Some(*hash),
+ claim: claim_queue.first().copied(),
+ claim_queue_len: claim_queue.len(),
+ claimed: new_leaf.claimed,
+ }
+ } else {
+ // maybe the claim queue was empty but we still need to add a leaf
+ ClaimInfo {
+ hash: Some(*hash),
+ claim: claim_queue.first().copied(),
+ claim_queue_len: claim_queue.len(),
+ claimed: false,
+ }
+ };
+
+ // `future_blocks` can't be longer than the length of the claim queue at the last block - 1.
+ // For example this can happen if at relay block N we have got a claim queue of a length 4
+ // and it's shrunk to 2.
+ self.future_blocks.truncate(claim_queue.len().saturating_sub(1));
+
+ self.block_state.push_back(claim_info);
+ }
+
+ fn get_window<'a>(
+ &'a mut self,
+ relay_parent: &'a Hash,
+ ) -> impl Iterator- + 'a {
+ let mut window = self
+ .block_state
+ .iter_mut()
+ .skip_while(|b| b.hash != Some(*relay_parent))
+ .peekable();
+ let cq_len = window.peek().map_or(0, |b| b.claim_queue_len);
+ window.chain(self.future_blocks.iter_mut()).take(cq_len)
+ }
+
+ pub(crate) fn claim_at(&mut self, relay_parent: &Hash, para_id: &ParaId) -> bool {
+ gum::trace!(
+ target: LOG_TARGET,
+ ?para_id,
+ ?relay_parent,
+ "claim_at"
+ );
+ self.find_a_claim(relay_parent, para_id, true)
+ }
+
+ pub(crate) fn can_claim_at(&mut self, relay_parent: &Hash, para_id: &ParaId) -> bool {
+ gum::trace!(
+ target: LOG_TARGET,
+ ?para_id,
+ ?relay_parent,
+ "can_claim_at"
+ );
+
+ self.find_a_claim(relay_parent, para_id, false)
+ }
+
+ // Returns `true` if there is a claim within `relay_parent`'s view of the claim queue for
+ // `para_id`. If `claim_it` is set to `true` the slot is claimed. Otherwise the function just
+ // reports the availability of the slot.
+ fn find_a_claim(&mut self, relay_parent: &Hash, para_id: &ParaId, claim_it: bool) -> bool {
+ let window = self.get_window(relay_parent);
+
+ for w in window {
+ gum::trace!(
+ target: LOG_TARGET,
+ ?para_id,
+ ?relay_parent,
+ claim_info=?w,
+ ?claim_it,
+ "Checking claim"
+ );
+
+ if !w.claimed && w.claim == Some(*para_id) {
+ w.claimed = claim_it;
+ return true
+ }
+ }
+
+ false
+ }
+
+ pub(crate) fn unclaimed_at(&mut self, relay_parent: &Hash) -> Vec<ParaId> {
+ let window = self.get_window(relay_parent);
+
+ window.filter(|b| !b.claimed).filter_map(|b| b.claim).collect()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn sane_initial_state() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent = Hash::from_low_u64_be(1);
+ let para_id = ParaId::new(1);
+
+ assert!(!state.can_claim_at(&relay_parent, ¶_id));
+ assert!(!state.claim_at(&relay_parent, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent), vec![]);
+ }
+
+ #[test]
+ fn add_leaf_works() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id = ParaId::new(1);
+ let claim_queue = vec![para_id, para_id, para_id];
+
+ state.add_leaf(&relay_parent_a, &claim_queue);
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id, para_id, para_id]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: false,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false }
+ ])
+ );
+
+ // should be no op
+ state.add_leaf(&relay_parent_a, &claim_queue);
+ assert_eq!(state.block_state.len(), 1);
+ assert_eq!(state.future_blocks.len(), 2);
+
+ // add another leaf
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ state.add_leaf(&relay_parent_b, &claim_queue);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: false,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: false,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false }
+ ])
+ );
+
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id, para_id, para_id]);
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id, para_id, para_id]);
+ }
+
+ #[test]
+ fn claims_at_separate_relay_parents_work() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let para_id = ParaId::new(1);
+ let claim_queue = vec![para_id, para_id, para_id];
+
+ state.add_leaf(&relay_parent_a, &claim_queue);
+ state.add_leaf(&relay_parent_b, &claim_queue);
+
+ // add one claim for a
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id, para_id, para_id]);
+ assert!(state.claim_at(&relay_parent_a, ¶_id));
+
+ // and one for b
+ assert!(state.can_claim_at(&relay_parent_b, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id, para_id, para_id]);
+ assert!(state.claim_at(&relay_parent_b, ¶_id));
+
+ // a should have one claim since the one for b was claimed
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id]);
+ // and two more for b
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id, para_id]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false }
+ ])
+ );
+ }
+
+ #[test]
+ fn claims_are_transferred_to_next_slot() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id = ParaId::new(1);
+ let claim_queue = vec![para_id, para_id, para_id];
+
+ state.add_leaf(&relay_parent_a, &claim_queue);
+
+ // add two claims, 2nd should be transferred to a new leaf
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id, para_id, para_id]);
+ assert!(state.claim_at(&relay_parent_a, ¶_id));
+
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id, para_id]);
+ assert!(state.claim_at(&relay_parent_a, ¶_id));
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false }
+ ])
+ );
+
+ // one more
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id]);
+ assert!(state.claim_at(&relay_parent_a, ¶_id));
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true }
+ ])
+ );
+
+ // no more claims
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+ }
+
+ #[test]
+ fn claims_are_transferred_to_new_leaves() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id = ParaId::new(1);
+ let claim_queue = vec![para_id, para_id, para_id];
+
+ state.add_leaf(&relay_parent_a, &claim_queue);
+
+ for _ in 0..3 {
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id));
+ assert!(state.claim_at(&relay_parent_a, ¶_id));
+ }
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true }
+ ])
+ );
+
+ // no more claims
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id));
+
+ // new leaf
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ state.add_leaf(&relay_parent_b, &claim_queue);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: false }
+ ])
+ );
+
+ // still no claims for a
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id));
+
+ // but can accept for b
+ assert!(state.can_claim_at(&relay_parent_b, ¶_id));
+ assert!(state.claim_at(&relay_parent_b, ¶_id));
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id),
+ claim_queue_len: 3,
+ claimed: true,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id), claim_queue_len: 1, claimed: true }
+ ])
+ );
+ }
+
+ #[test]
+ fn two_paras() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue = vec![para_id_a, para_id_b, para_id_a];
+
+ state.add_leaf(&relay_parent_a, &claim_queue);
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a, para_id_b, para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: false,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+
+ assert!(state.claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_b, para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+
+ assert!(state.claim_at(&relay_parent_a, ¶_id_a));
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_b]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo { hash: None, claim: Some(para_id_a), claim_queue_len: 1, claimed: true }
+ ])
+ );
+
+ assert!(state.claim_at(&relay_parent_a, ¶_id_b));
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id_b), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id_a), claim_queue_len: 1, claimed: true }
+ ])
+ );
+ }
+
+ #[test]
+ fn claim_queue_changes_unexpectedly() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue_a = vec![para_id_a, para_id_b, para_id_a];
+
+ state.add_leaf(&relay_parent_a, &claim_queue_a);
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id_b), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id_a), claim_queue_len: 1, claimed: true }
+ ])
+ );
+
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let claim_queue_b = vec![para_id_a, para_id_a, para_id_a]; // should be [b, a, ...]
+ state.add_leaf(&relay_parent_b, &claim_queue_b);
+
+ // because of the unexpected change in claim queue we lost the claim for paraB and have one
+ // unclaimed for paraA
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: false,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ // since the 3rd slot of the claim queue at rp1 is equal to the second one in rp2, this
+ // claim still exists
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id_a), claim_queue_len: 1, claimed: true },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+ }
+
+ #[test]
+ fn claim_queue_changes_unexpectedly_with_two_blocks() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue_a = vec![para_id_a, para_id_b, para_id_b];
+
+ state.add_leaf(&relay_parent_a, &claim_queue_a);
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.can_claim_at(&relay_parent_a, ¶_id_b));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_a));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_b));
+ assert!(state.claim_at(&relay_parent_a, ¶_id_b));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo { hash: None, claim: Some(para_id_b), claim_queue_len: 1, claimed: true },
+ ClaimInfo { hash: None, claim: Some(para_id_b), claim_queue_len: 1, claimed: true }
+ ])
+ );
+
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let claim_queue_b = vec![para_id_a, para_id_a, para_id_a]; // should be [b, b, ...]
+ state.add_leaf(&relay_parent_b, &claim_queue_b);
+
+ // because of the unexpected change in claim queue we lost both claims for paraB and have
+ // two unclaimed for paraA
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a, para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: false,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+ }
+
+ #[test]
+ fn empty_claim_queue() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let claim_queue_a = vec![];
+
+ state.add_leaf(&relay_parent_a, &claim_queue_a);
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: None,
+ claim_queue_len: 0,
+ claimed: false,
+ },])
+ );
+ // no claim queue so we know nothing about future blocks
+ assert!(state.future_blocks.is_empty());
+
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(!state.claim_at(&relay_parent_a, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let claim_queue_b = vec![para_id_a];
+ state.add_leaf(&relay_parent_b, &claim_queue_b);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: None,
+ claim_queue_len: 0,
+ claimed: false,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false,
+ },
+ ])
+ );
+ // claim queue with length 1 doesn't say anything about future blocks
+ assert!(state.future_blocks.is_empty());
+
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(!state.claim_at(&relay_parent_a, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ assert!(state.can_claim_at(&relay_parent_b, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id_a]);
+ assert!(state.claim_at(&relay_parent_b, ¶_id_a));
+
+ let relay_parent_c = Hash::from_low_u64_be(3);
+ let claim_queue_c = vec![para_id_a, para_id_a];
+ state.add_leaf(&relay_parent_c, &claim_queue_c);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: None,
+ claim_queue_len: 0,
+ claimed: false,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: true,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_c),
+ claim: Some(para_id_a),
+ claim_queue_len: 2,
+ claimed: false,
+ },
+ ])
+ );
+ // claim queue with length 2 fills only one future block
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false,
+ },])
+ );
+
+ assert!(!state.can_claim_at(&relay_parent_a, ¶_id_a));
+ assert!(!state.claim_at(&relay_parent_a, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![]);
+
+ // already claimed
+ assert!(!state.can_claim_at(&relay_parent_b, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![]);
+ assert!(!state.claim_at(&relay_parent_b, ¶_id_a));
+
+ assert!(state.can_claim_at(&relay_parent_c, ¶_id_a));
+ assert_eq!(state.unclaimed_at(&relay_parent_c), vec![para_id_a, para_id_a]);
+ }
+
+ #[test]
+ fn claim_queue_becomes_shorter() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue_a = vec![para_id_a, para_id_b, para_id_a];
+
+ state.add_leaf(&relay_parent_a, &claim_queue_a);
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a, para_id_b, para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: false,
+ },])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let claim_queue_b = vec![para_id_a, para_id_b]; // should be [b, a]
+ state.add_leaf(&relay_parent_b, &claim_queue_b);
+
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id_a, para_id_b]);
+ // claims for `relay_parent_a` have changed.
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a, para_id_a, para_id_b]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 3,
+ claimed: false,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_a),
+ claim_queue_len: 2,
+ claimed: false,
+ }
+ ])
+ );
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },])
+ );
+ }
+
+ #[test]
+ fn claim_queue_becomes_shorter_and_drops_future_claims() {
+ let mut state = ClaimQueueState::new();
+ let relay_parent_a = Hash::from_low_u64_be(1);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue_a = vec![para_id_a, para_id_b, para_id_a, para_id_b];
+
+ state.add_leaf(&relay_parent_a, &claim_queue_a);
+
+ assert_eq!(
+ state.unclaimed_at(&relay_parent_a),
+ vec![para_id_a, para_id_b, para_id_a, para_id_b]
+ );
+
+ // We start with claim queue len 4.
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 4,
+ claimed: false,
+ },])
+ );
+ // we have got three future blocks
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ },
+ ClaimInfo {
+ hash: None,
+ claim: Some(para_id_b),
+ claim_queue_len: 1,
+ claimed: false
+ }
+ ])
+ );
+
+ // The next claim queue len is 2, so we lose one future block
+ let relay_parent_b = Hash::from_low_u64_be(2);
+ let para_id_a = ParaId::new(1);
+ let para_id_b = ParaId::new(2);
+ let claim_queue_b = vec![para_id_b, para_id_a];
+ state.add_leaf(&relay_parent_b, &claim_queue_b);
+
+ assert_eq!(state.unclaimed_at(&relay_parent_a), vec![para_id_a, para_id_b, para_id_a]);
+ assert_eq!(state.unclaimed_at(&relay_parent_b), vec![para_id_b, para_id_a]);
+
+ assert_eq!(
+ state.block_state,
+ VecDeque::from(vec![
+ ClaimInfo {
+ hash: Some(relay_parent_a),
+ claim: Some(para_id_a),
+ claim_queue_len: 4,
+ claimed: false,
+ },
+ ClaimInfo {
+ hash: Some(relay_parent_b),
+ claim: Some(para_id_b),
+ claim_queue_len: 2,
+ claimed: false,
+ }
+ ])
+ );
+
+ assert_eq!(
+ state.future_blocks,
+ VecDeque::from(vec![ClaimInfo {
+ hash: None,
+ claim: Some(para_id_a),
+ claim_queue_len: 1,
+ claimed: false
+ },])
+ );
+ }
+}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs
index cc0de1cb70f6..625140a73966 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs
@@ -18,16 +18,28 @@
//!
//! Usually a path of collations is as follows:
//! 1. First, collation must be advertised by collator.
-//! 2. If the advertisement was accepted, it's queued for fetch (per relay parent).
-//! 3. Once it's requested, the collation is said to be Pending.
-//! 4. Pending collation becomes Fetched once received, we send it to backing for validation.
-//! 5. If it turns to be invalid or async backing allows seconding another candidate, carry on
+//! 2. The validator inspects the claim queue and decides if the collation should be fetched
+//! based on the entries there. A parachain can't have more fetched collations than the
+//! entries in the claim queue at a specific relay parent. When calculating this limit the
+//! validator counts all advertisements within its view, not just at the relay parent.
+//! 3. If the advertisement was accepted, it's queued for fetch (per relay parent).
+//! 4. Once it's requested, the collation is said to be pending fetch
+//! (`CollationStatus::Fetching`).
+//! 5. Pending fetch collation becomes pending validation
+//! (`CollationStatus::WaitingOnValidation`) once received, we send it to backing for
+//! validation.
+//! 6. If it turns out to be invalid or async backing allows seconding another candidate, carry on
//! with the next advertisement, otherwise we're done with this relay parent.
//!
-//! ┌──────────────────────────────────────────┐
-//! └─▶Advertised ─▶ Pending ─▶ Fetched ─▶ Validated
-
-use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
+//! ┌───────────────────────────────────┐
+//! └─▶Waiting ─▶ Fetching ─▶ WaitingOnValidation
+
+use std::{
+ collections::{BTreeMap, VecDeque},
+ future::Future,
+ pin::Pin,
+ task::Poll,
+};
use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
@@ -36,9 +48,7 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
-use polkadot_node_subsystem_util::{
- metrics::prometheus::prometheus::HistogramTimer, runtime::ProspectiveParachainsMode,
-};
+use polkadot_node_subsystem_util::metrics::prometheus::prometheus::HistogramTimer;
use polkadot_primitives::{
vstaging::CandidateReceiptV2 as CandidateReceipt, CandidateHash, CollatorId, Hash, HeadData,
Id as ParaId, PersistedValidationData,
@@ -187,12 +197,10 @@ pub struct PendingCollationFetch {
pub enum CollationStatus {
/// We are waiting for a collation to be advertised to us.
Waiting,
- /// We are currently fetching a collation.
- Fetching,
+ /// We are currently fetching a collation for the specified `ParaId`.
+ Fetching(ParaId),
/// We are waiting that a collation is being validated.
WaitingOnValidation,
- /// We have seconded a collation.
- Seconded,
}
impl Default for CollationStatus {
@@ -202,22 +210,22 @@ impl Default for CollationStatus {
}
impl CollationStatus {
- /// Downgrades to `Waiting`, but only if `self != Seconded`.
- fn back_to_waiting(&mut self, relay_parent_mode: ProspectiveParachainsMode) {
- match self {
- Self::Seconded =>
- if relay_parent_mode.is_enabled() {
- // With async backing enabled it's allowed to
- // second more candidates.
- *self = Self::Waiting
- },
- _ => *self = Self::Waiting,
- }
+ /// Downgrades to `Waiting`
+ pub fn back_to_waiting(&mut self) {
+ *self = Self::Waiting
}
}
+/// The number of claims in the claim queue and seconded candidates count for a specific `ParaId`.
+#[derive(Default, Debug)]
+struct CandidatesStatePerPara {
+ /// How many collations have been seconded.
+ pub seconded_per_para: usize,
+ // Claims in the claim queue for the `ParaId`.
+ pub claims_per_para: usize,
+}
+
/// Information about collations per relay parent.
-#[derive(Default)]
pub struct Collations {
/// What is the current status in regards to a collation for this relay parent?
pub status: CollationStatus,
@@ -226,75 +234,89 @@ pub struct Collations {
/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME`
/// yet.
pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>
- /// Collation that were advertised to us, but we did not yet fetch.
- pub waiting_queue: VecDeque<(PendingCollation, CollatorId)>,
- /// How many collations have been seconded.
- pub seconded_count: usize,
+ /// Collation that were advertised to us, but we did not yet request or fetch. Grouped by
+ /// `ParaId`.
+ waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>,
+ /// Number of seconded candidates and claims in the claim queue per `ParaId`.
+ candidates_state: BTreeMap<ParaId, CandidatesStatePerPara>,
}
impl Collations {
+ pub(super) fn new(group_assignments: &Vec<ParaId>) -> Self {
+ let mut candidates_state = BTreeMap::<ParaId, CandidatesStatePerPara>::new();
+
+ for para_id in group_assignments {
+ candidates_state.entry(*para_id).or_default().claims_per_para += 1;
+ }
+
+ Self {
+ status: Default::default(),
+ fetching_from: None,
+ waiting_queue: Default::default(),
+ candidates_state,
+ }
+ }
+
/// Note a seconded collation for a given para.
- pub(super) fn note_seconded(&mut self) {
- self.seconded_count += 1
+ pub(super) fn note_seconded(&mut self, para_id: ParaId) {
+ self.candidates_state.entry(para_id).or_default().seconded_per_para += 1;
+ gum::trace!(
+ target: LOG_TARGET,
+ ?para_id,
+ new_count=self.candidates_state.entry(para_id).or_default().seconded_per_para,
+ "Note seconded."
+ );
+ self.status.back_to_waiting();
}
- /// Returns the next collation to fetch from the `waiting_queue`.
+ /// Adds a new collation to the waiting queue for the relay parent. This function doesn't
+ /// perform any limits check. The caller should ensure that the collation limit is respected.
+ pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) {
+ self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation);
+ }
+
+ /// Picks a collation to fetch from the waiting queue.
+ /// When fetching collations we need to ensure that each parachain has got a fair core time
+ /// share depending on its assignments in the claim queue. This means that the number of
+ /// collations seconded per parachain should ideally be equal to the number of claims for the
+ /// particular parachain in the claim queue.
///
- /// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
+ /// To achieve this each seconded collation is mapped to an entry from the claim queue. The next
+ /// fetch is the first unfulfilled entry from the claim queue for which there is an
+ /// advertisement.
///
- /// Returns `Some(_)` if there is any collation to fetch, the `status` is not `Seconded` and
- /// the passed in `finished_one` is the currently `waiting_collation`.
- pub(super) fn get_next_collation_to_fetch(
+ /// `unfulfilled_claim_queue_entries` represents all claim queue entries which are still not
+ /// fulfilled.
+ pub(super) fn pick_a_collation_to_fetch(
&mut self,
- finished_one: &(CollatorId, Option<CandidateHash>),
- relay_parent_mode: ProspectiveParachainsMode,
+ unfulfilled_claim_queue_entries: Vec<ParaId>,
) -> Option<(PendingCollation, CollatorId)> {
- // If finished one does not match waiting_collation, then we already dequeued another fetch
- // to replace it.
- if let Some((collator_id, maybe_candidate_hash)) = self.fetching_from.as_ref() {
- // If a candidate hash was saved previously, `finished_one` must include this too.
- if collator_id != &finished_one.0 &&
- maybe_candidate_hash.map_or(true, |hash| Some(&hash) != finished_one.1.as_ref())
+ gum::trace!(
+ target: LOG_TARGET,
+ waiting_queue=?self.waiting_queue,
+ candidates_state=?self.candidates_state,
+ "Pick a collation to fetch."
+ );
+
+ for assignment in unfulfilled_claim_queue_entries {
+ // if there is an unfulfilled assignment - return it
+ if let Some(collation) = self
+ .waiting_queue
+ .get_mut(&assignment)
+ .and_then(|collations| collations.pop_front())
{
- gum::trace!(
- target: LOG_TARGET,
- waiting_collation = ?self.fetching_from,
- ?finished_one,
- "Not proceeding to the next collation - has already been done."
- );
- return None
+ return Some(collation)
}
}
- self.status.back_to_waiting(relay_parent_mode);
-
- match self.status {
- // We don't need to fetch any other collation when we already have seconded one.
- CollationStatus::Seconded => None,
- CollationStatus::Waiting =>
- if self.is_seconded_limit_reached(relay_parent_mode) {
- None
- } else {
- self.waiting_queue.pop_front()
- },
- CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
- unreachable!("We have reset the status above!"),
- }
+
+ None
}
- /// Checks the limit of seconded candidates.
- pub(super) fn is_seconded_limit_reached(
- &self,
- relay_parent_mode: ProspectiveParachainsMode,
- ) -> bool {
- let seconded_limit =
- if let ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } =
- relay_parent_mode
- {
- max_candidate_depth + 1
- } else {
- 1
- };
- self.seconded_count >= seconded_limit
+ pub(super) fn seconded_for_para(&self, para_id: &ParaId) -> usize {
+ self.candidates_state
+ .get(&para_id)
+ .map(|state| state.seconded_per_para)
+ .unwrap_or_default()
}
}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
index 36ec959c3406..5f5effcde9a8 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -49,22 +49,25 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
- request_claim_queue, request_session_index_for_child,
- runtime::{prospective_parachains_mode, request_node_features, ProspectiveParachainsMode},
+ request_async_backing_params, request_claim_queue, request_session_index_for_child,
+ runtime::{recv_runtime, request_node_features},
};
use polkadot_primitives::{
node_features,
- vstaging::{CandidateDescriptorV2, CandidateDescriptorVersion, CoreState},
- CandidateHash, CollatorId, CoreIndex, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption,
- PersistedValidationData, SessionIndex,
+ vstaging::{CandidateDescriptorV2, CandidateDescriptorVersion},
+ AsyncBackingParams, CandidateHash, CollatorId, CoreIndex, Hash, HeadData, Id as ParaId,
+ OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
};
use crate::error::{Error, FetchError, Result, SecondingError};
use self::collation::BlockedCollationId;
+use self::claim_queue_state::ClaimQueueState;
+
use super::{modify_reputation, tick_stream, LOG_TARGET};
+mod claim_queue_state;
mod collation;
mod metrics;
@@ -163,27 +166,19 @@ impl PeerData {
fn update_view(
&mut self,
implicit_view: &ImplicitView,
- active_leaves: &HashMap<Hash, ProspectiveParachainsMode>,
- per_relay_parent: &HashMap<Hash, PerRelayParent>,
+ active_leaves: &HashMap<Hash, AsyncBackingParams>,
new_view: View,
) {
let old_view = std::mem::replace(&mut self.view, new_view);
if let PeerState::Collating(ref mut peer_state) = self.state {
for removed in old_view.difference(&self.view) {
- // Remove relay parent advertisements if it went out
- // of our (implicit) view.
- let keep = per_relay_parent
- .get(removed)
- .map(|s| {
- is_relay_parent_in_implicit_view(
- removed,
- s.prospective_parachains_mode,
- implicit_view,
- active_leaves,
- peer_state.para_id,
- )
- })
- .unwrap_or(false);
+ // Remove relay parent advertisements if it went out of our (implicit) view.
+ let keep = is_relay_parent_in_implicit_view(
+ removed,
+ implicit_view,
+ active_leaves,
+ peer_state.para_id,
+ );
if !keep {
peer_state.advertisements.remove(&removed);
@@ -196,8 +191,7 @@ impl PeerData {
fn prune_old_advertisements(
&mut self,
implicit_view: &ImplicitView,
- active_leaves: &HashMap<Hash, ProspectiveParachainsMode>,
- per_relay_parent: &HashMap<Hash, PerRelayParent>,
+ active_leaves: &HashMap<Hash, AsyncBackingParams>,
) {
if let PeerState::Collating(ref mut peer_state) = self.state {
peer_state.advertisements.retain(|hash, _| {
@@ -205,36 +199,30 @@ impl PeerData {
// - Relay parent is an active leaf
// - It belongs to allowed ancestry under some leaf
// Discard otherwise.
- per_relay_parent.get(hash).map_or(false, |s| {
- is_relay_parent_in_implicit_view(
- hash,
- s.prospective_parachains_mode,
- implicit_view,
- active_leaves,
- peer_state.para_id,
- )
- })
+ is_relay_parent_in_implicit_view(
+ hash,
+ implicit_view,
+ active_leaves,
+ peer_state.para_id,
+ )
});
}
}
- /// Note an advertisement by the collator. Returns `true` if the advertisement was imported
- /// successfully. Fails if the advertisement is duplicate, out of view, or the peer has not
- /// declared itself a collator.
+ /// Performs sanity check for an advertisement and notes it as advertised.
fn insert_advertisement(
&mut self,
on_relay_parent: Hash,
- relay_parent_mode: ProspectiveParachainsMode,
candidate_hash: Option<CandidateHash>,
implicit_view: &ImplicitView,
- active_leaves: &HashMap<Hash, ProspectiveParachainsMode>,
+ active_leaves: &HashMap<Hash, AsyncBackingParams>,
+ per_relay_parent: &PerRelayParent,
) -> std::result::Result<(CollatorId, ParaId), InsertAdvertisementError> {
match self.state {
PeerState::Connected(_) => Err(InsertAdvertisementError::UndeclaredCollator),
PeerState::Collating(ref mut state) => {
if !is_relay_parent_in_implicit_view(
&on_relay_parent,
- relay_parent_mode,
implicit_view,
active_leaves,
state.para_id,
@@ -242,53 +230,41 @@ impl PeerData {
return Err(InsertAdvertisementError::OutOfOurView)
}
- match (relay_parent_mode, candidate_hash) {
- (ProspectiveParachainsMode::Disabled, candidate_hash) => {
- if state.advertisements.contains_key(&on_relay_parent) {
- return Err(InsertAdvertisementError::Duplicate)
- }
- state
- .advertisements
- .insert(on_relay_parent, HashSet::from_iter(candidate_hash));
- },
- (
- ProspectiveParachainsMode::Enabled { max_candidate_depth, .. },
- candidate_hash,
- ) => {
- if let Some(candidate_hash) = candidate_hash {
- if state
- .advertisements
- .get(&on_relay_parent)
- .map_or(false, |candidates| candidates.contains(&candidate_hash))
- {
- return Err(InsertAdvertisementError::Duplicate)
- }
-
- let candidates =
- state.advertisements.entry(on_relay_parent).or_default();
-
- if candidates.len() > max_candidate_depth {
- return Err(InsertAdvertisementError::PeerLimitReached)
- }
- candidates.insert(candidate_hash);
- } else {
- if self.version != CollationVersion::V1 {
- gum::error!(
- target: LOG_TARGET,
- "Programming error, `candidate_hash` can not be `None` \
- for non `V1` networking.",
- );
- }
-
- if state.advertisements.contains_key(&on_relay_parent) {
- return Err(InsertAdvertisementError::Duplicate)
- }
- state
- .advertisements
- .insert(on_relay_parent, HashSet::from_iter(candidate_hash));
- };
- },
- }
+ if let Some(candidate_hash) = candidate_hash {
+ if state
+ .advertisements
+ .get(&on_relay_parent)
+ .map_or(false, |candidates| candidates.contains(&candidate_hash))
+ {
+ return Err(InsertAdvertisementError::Duplicate)
+ }
+
+ let candidates = state.advertisements.entry(on_relay_parent).or_default();
+
+ // Current assignments is equal to the length of the claim queue. No honest
+ // collator should send that many advertisements.
+ if candidates.len() > per_relay_parent.assignment.current.len() {
+ return Err(InsertAdvertisementError::PeerLimitReached)
+ }
+
+ candidates.insert(candidate_hash);
+ } else {
+ if self.version != CollationVersion::V1 {
+ gum::error!(
+ target: LOG_TARGET,
+ "Programming error, `candidate_hash` can not be `None` \
+ for non `V1` networking.",
+ );
+ }
+
+ if state.advertisements.contains_key(&on_relay_parent) {
+ return Err(InsertAdvertisementError::Duplicate)
+ }
+
+ state
+ .advertisements
+ .insert(on_relay_parent, HashSet::from_iter(candidate_hash));
+ };
state.last_active = Instant::now();
Ok((state.collator_id.clone(), state.para_id))
@@ -369,7 +345,6 @@ struct GroupAssignments {
}
struct PerRelayParent {
- prospective_parachains_mode: ProspectiveParachainsMode,
assignment: GroupAssignments,
collations: Collations,
v2_receipts: bool,
@@ -390,11 +365,10 @@ struct State {
/// ancestry of some active leaf, then it does support prospective parachains.
implicit_view: ImplicitView,
- /// All active leaves observed by us, including both that do and do not
- /// support prospective parachains. This mapping works as a replacement for
+ /// All active leaves observed by us. This mapping works as a replacement for
/// [`polkadot_node_network_protocol::View`] and can be dropped once the transition
/// to asynchronous backing is done.
- active_leaves: HashMap<Hash, ProspectiveParachainsMode>,
+ active_leaves: HashMap<Hash, AsyncBackingParams>,
/// State tracked per relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>,
@@ -437,23 +411,69 @@ struct State {
reputation: ReputationAggregator,
}
+impl State {
+ // Returns the number of seconded and pending collations for a specific `ParaId`. Pending
+ // collations are:
+ // 1. Collations being fetched from a collator.
+ // 2. Collations waiting for validation from backing subsystem.
+ // 3. Collations blocked from seconding due to parent not being known by backing subsystem.
+ fn seconded_and_pending_for_para(&self, relay_parent: &Hash, para_id: &ParaId) -> usize {
+ let seconded = self
+ .per_relay_parent
+ .get(relay_parent)
+ .map_or(0, |per_relay_parent| per_relay_parent.collations.seconded_for_para(para_id));
+
+ let pending_fetch = self.per_relay_parent.get(relay_parent).map_or(0, |rp_state| {
+ match rp_state.collations.status {
+ CollationStatus::Fetching(pending_para_id) if pending_para_id == *para_id => 1,
+ _ => 0,
+ }
+ });
+
+ let waiting_for_validation = self
+ .fetched_candidates
+ .keys()
+ .filter(|fc| fc.relay_parent == *relay_parent && fc.para_id == *para_id)
+ .count();
+
+ let blocked_from_seconding =
+ self.blocked_from_seconding.values().fold(0, |acc, blocked_collations| {
+ acc + blocked_collations
+ .iter()
+ .filter(|pc| {
+ pc.candidate_receipt.descriptor.para_id() == *para_id &&
+ pc.candidate_receipt.descriptor.relay_parent() == *relay_parent
+ })
+ .count()
+ });
+
+ gum::trace!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?para_id,
+ seconded,
+ pending_fetch,
+ waiting_for_validation,
+ blocked_from_seconding,
+ "Seconded and pending collations for para",
+ );
+
+ seconded + pending_fetch + waiting_for_validation + blocked_from_seconding
+ }
+}
+
fn is_relay_parent_in_implicit_view(
relay_parent: &Hash,
- relay_parent_mode: ProspectiveParachainsMode,
implicit_view: &ImplicitView,
- active_leaves: &HashMap<Hash, ProspectiveParachainsMode>,
+ active_leaves: &HashMap<Hash, AsyncBackingParams>,
para_id: ParaId,
) -> bool {
- match relay_parent_mode {
- ProspectiveParachainsMode::Disabled => active_leaves.contains_key(relay_parent),
- ProspectiveParachainsMode::Enabled { .. } => active_leaves.iter().any(|(hash, mode)| {
- mode.is_enabled() &&
- implicit_view
- .known_allowed_relay_parents_under(hash, Some(para_id))
- .unwrap_or_default()
- .contains(relay_parent)
- }),
- }
+ active_leaves.iter().any(|(hash, _)| {
+ implicit_view
+ .known_allowed_relay_parents_under(hash, Some(para_id))
+ .unwrap_or_default()
+ .contains(relay_parent)
+ })
}
async fn construct_per_relay_parent<Sender>(
@@ -461,7 +481,6 @@ async fn construct_per_relay_parent<Sender>(
current_assignments: &mut HashMap<ParaId, usize>,
keystore: &KeystorePtr,
relay_parent: Hash,
- relay_parent_mode: ProspectiveParachainsMode,
v2_receipts: bool,
session_index: SessionIndex,
) -> Result