Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

collect included disputes from on-chain #3924

Merged
55 commits merged into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
de14670
dummy: impl another runtime API
drahnr Sep 24, 2021
32a32ed
query the on chain disputes, and inform self
drahnr Sep 24, 2021
5c4386d
make use of the refactor
drahnr Sep 24, 2021
06a83f3
minro
drahnr Sep 24, 2021
9096d78
SPLIT ME
drahnr Sep 24, 2021
2716e9e
write dispute values
drahnr Sep 27, 2021
784322f
wip
drahnr Sep 27, 2021
794c353
impl for all runtimes
drahnr Sep 28, 2021
b72b9fd
chore: fmt
drahnr Sep 28, 2021
a31bd85
[] -> get
drahnr Sep 28, 2021
69dd70f
fixup mock runtime
drahnr Sep 28, 2021
6f59992
fixup
drahnr Sep 28, 2021
1b6a837
fixup discovery for overseer init
drahnr Sep 28, 2021
4f44aba
chore: fmt
drahnr Sep 28, 2021
6454553
spellcheck
drahnr Sep 28, 2021
fe88251
rename imported_on_chain_disputes -> on_chain_votes
drahnr Sep 29, 2021
6765dc4
Merge remote-tracking branch 'origin/master' into bernhard-collect-on…
drahnr Sep 29, 2021
44cff30
reduction
drahnr Sep 29, 2021
b318611
make it mockable
drahnr Sep 29, 2021
06dfc07
Merge remote-tracking branch 'origin/master' into bernhard-collect-on…
drahnr Sep 29, 2021
eabed89
rename and refactor
drahnr Sep 29, 2021
dea3605
don't query on chain info if it's not needed
drahnr Sep 29, 2021
03fbfa6
yikes
drahnr Sep 29, 2021
8b3bb1f
fmt
drahnr Sep 29, 2021
c755ac6
fix test
drahnr Sep 29, 2021
a467848
minimal fix for existing tests
drahnr Sep 29, 2021
f3ff4b5
attempt to fetch the session info from the rolling window before fall…
drahnr Sep 29, 2021
3fa85c2
Merge branch 'master' into bernhard-collect-on-chain-disputes
drahnr Sep 29, 2021
b936d22
moved
drahnr Sep 30, 2021
652fecd
comments
drahnr Sep 30, 2021
d9f16cf
comments
drahnr Sep 30, 2021
57d9f32
test for backing votes
drahnr Sep 30, 2021
747021b
rename
drahnr Sep 30, 2021
6527535
Update runtime/polkadot/src/lib.rs
drahnr Oct 1, 2021
78aa973
chore: spellcheck + dict
drahnr Oct 1, 2021
ce43ca6
chore: fmt
drahnr Oct 1, 2021
f723136
fixup cache size
drahnr Oct 1, 2021
e4dba7e
add warning
drahnr Oct 1, 2021
b406539
logging, rationale, less defense
drahnr Oct 1, 2021
f8bf91f
introduce new unchecked, that still checks in debug builds
drahnr Oct 1, 2021
b0abbf4
fix
drahnr Oct 1, 2021
b6beb5d
draft alt approach
drahnr Oct 1, 2021
f37e64f
fix unused imports
drahnr Oct 1, 2021
648e19d
include the session
drahnr Oct 1, 2021
3124ad5
Update node/core/dispute-coordinator/src/real/mod.rs
drahnr Oct 2, 2021
95e6a86
provide where possible
drahnr Oct 2, 2021
4313fa3
expand comment
drahnr Oct 4, 2021
1bd9c54
fixin
drahnr Oct 4, 2021
d18e446
fixup
drahnr Oct 4, 2021
58a1ab9
ValidityVote <-> ValidityAttestation <-> CompactStatement has a 1:1 r…
drahnr Oct 4, 2021
1e972aa
mark TODO
drahnr Oct 4, 2021
3fe3857
Update primitives/src/v1/mod.rs
drahnr Oct 4, 2021
a1b6e36
address review comments
drahnr Oct 5, 2021
b943201
update docs
drahnr Oct 5, 2021
50b0658
Merge remote-tracking branch 'origin/master' into bernhard-collect-on…
drahnr Oct 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 114 additions & 12 deletions node/core/dispute-coordinator/src/real/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};

use futures::{channel::oneshot, prelude::*};
use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW,
Expand All @@ -39,21 +42,17 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
BlockDescription, ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult,
DisputeParticipationMessage, ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
};
use polkadot_primitives::v1::{
BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, Hash, SessionIndex,
SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, DisputeStatementSet, Hash,
SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};

use futures::{channel::oneshot, prelude::*};
use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use sc_keystore::LocalKeystore;

use crate::metrics::Metrics;
Expand Down Expand Up @@ -348,6 +347,8 @@ where
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
clock.now(),
&metrics,
)
.await?;
if !state.recovery_state.complete() {
Expand Down Expand Up @@ -476,6 +477,8 @@ async fn handle_new_activations(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
new_activations: impl IntoIterator<Item = Hash>,
now: u64,
metrics: &Metrics,
) -> Result<(), Error> {
for new_leaf in new_activations {
let block_header = {
Expand All @@ -500,8 +503,6 @@ async fn handle_new_activations(
err = ?e,
"Failed to update session cache for disputes",
);

continue
},
Ok(SessionWindowUpdate::Initialized { window_end, .. }) |
Ok(SessionWindowUpdate::Advanced { new_window_end: window_end, .. }) => {
Expand All @@ -516,6 +517,108 @@ async fn handle_new_activations(
},
_ => {},
}

let on_chain_scraped: ScrapedImportDisputesAndBackingVotes = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
drahnr marked this conversation as resolved.
Show resolved Hide resolved
new_leaf,
RuntimeApiRequest::ImportedOnChainDisputes(tx),
))
.await;

match rx.await {
Ok(Ok(Some(val))) => val,
Ok(Ok(None)) => {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"No data stored for relay parent");
continue
},
Ok(Err(e)) => {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve relay parent since the API returned an error");
continue
},
Err(e) => {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve relay parent due to ");
continue
},
}
};

let session_info: SessionInfo = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiRequest::SessionInfo(on_chain_scraped.session_index, tx))
.await;

match rx.await?? {
None => continue,
Some(session_info) => session_info,
}
};

// scraped on-chain backing votes
for (candidate_receipt, backers) in on_chain_scraped.backing_validators.iter() {
let statements = backers.iter().map(|validator_idx| {
DisputeStatement::Valid(DisputeStatementKind::Backing(new_leaf))
});
let _import_result = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
candidate_receipt,
on_chain_scraped.session_index,
statements,
now,
metrics,
)
.await?;
}

let candidate_backings = {
let backings = ctx
.send_message(CandidateBackingMessage::GetBackedCandidates(
new_leaf, candidates, tx,
))
.await?;
// FIXME XXX double check all those `?`s
let backings = rx.await??;
backings.into_iter().collect::<HashMap<_, _>>()
};

// concluded disputes from on-chain, this already went through a vote so it's assumed
// as verified and no gossip is needed, and hence.
'dss: for DisputeStatementSet { candidate_hash, session, statements } in
on_chain_scraped.disputes.iter()
{
let candidate_receipt = if let Some(candidate_receipt) =
backings.get(candidate_hash).map(|(_, backing_vote)| backing_vote.receipt())
{
candidate_receipt
} else {
tracing::warn!("Missing backing vote for disputed candidate {}", &candidate_hash);
continue 'dss
};
let mut votes =
CandidateVotes { candidate_receipt, valid: Vec::new(), invalid: Vec::new() };
let statements =
statements.into_iter().for_each(|dispute_statement: DisputeStatement| {
match dispute_statement {
DisputeStatement::Valid(valid) => votes.valid.push(valid),
DisputeStatement::Invalid(invalid) => votes.invalid.push(invalid),
}
});
overlay_db.write_candidate_votes(session, candidate_hash, votes);
}
}

Ok(())
Expand Down Expand Up @@ -644,9 +747,8 @@ async fn handle_import_statements(
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
statements: impl IntoIter<Item = (SignedDisputeStatement, ValidatorIndex)>,
now: Timestamp,
pending_confirmation: oneshot::Sender<ImportStatementsResult>,
metrics: &Metrics,
) -> Result<ImportStatementsResult, Error> {
if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
Expand Down Expand Up @@ -928,7 +1030,7 @@ async fn issue_local_statement(
now,
metrics,
)
.await?
.await
{
Err(_) => {
tracing::error!(
Expand Down
22 changes: 21 additions & 1 deletion node/core/runtime-api/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent,
CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData,
SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ScrapedImportDisputesAndBackingVotes, SessionIndex, SessionInfo, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex,
};

const AUTHORITIES_CACHE_SIZE: usize = 128 * 1024;
Expand Down Expand Up @@ -98,6 +99,8 @@ pub(crate) struct RequestResultCache {
ResidentSizeOf<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>,
>,
current_babe_epoch: MemoryLruCache<Hash, DoesNotAllocate<Epoch>>,
imported_on_chain_disputes:
MemoryLruCache<Hash, ResidentSizeOf<Option<ScrapedImportDisputesAndBackingVotes>>>,
}

impl Default for RequestResultCache {
Expand All @@ -120,6 +123,7 @@ impl Default for RequestResultCache {
dmq_contents: MemoryLruCache::new(DMQ_CONTENTS_CACHE_SIZE),
inbound_hrmp_channels_contents: MemoryLruCache::new(INBOUND_HRMP_CHANNELS_CACHE_SIZE),
current_babe_epoch: MemoryLruCache::new(CURRENT_BABE_EPOCH_CACHE_SIZE),
imported_on_chain_disputes: MemoryLruCache::new(3),
}
}
}
Expand Down Expand Up @@ -320,6 +324,21 @@ impl RequestResultCache {
pub(crate) fn cache_current_babe_epoch(&mut self, relay_parent: Hash, epoch: Epoch) {
self.current_babe_epoch.insert(relay_parent, DoesNotAllocate(epoch));
}

pub(crate) fn imported_on_chain_disputes(
&mut self,
relay_parent: &Hash,
) -> Option<&Option<ScrapedImportDisputesAndBackingVotes>> {
self.imported_on_chain_disputes.get(relay_parent).map(|v| &v.0)
}

pub(crate) fn cache_imported_on_chain_disputes(
&mut self,
relay_parent: Hash,
scraped: Option<ScrapedImportDisputesAndBackingVotes>,
) {
self.imported_on_chain_disputes.insert(relay_parent, ResidentSizeOf(scraped));
}
}

pub(crate) enum RequestResult {
Expand All @@ -342,4 +361,5 @@ pub(crate) enum RequestResult {
BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>,
),
CurrentBabeEpoch(Hash, Epoch),
ImportedOnChainDisputes(Hash, Option<ScrapedImportDisputesAndBackingVotes>),
}
7 changes: 7 additions & 0 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ where
.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents),
CurrentBabeEpoch(relay_parent, epoch) =>
self.requests_cache.cache_current_babe_epoch(relay_parent, epoch),
ImportedOnChainDisputes(relay_parent, scraped) =>
self.requests_cache.cache_imported_on_chain_disputes(relay_parent, scraped),
}
}

Expand Down Expand Up @@ -209,6 +211,9 @@ where
.map(|sender| Request::InboundHrmpChannelsContents(id, sender)),
Request::CurrentBabeEpoch(sender) =>
query!(current_babe_epoch(), sender).map(|sender| Request::CurrentBabeEpoch(sender)),
Request::ImportedOnChainDisputes(sender) =>
query!(imported_on_chain_disputes(), sender)
.map(|sender| Request::ImportedOnChainDisputes(sender)),
}
}

Expand Down Expand Up @@ -342,6 +347,8 @@ where
Request::InboundHrmpChannelsContents(id, sender) =>
query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender),
Request::CurrentBabeEpoch(sender) => query!(CurrentBabeEpoch, current_epoch(), sender),
Request::ImportedOnChainDisputes(sender) =>
query!(ImportedOnChainDisputes, imported_on_chain_disputes(), sender),
}
}

Expand Down
8 changes: 6 additions & 2 deletions node/core/runtime-api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, GroupRotationInfo,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex,
PersistedValidationData, ScrapedImportDisputesAndBackingVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
};
use sp_core::testing::TaskExecutor;
use std::{
Expand Down Expand Up @@ -149,6 +149,10 @@ sp_api::mock_impl_runtime_apis! {
) -> Option<ValidationCode> {
self.validation_code_by_hash.get(&hash).map(|c| c.clone())
}

fn imported_on_chain_disputes(&self) -> Option<ScrapedImportDisputesAndBackingVotes> {
None
}
}

impl BabeApi<Block> for MockRuntimeApi {
Expand Down
7 changes: 5 additions & 2 deletions node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
GossipSupportSubsystem,
GossipSupportSubsystem<AuthorityDiscoveryService>,
DisputeCoordinatorSubsystem,
DisputeParticipationSubsystem,
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
Expand Down Expand Up @@ -405,7 +405,10 @@ where
Box::new(network_service.clone()),
Metrics::register(registry)?,
))
.gossip_support(GossipSupportSubsystem::new(keystore.clone()))
.gossip_support(GossipSupportSubsystem::new(
keystore.clone(),
authority_discovery_service.clone(),
))
.dispute_coordinator(DisputeCoordinatorSubsystem::new(
parachains_db.clone(),
dispute_coordinator_config,
Expand Down
4 changes: 4 additions & 0 deletions node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,10 @@ pub enum RuntimeApiRequest {
),
/// Get information about the BABE epoch the block was included in.
CurrentBabeEpoch(RuntimeApiSender<BabeEpoch>),
/// Get all disputes in relation to a relay parent.
ImportedOnChainDisputes(
RuntimeApiSender<Option<polkadot_primitives::v1::ScrapedImportDisputesAndBackingVotes>>,
),
}

/// A message to the Runtime API subsystem.
Expand Down
22 changes: 21 additions & 1 deletion primitives/src/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,19 @@ pub struct SessionInfo {
pub needed_approvals: u32,
}

/// Scraped runtime backing votes and resolved disputes.
#[derive(Clone, Encode, Decode, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(PartialEq, Default, MallocSizeOf))]
pub struct ScrapedImportDisputesAndBackingVotes<H: Encode + Decode = Hash> {
/// The session in which the block was included.
pub session: SessionIndex,
/// Set of backing validators for each candidate, represented by it's candidate
drahnr marked this conversation as resolved.
Show resolved Hide resolved
/// receipt.
pub backing_validators: Vec<(CandidateReceipt<H>, Vec<ValidatorIndex>)>,
/// On-chain-recorded set of disputes.
pub disputes: MultiDisputeStatementSet,
}

/// A vote of approval on a candidate.
#[derive(Clone, RuntimeDebug)]
pub struct ApprovalVote(pub CandidateHash);
Expand All @@ -960,7 +973,7 @@ impl ApprovalVote {

sp_api::decl_runtime_apis! {
/// The API for querying the state of parachains on-chain.
pub trait ParachainHost<H: Decode = Hash, N: Encode + Decode = BlockNumber> {
pub trait ParachainHost<H: Encode + Decode = Hash, N: Encode + Decode = BlockNumber> {
/// Get the current validators.
fn validators() -> Vec<ValidatorId>;

Expand Down Expand Up @@ -1017,6 +1030,9 @@ sp_api::decl_runtime_apis! {

/// Get the validation code from its hash.
fn validation_code_by_hash(hash: ValidationCodeHash) -> Option<ValidationCode>;

/// Scrape dispute relevant from on-chain, backing votes and resolved disputes.
fn imported_on_chain_disputes() -> Option<ScrapedImportDisputesAndBackingVotes<H>>;
drahnr marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -1182,6 +1198,7 @@ impl<H> From<ConsensusLog> for runtime_primitives::DigestItem<H> {
///
/// Statements are either in favor of the candidate's validity or against it.
#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub enum DisputeStatement {
/// A valid statement, of the given kind.
#[codec(index = 0)]
Expand Down Expand Up @@ -1251,6 +1268,7 @@ impl DisputeStatement {

/// Different kinds of statements of validity on a candidate.
#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub enum ValidDisputeStatementKind {
/// An explicit statement issued as part of a dispute.
#[codec(index = 0)]
Expand All @@ -1268,6 +1286,7 @@ pub enum ValidDisputeStatementKind {

/// Different kinds of statements of invalidity on a candidate.
#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub enum InvalidDisputeStatementKind {
/// An explicit statement issued as part of a dispute.
#[codec(index = 0)]
Expand Down Expand Up @@ -1296,6 +1315,7 @@ impl ExplicitDisputeStatement {

/// A set of statements about a specific candidate.
#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub struct DisputeStatementSet {
/// The candidate referenced by this set.
pub candidate_hash: CandidateHash,
Expand Down
Loading