From 8217ca6764cce626a440bf83dc6418b6b396b9a0 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Thu, 23 Jul 2020 16:05:48 +0200 Subject: [PATCH] implement bitfield signing subsystem (#1364) * update guide to reduce confusion and TODOs * work from previous bitfield signing effort There were large merge issues with the old bitfield signing PR, so we're just copying all the work from that onto this and restarting. Much of the existing work will be discarded because we now have better tools available, but that's fine. * start rewriting bitfield signing in terms of the util module * implement construct_availability_bitvec It's not an ideal implementation--we can make it much more concurrent-- but at least it compiles. * implement the unimplemented portions of bitfield signing * get core availability concurrently, not sequentially * use sp-std instead of std for a parachain item * resolve type inference failure caused by multiple From impls * handle bitfield signing subsystem & Allmessages variant in overseer * fix more multi-From inference issues * more concisely handle overflow Co-authored-by: Andronik Ordian * Revert "resolve type inference failure caused by multiple From impls" This reverts commit 7fc77805de5e5074a1b01037f8d4e3919e03e0e1. * Revert "fix more multi-From inference issues" This reverts commit f14ffe589e20d664d8a900ed62f68b6fb844a514. * impl From for ParaId * handle another instance of AllSubsystems * improve consistency when returning existing options Co-authored-by: Andronik Ordian --- Cargo.lock | 14 + Cargo.toml | 1 + node/bitfield-signing/Cargo.toml | 15 + node/bitfield-signing/src/lib.rs | 290 ++++++++++++++++++ node/overseer/examples/minimal-example.rs | 1 + node/overseer/src/lib.rs | 32 +- node/service/src/lib.rs | 1 + node/subsystem/src/messages.rs | 27 +- parachain/src/primitives.rs | 30 ++ .../src/node/availability/bitfield-signing.md | 6 +- 10 files changed, 411 insertions(+), 6 deletions(-) create mode 100644 node/bitfield-signing/Cargo.toml create mode 100644 node/bitfield-signing/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4f58980d5f3c..76b257eddd79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4544,6 +4544,20 @@ dependencies = [ "sp-runtime", ] +[[package]] +name = "polkadot-node-bitfield-signing" +version = "0.1.0" +dependencies = [ + "bitvec", + "derive_more 0.99.9", + "futures 0.3.5", + "log 0.4.8", + "polkadot-node-subsystem", + "polkadot-primitives", + "sc-keystore", + "wasm-timer", +] + [[package]] name = "polkadot-node-core-backing" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 28ad17ac4d77..58211e162039 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "service", "validation", + "node/bitfield-signing", "node/core/proposer", "node/network/bridge", "node/network/pov-distribution", diff --git a/node/bitfield-signing/Cargo.toml b/node/bitfield-signing/Cargo.toml new file mode 100644 index 000000000000..c3a2934aaddd --- /dev/null +++ b/node/bitfield-signing/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "polkadot-node-bitfield-signing" +version = "0.1.0" +authors = ["Peter Goodspeed-Niklaus "] +edition = "2018" + +[dependencies] +bitvec = "0.17.4" +derive_more = "0.99.9" +futures = "0.3.5" +log = "0.4.8" +polkadot-primitives = { path = "../../primitives" } +polkadot-node-subsystem = { path = "../subsystem" } +keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } +wasm-timer = "0.2.4" diff --git a/node/bitfield-signing/src/lib.rs b/node/bitfield-signing/src/lib.rs new file mode 100644 index 000000000000..b973f4b3ae09 --- /dev/null +++ b/node/bitfield-signing/src/lib.rs @@ -0,0 +1,290 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block. + +use bitvec::bitvec; +use futures::{ + channel::{mpsc, oneshot}, + prelude::*, + stream, Future, +}; +use keystore::KeyStorePtr; +use polkadot_node_subsystem::{ + messages::{ + self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, + BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, + }, + util::{self, JobManager, JobTrait, ToJobTrait, Validator}, +}; +use polkadot_primitives::v1::{AvailabilityBitfield, CoreOccupied, Hash}; +use std::{convert::TryFrom, pin::Pin, time::Duration}; +use wasm_timer::{Delay, Instant}; + +/// Delay between starting a bitfield signing job and its attempting to create a bitfield. +const JOB_DELAY: Duration = Duration::from_millis(1500); + +/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent. +pub struct BitfieldSigningJob; + +/// Messages which a `BitfieldSigningJob` is prepared to receive. +pub enum ToJob { + BitfieldSigning(BitfieldSigningMessage), + Stop, +} + +impl ToJobTrait for ToJob { + const STOP: Self = ToJob::Stop; + + fn relay_parent(&self) -> Option { + match self { + Self::BitfieldSigning(bsm) => bsm.relay_parent(), + Self::Stop => None, + } + } +} + +impl TryFrom for ToJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::BitfieldSigning(bsm) => Ok(ToJob::BitfieldSigning(bsm)), + _ => Err(()), + } + } +} + +impl From for ToJob { + fn from(bsm: BitfieldSigningMessage) -> ToJob { + ToJob::BitfieldSigning(bsm) + } +} + +/// Messages which may be sent from a `BitfieldSigningJob`. +pub enum FromJob { + AvailabilityStore(AvailabilityStoreMessage), + BitfieldDistribution(BitfieldDistributionMessage), + CandidateBacking(CandidateBackingMessage), + RuntimeApi(RuntimeApiMessage), +} + +impl From for AllMessages { + fn from(from_job: FromJob) -> AllMessages { + match from_job { + FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm), + FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm), + FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm), + FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram), + } + } +} + +impl TryFrom for FromJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::AvailabilityStore(asm) => Ok(Self::AvailabilityStore(asm)), + AllMessages::BitfieldDistribution(bdm) => Ok(Self::BitfieldDistribution(bdm)), + AllMessages::CandidateBacking(cbm) => Ok(Self::CandidateBacking(cbm)), + AllMessages::RuntimeApi(ram) => Ok(Self::RuntimeApi(ram)), + _ => Err(()), + } + } +} + +/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`. +#[derive(Debug, derive_more::From)] +pub enum Error { + /// error propagated from the utility subsystem + #[from] + Util(util::Error), + /// io error + #[from] + Io(std::io::Error), + /// a one shot channel was canceled + #[from] + Oneshot(oneshot::Canceled), + /// a mspc channel failed to send + #[from] + MpscSend(mpsc::SendError), + /// several errors collected into one + #[from] + Multiple(Vec), +} + +// this function exists mainly to collect a bunch of potential error points into one. +async fn get_core_availability( + relay_parent: Hash, + idx: usize, + core: Option, + sender: &mpsc::Sender, +) -> Result { + use messages::{ + AvailabilityStoreMessage::QueryPoVAvailable, + RuntimeApiRequest::CandidatePendingAvailability, + }; + use FromJob::{AvailabilityStore, RuntimeApi}; + use RuntimeApiMessage::Request; + + // we have to (cheaply) clone this sender so we can mutate it to actually send anything + let mut sender = sender.clone(); + + // REVIEW: is it safe to ignore parathreads here, or do they also figure in the availability mapping? + if let Some(CoreOccupied::Parachain) = core { + let (tx, rx) = oneshot::channel(); + sender + .send(RuntimeApi(Request( + relay_parent, + CandidatePendingAvailability(idx.into(), tx), + ))) + .await?; + let committed_candidate_receipt = match rx.await? { + Some(ccr) => ccr, + None => return Ok(false), + }; + let (tx, rx) = oneshot::channel(); + sender + .send(AvailabilityStore(QueryPoVAvailable( + committed_candidate_receipt.descriptor.pov_hash, + tx, + ))) + .await?; + return rx.await.map_err(Into::into); + } + Ok(false) +} + +// the way this function works is not intuitive: +// +// - get the scheduler roster so we have a list of cores, in order. +// - for each occupied core, fetch `candidate_pending_availability` from runtime +// - from there, we can get the `CandidateDescriptor` +// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_) +async fn construct_availability_bitfield( + relay_parent: Hash, + sender: &mut mpsc::Sender, +) -> Result { + use futures::lock::Mutex; + + use messages::RuntimeApiRequest::ValidatorGroups; + use FromJob::RuntimeApi; + use RuntimeApiMessage::Request; + + // request the validator groups so we can get the scheduler roster + let (tx, rx) = oneshot::channel(); + sender + .send(RuntimeApi(Request(relay_parent, ValidatorGroups(tx)))) + .await?; + + // we now need sender to be immutable so we can copy the reference to multiple concurrent closures + let sender = &*sender; + + // wait for the scheduler roster + let scheduler_roster = rx.await?; + + // prepare outputs + let out = + Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; scheduler_roster.availability_cores.len())); + // in principle, we know that we never want concurrent access to the _same_ bit within the vec; + // we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding + // any need to ever wait to lock this mutex. + // in practice, it's safer to just use the mutex, and speed optimizations should wait until + // benchmarking proves that they are necessary. + let out_ref = &out; + let errs = Mutex::new(Vec::new()); + let errs_ref = &errs; + + // Handle each (idx, core) pair concurrently + // + // In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why + // we need the mutexes and explicit references above. + stream::iter(scheduler_roster.availability_cores.into_iter().enumerate()) + .for_each_concurrent(None, |(idx, core)| async move { + let availability = match get_core_availability(relay_parent, idx, core, sender).await { + Ok(availability) => availability, + Err(err) => { + errs_ref.lock().await.push(err); + return; + } + }; + out_ref.lock().await.set(idx, availability); + }) + .await; + + let errs = errs.into_inner(); + if errs.is_empty() { + Ok(out.into_inner().into()) + } else { + Err(errs.into()) + } +} + +impl JobTrait for BitfieldSigningJob { + type ToJob = ToJob; + type FromJob = FromJob; + type Error = Error; + type RunArgs = KeyStorePtr; + + const NAME: &'static str = "BitfieldSigningJob"; + + /// Run a job for the parent block indicated + fn run( + relay_parent: Hash, + keystore: Self::RunArgs, + _receiver: mpsc::Receiver, + mut sender: mpsc::Sender, + ) -> Pin> + Send>> { + async move { + // figure out when to wait to + let wait_until = Instant::now() + JOB_DELAY; + + // now do all the work we can before we need to wait for the availability store + // if we're not a validator, we can just succeed effortlessly + let validator = match Validator::new(relay_parent, keystore, sender.clone()).await { + Ok(validator) => validator, + Err(util::Error::NotAValidator) => return Ok(()), + Err(err) => return Err(Error::Util(err)), + }; + + // wait a bit before doing anything else + Delay::new_at(wait_until).await?; + + let bitfield = construct_availability_bitfield(relay_parent, &mut sender).await?; + let signed_bitfield = validator.sign(bitfield); + + // make an anonymous scope to contain some use statements to simplify creating the outbound message + { + use BitfieldDistributionMessage::DistributeBitfield; + use FromJob::BitfieldDistribution; + + sender + .send(BitfieldDistribution(DistributeBitfield( + relay_parent, + signed_bitfield, + ))) + .await + .map_err(Into::into) + } + } + .boxed() + } +} + +/// BitfieldSigningSubsystem manages a number of bitfield signing jobs. +pub type BitfieldSigningSubsystem = + JobManager; diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 35ebf8f3367d..36440275cb5b 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -147,6 +147,7 @@ fn main() { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index c6ac6444c29e..cb158d09c129 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -77,7 +77,7 @@ use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, StatementDistributionMessage, - AvailabilityDistributionMessage, BitfieldDistributionMessage, + AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; @@ -339,6 +339,9 @@ pub struct Overseer { /// An availability distribution subsystem. availability_distribution_subsystem: OverseenSubsystem, + /// A bitfield signing subsystem. + bitfield_signing_subsystem: OverseenSubsystem, + /// A bitfield distribution subsystem. bitfield_distribution_subsystem: OverseenSubsystem, @@ -390,7 +393,7 @@ pub struct Overseer { /// /// [`Subsystem`]: trait.Subsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html -pub struct AllSubsystems { +pub struct AllSubsystems { /// A candidate validation subsystem. pub candidate_validation: CV, /// A candidate backing subsystem. @@ -401,6 +404,8 @@ pub struct AllSubsystems { pub statement_distribution: SD, /// An availability distribution subsystem. pub availability_distribution: AD, + /// A bitfield signing subsystem. + pub bitfield_signing: BS, /// A bitfield distribution subsystem. pub bitfield_distribution: BD, /// A provisioner subsystem. @@ -487,6 +492,7 @@ where /// candidate_selection: DummySubsystem, /// statement_distribution: DummySubsystem, /// availability_distribution: DummySubsystem, + /// bitfield_signing: DummySubsystem, /// bitfield_distribution: DummySubsystem, /// provisioner: DummySubsystem, /// pov_distribution: DummySubsystem, @@ -513,9 +519,9 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - all_subsystems: AllSubsystems, + all_subsystems: AllSubsystems, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where @@ -524,6 +530,7 @@ where CS: Subsystem> + Send, SD: Subsystem> + Send, AD: Subsystem> + Send, + BS: Subsystem> + Send, BD: Subsystem> + Send, P: Subsystem> + Send, PoVD: Subsystem> + Send, @@ -575,6 +582,13 @@ where all_subsystems.availability_distribution, )?; + let bitfield_signing_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.bitfield_signing, + )?; + let bitfield_distribution_subsystem = spawn( &mut s, &mut running_subsystems, @@ -630,6 +644,7 @@ where candidate_selection_subsystem, statement_distribution_subsystem, availability_distribution_subsystem, + bitfield_signing_subsystem, bitfield_distribution_subsystem, provisioner_subsystem, pov_distribution_subsystem, @@ -871,6 +886,11 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + AllMessages::BitfieldSigning(msg) => { + if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication{ msg }).await; + } + } AllMessages::Provisioner(msg) => { if let Some(ref mut s) = self.provisioner_subsystem.instance { let _ = s.tx.send(FromOverseer::Communication { msg }).await; @@ -1050,6 +1070,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1112,6 +1133,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1227,6 +1249,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1323,6 +1346,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 02907e151f32..49df3da528e5 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -302,6 +302,7 @@ fn real_overseer( candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index a6b7859c38ef..a2f0bf8bbc8b 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -26,7 +26,7 @@ use futures::channel::{mpsc, oneshot}; use polkadot_primitives::v1::{ BlockNumber, Hash, - CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, + CandidateReceipt, CommittedCandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, CoreAssignment, CoreOccupied, HeadData, CandidateDescriptor, ValidatorSignature, OmittedValidationData, @@ -219,12 +219,32 @@ impl BitfieldDistributionMessage { } } +/// Bitfield signing message. +/// +/// Currently non-instantiable. +#[derive(Debug)] +pub enum BitfieldSigningMessage {} + +impl BitfieldSigningMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + None + } +} + /// Availability store subsystem message. #[derive(Debug)] pub enum AvailabilityStoreMessage { /// Query a `PoV` from the AV store. QueryPoV(Hash, oneshot::Sender>), + /// Query whether a `PoV` exists within the AV Store. + /// + /// This is useful in cases like bitfield signing, when existence + /// matters, but we don't want to necessarily pass around multiple + /// megabytes of data to get a single bit of information. + QueryPoVAvailable(Hash, oneshot::Sender), + /// Query an `ErasureChunk` from the AV store. QueryChunk(Hash, ValidatorIndex, oneshot::Sender), @@ -237,6 +257,7 @@ impl AvailabilityStoreMessage { pub fn relay_parent(&self) -> Option { match self { Self::QueryPoV(hash, _) => Some(*hash), + Self::QueryPoVAvailable(hash, _) => Some(*hash), Self::QueryChunk(hash, _, _) => Some(*hash), Self::StoreChunk(hash, _, _) => Some(*hash), } @@ -271,6 +292,8 @@ pub enum RuntimeApiRequest { ValidationCode(ParaId, BlockNumber, Option, oneshot::Sender), /// Get head data for a specific para. HeadData(ParaId, oneshot::Sender), + /// Get a the candidate pending availability for a particular parachain by parachain / core index + CandidatePendingAvailability(ParaId, oneshot::Sender>), } /// A message to the Runtime API subsystem. @@ -397,6 +420,8 @@ pub enum AllMessages { AvailabilityDistribution(AvailabilityDistributionMessage), /// Message for the bitfield distribution subsystem. BitfieldDistribution(BitfieldDistributionMessage), + /// Message for the bitfield signing subsystem. + BitfieldSigning(BitfieldSigningMessage), /// Message for the Provisioner subsystem. Provisioner(ProvisionerMessage), /// Message for the PoV Distribution subsystem. diff --git a/parachain/src/primitives.rs b/parachain/src/primitives.rs index f6b69cd89441..65d890e26dcf 100644 --- a/parachain/src/primitives.rs +++ b/parachain/src/primitives.rs @@ -80,6 +80,36 @@ impl From for Id { fn from(x: u32) -> Self { Id(x) } } +impl From for Id { + fn from(x: usize) -> Self { + use sp_std::convert::TryInto; + // can't panic, so need to truncate + let x = x.try_into().unwrap_or(u32::MAX); + Id(x) + } +} + +// When we added a second From impl for Id, type inference could no longer +// determine which impl should apply for things like `5.into()`. It therefore +// raised a bunch of errors in our test code, scattered throughout the +// various modules' tests, that there is no impl of `From` (`i32` being +// the default numeric type). +// +// We can't use `cfg(test)` here, because that configuration directive does not +// propagate between crates, which would fail to fix tests in crates other than +// this one. +// +// Instead, let's take advantage of the observation that what really matters for a +// ParaId within a test context is that it is unique and constant. I believe that +// there is no case where someone does `(-1).into()` anyway, but if they do, it +// never matters whether the actual contained ID is `-1` or `4294967295`. Nobody +// does arithmetic on a `ParaId`; doing so would be a bug. +impl From for Id { + fn from(x: i32) -> Self { + Id(x as u32) + } +} + const USER_INDEX_START: u32 = 1000; /// The ID of the first user (non-system) parachain. diff --git a/roadmap/implementers-guide/src/node/availability/bitfield-signing.md b/roadmap/implementers-guide/src/node/availability/bitfield-signing.md index 736fac4ff1b4..0ca9badd32e2 100644 --- a/roadmap/implementers-guide/src/node/availability/bitfield-signing.md +++ b/roadmap/implementers-guide/src/node/availability/bitfield-signing.md @@ -4,6 +4,10 @@ Validators vote on the availability of a backed candidate by issuing signed bitf ## Protocol +Input: + +There is no dedicated input mechanism for bitfield signing. Instead, Bitfield Signing produces a bitfield representing the current state of availability on `StartWork`. + Output: - BitfieldDistribution::DistributeBitfield: distribute a locally signed bitfield @@ -18,8 +22,8 @@ Upon receipt of an `ActiveLeavesUpdate`, launch bitfield signing job for each `a Localized to a specific relay-parent `r` If not running as a validator, do nothing. +- Begin by waiting a fixed period of time so availability distribution has the chance to make candidates available. - Determine our validator index `i`, the set of backed candidates pending availability in `r`, and which bit of the bitfield each corresponds to. -- > TODO: wait T time for availability distribution? - Start with an empty bitfield. For each bit in the bitfield, if there is a candidate pending availability, query the [Availability Store](../utility/availability-store.md) for whether we have the availability chunk for our validator index. - For all chunks we have, set the corresponding bit in the bitfield. - Sign the bitfield and dispatch a `BitfieldDistribution::DistributeBitfield` message.