Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into ao-session-info-migration
Browse files Browse the repository at this point in the history
* master:
  Fix currently-checking-cache (#4410)
  Companion for Substrate#10445 (#4506)
  pvf-precheck: update rustdoc for paras module (#4459)
  • Loading branch information
ordian committed Dec 22, 2021
2 parents 75aa29b + 42c6665 commit f51b27d
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 226 deletions.
336 changes: 168 additions & 168 deletions Cargo.lock

Large diffs are not rendered by default.

53 changes: 29 additions & 24 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ use futures::{
};

use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
collections::{
btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
},
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -403,7 +405,7 @@ impl Wakeups {
}

// we are replacing previous wakeup with an earlier one.
if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let Some(pos) =
entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
{
Expand Down Expand Up @@ -439,7 +441,7 @@ impl Wakeups {
});

for (tick, pruned) in pruned_wakeups {
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
if entry.get().is_empty() {
let _ = entry.remove();
Expand All @@ -460,10 +462,10 @@ impl Wakeups {
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => {
BTMEntry::Vacant(_) => {
panic!("entry is known to exist since `first` was `Some`; qed")
},
Entry::Occupied(mut entry) => {
BTMEntry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

Expand Down Expand Up @@ -510,9 +512,7 @@ impl ApprovalState {
}

struct CurrentlyCheckingSet {
/// Invariant: The contained `Vec` needs to stay sorted as we are using `binary_search_by_key`
/// on it.
candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}

Expand All @@ -532,21 +532,26 @@ impl CurrentlyCheckingSet {
relay_block: Hash,
launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
) -> SubsystemResult<()> {
let val = self.candidate_hash_map.entry(candidate_hash).or_insert(Default::default());

if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) {
let _ = val.insert(k, relay_block);
let work = launch_work.await?;
self.currently_checking.push(Box::pin(async move {
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
None => ApprovalState {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
},
Some(approval_state) => approval_state,
}
}));
match self.candidate_hash_map.entry(candidate_hash) {
HMEntry::Occupied(mut entry) => {
// validation already undergoing. just add the relay hash if unknown.
entry.get_mut().insert(relay_block);
},
HMEntry::Vacant(entry) => {
// validation not ongoing. launch work and time out the remote handle.
entry.insert(HashSet::new()).insert(relay_block);
let work = launch_work.await?;
self.currently_checking.push(Box::pin(async move {
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
None => ApprovalState {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
},
Some(approval_state) => approval_state,
}
}));
},
}

Ok(())
Expand All @@ -555,7 +560,7 @@ impl CurrentlyCheckingSet {
pub async fn next(
&mut self,
approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
) -> (Vec<Hash>, ApprovalState) {
) -> (HashSet<Hash>, ApprovalState) {
if !self.currently_checking.is_empty() {
if let Some(approval_state) = self.currently_checking.next().await {
let out = self
Expand Down
141 changes: 122 additions & 19 deletions node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
use polkadot_node_primitives::approval::{
AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT,
use polkadot_node_primitives::{
approval::{
AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof,
RELAY_VRF_MODULO_CONTEXT,
},
AvailableData, BlockData, PoV,
};
use polkadot_node_subsystem::{
messages::{AllMessages, ApprovalVotingMessage, AssignmentCheckResult},
messages::{
AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage,
},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_overseer::HeadSupportsParachains;
use polkadot_primitives::v1::{
CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, ValidatorSignature,
CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId,
ValidationCode, ValidatorSignature,
};
use std::time::Duration;

Expand Down Expand Up @@ -2138,7 +2145,7 @@ fn subsystem_approved_ancestor_missing_approval() {
}

#[test]
fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
fn subsystem_validate_approvals_cache() {
let assignment_criteria = Box::new(MockAssignmentCriteria(
|| {
let mut assignments = HashMap::new();
Expand Down Expand Up @@ -2168,7 +2175,10 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
} = test_harness;

let block_hash = Hash::repeat_byte(0x01);
let candidate_receipt = dummy_candidate_receipt(block_hash);
let fork_block_hash = Hash::repeat_byte(0x02);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();
let slot = Slot::from(1);
let candidate_index = 0;
Expand All @@ -2189,17 +2199,24 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
..session_info(&validators)
};

let candidates = Some(vec![(candidate_receipt.clone(), CoreIndex(0), GroupIndex(0))]);
ChainBuilder::new()
.add_block(
block_hash,
ChainBuilder::GENESIS_HASH,
1,
BlockConfig {
slot,
candidates: Some(vec![(candidate_receipt, CoreIndex(0), GroupIndex(0))]),
session_info: Some(session_info),
candidates: candidates.clone(),
session_info: Some(session_info.clone()),
},
)
.add_block(
fork_block_hash,
ChainBuilder::GENESIS_HASH,
1,
BlockConfig { slot, candidates, session_info: Some(session_info) },
)
.build(&mut virtual_overseer)
.await;

Expand All @@ -2211,19 +2228,8 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {

futures_timer::Delay::new(Duration::from_millis(200)).await;

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot + 2)));
clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
c_index,
)) => {
assert_eq!(candidate_index, c_index);
}
);

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;
Expand All @@ -2233,10 +2239,92 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());

// Handle the the next two assignment imports, where only one should trigger approvals work
handle_double_assignment_import(&mut virtual_overseer, candidate_index).await;

virtual_overseer
});
}

/// Ensure that when two assignments are imported, only one triggers the Approval Checking work
pub async fn handle_double_assignment_import(
virtual_overseer: &mut VirtualOverseer,
candidate_index: CandidateIndex,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
c_index,
)) => {
assert_eq!(candidate_index, c_index);
}
);

recover_available_data(virtual_overseer).await;
fetch_validation_code(virtual_overseer).await;

let first_message = virtual_overseer.recv().await;
let second_message = virtual_overseer.recv().await;

for msg in vec![first_message, second_message].into_iter() {
match msg {
AllMessages::ApprovalDistribution(
ApprovalDistributionMessage::DistributeAssignment(_, c_index),
) => {
assert_eq!(candidate_index, c_index);
},
AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx),
) if timeout == APPROVAL_EXECUTION_TIMEOUT => {
tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
},
_ => panic! {},
}
}

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
}

/// Handles validation code fetch, returns the received relay parent hash.
async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) -> Hash {
let validation_code = ValidationCode(Vec::new());

assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::ValidationCodeByHash(
_,
tx,
)
)) => {
tx.send(Ok(Some(validation_code))).unwrap();
hash
},
"overseer did not receive runtime API request for validation code",
)
}

async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
let pov_block = PoV { block_data: BlockData(Vec::new()) };

let available_data =
AvailableData { pov: Arc::new(pov_block), validation_data: Default::default() };

assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx)
) => {
tx.send(Ok(available_data)).unwrap();
},
"overseer did not receive recover available data message",
);
}

struct TriggersAssignmentConfig<F1, F2> {
our_assigned_tranche: DelayTranche,
assign_validator_tranche: F1,
Expand Down Expand Up @@ -2413,6 +2501,21 @@ async fn step_until_done(clock: &MockClock) {
println!("relevant_ticks: {:?}", relevant_ticks);
}

#[test]
fn subsystem_process_wakeup_trigger_assignment_launch_approval() {
triggers_assignment_test(TriggersAssignmentConfig {
our_assigned_tranche: 0,
assign_validator_tranche: |_| Ok(0),
no_show_slots: 0,
assignments_to_import: vec![1],
approvals_to_import: vec![1],
ticks: vec![
10, // Alice wakeup, assignment triggered
],
should_be_triggered: |_| true,
});
}

#[test]
fn subsystem_assignment_triggered_solo_zero_tranche() {
triggers_assignment_test(TriggersAssignmentConfig {
Expand Down
4 changes: 2 additions & 2 deletions runtime/kusama/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1832,9 +1832,9 @@ sp_api::impl_runtime_apis! {
}

impl beefy_primitives::BeefyApi<Block> for Runtime {
fn validator_set() -> beefy_primitives::ValidatorSet<BeefyId> {
fn validator_set() -> Option<beefy_primitives::ValidatorSet<BeefyId>> {
// dummy implementation due to lack of BEEFY pallet.
beefy_primitives::ValidatorSet { validators: Vec::new(), id: 0 }
None
}
}

Expand Down
Loading

0 comments on commit f51b27d

Please sign in to comment.