Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

collator-protocol: asynchronous backing changes #5740

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
8718ba1
Draft collator side changes
slumber Jul 12, 2022
943103c
Start working on collations management
slumber Jul 13, 2022
2fb910f
Handle peer's view change
slumber Jul 14, 2022
7766a66
Versioning on advertising
slumber Jul 15, 2022
5b698ca
Versioned collation fetching request
slumber Jul 16, 2022
5caae2f
Handle versioned messages
slumber Jul 18, 2022
70b432e
Improve docs for collation requests
slumber Jul 18, 2022
65280e0
Add spans
slumber Jul 18, 2022
96e1817
Add request receiver to overseer
slumber Jul 18, 2022
77ba097
Fix collator side tests
slumber Jul 18, 2022
0288476
Extract relay parent mode to lib
slumber Jul 31, 2022
19a47b0
Validator side draft
slumber Aug 4, 2022
9fee11e
Add more checks for advertisement
slumber Aug 5, 2022
a72212f
Request pvd based on async backing mode
slumber Aug 10, 2022
0d9bc24
review
slumber Aug 12, 2022
6d41025
Validator side improvements
slumber Aug 12, 2022
eeaa1ea
Make old tests green
slumber Aug 12, 2022
2732e26
More fixes
slumber Aug 24, 2022
1435696
Collator side tests draft
slumber Sep 5, 2022
1997a84
Send collation test
slumber Sep 6, 2022
0e5c8e0
Merge remote-tracking branch 'origin/rh-async-backing-feature' into s…
slumber Sep 13, 2022
9c74814
fmt
slumber Sep 13, 2022
cf09d40
Collator side network protocol versioning
slumber Sep 13, 2022
16643c6
cleanup
slumber Sep 13, 2022
32b6674
merge artifacts
slumber Sep 13, 2022
23fc0a0
Validator side net protocol versioning
slumber Sep 14, 2022
c014585
Remove fragment tree membership request
slumber Sep 14, 2022
3d85d94
Merge remote-tracking branch 'origin/rh-async-backing-feature' into s…
slumber Sep 15, 2022
81b957e
Resolve todo
slumber Sep 15, 2022
91961c7
Collator side core state test
slumber Sep 15, 2022
e647370
Improve net protocol compatibility
slumber Sep 15, 2022
0b0c70a
Validator side tests
slumber Sep 15, 2022
d3b85c0
Merge remote-tracking branch 'origin/rh-async-backing-feature' into s…
slumber Oct 3, 2022
941a031
more improvements
slumber Oct 3, 2022
50de99b
style fixes
slumber Oct 5, 2022
6baac29
downgrade log
slumber Oct 5, 2022
8acca05
Track implicit assignments
slumber Oct 5, 2022
c07c046
Limit the number of seconded candidates per para
slumber Oct 5, 2022
3582622
Add a sanity check
slumber Oct 5, 2022
67f6d11
Handle fetched candidate
slumber Oct 6, 2022
4f68a61
fix tests
slumber Oct 6, 2022
7a527f7
Retry fetch
slumber Oct 6, 2022
d87bcf3
Guard against dequeueing while already fetching
slumber Oct 6, 2022
86b5781
Merge remote-tracking branch 'origin/rh-async-backing-feature' into s…
slumber Oct 7, 2022
61195c2
Reintegrate connection management
slumber Oct 8, 2022
ef25127
Timeout on advertisements
slumber Oct 8, 2022
1323e70
fmt
slumber Oct 12, 2022
409483b
spellcheck
slumber Oct 12, 2022
2f64fdd
Merge branch 'rh-async-backing-feature' into slumber-async-backing-co…
slumber Oct 12, 2022
c194b8e
update tests after merge
slumber Oct 12, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async fn handle_new_activations<Context>(
"collation-builder",
Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();
let parent_head_data_hash = validation_data.parent_head.hash();

let (collation, result_sender) =
match (task_config.collator)(relay_parent, &validation_data).await {
Expand Down Expand Up @@ -385,8 +386,13 @@ async fn handle_new_activations<Context>(

if let Err(err) = task_sender
.send(
CollatorProtocolMessage::DistributeCollation(ccr, pov, result_sender)
.into(),
CollatorProtocolMessage::DistributeCollation(
ccr,
parent_head_data_hash,
pov,
result_sender,
)
.into(),
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions node/network/collator-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ always-assert = "0.1.2"
futures = "0.3.21"
futures-timer = "3"
gum = { package = "tracing-gum", path = "../../gum" }
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }

sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
139 changes: 139 additions & 0 deletions node/network/collator-protocol/src/collator_side/collation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Primitives for tracking collations-related data.

use std::collections::{HashSet, VecDeque};

use futures::{future::BoxFuture, stream::FuturesUnordered};

use polkadot_node_network_protocol::{
request_response::{
incoming::OutgoingResponse, v1 as protocol_v1, vstaging as protocol_vstaging,
IncomingRequest,
},
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
/// The collation was created, but we did not advertise it to any validator.
Created,
/// The collation was advertised to at least one validator.
Advertised,
/// The collation was requested by at least one validator.
Requested,
}

impl CollationStatus {
/// Advance to the [`Self::Advertised`] status.
///
/// This ensures that `self` isn't already [`Self::Requested`].
pub fn advance_to_advertised(&mut self) {
if !matches!(self, Self::Requested) {
*self = Self::Advertised;
}
}

/// Advance to the [`Self::Requested`] status.
pub fn advance_to_requested(&mut self) {
*self = Self::Requested;
}
}

/// A collation built by the collator.
pub struct Collation {
pub receipt: CandidateReceipt,
pub parent_head_data_hash: Hash,
pub pov: PoV,
pub status: CollationStatus,
}

/// Stores the state for waiting collation fetches per relay parent.
#[derive(Default)]
pub struct WaitingCollationFetches {
/// A flag indicating that we have an ongoing request.
/// This limits the number of collations being sent at any moment
/// of time to 1 for each relay parent.
///
/// If set to `true`, any new request will be queued.
pub collation_fetch_active: bool,
Copy link
Member

@eskimor eskimor Oct 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this flag? We can always retrieve that information from !req_queue_queue.is_empty()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, the queue may be empty with an ongoing collation request.

/// The collation fetches waiting to be fulfilled.
pub waiting: VecDeque<VersionedCollationRequest>,
/// All peers that are waiting or actively uploading.
///
/// We will not accept multiple requests from the same peer, otherwise our DoS protection of
/// moving on to the next peer after `MAX_UNSHARED_UPLOAD_TIME` would be pointless.
pub waiting_peers: HashSet<(PeerId, CandidateHash)>,
}

/// Backwards-compatible wrapper for incoming collations requests.
pub enum VersionedCollationRequest {
V1(IncomingRequest<protocol_v1::CollationFetchingRequest>),
VStaging(IncomingRequest<protocol_vstaging::CollationFetchingRequest>),
}

impl From<IncomingRequest<protocol_v1::CollationFetchingRequest>> for VersionedCollationRequest {
fn from(req: IncomingRequest<protocol_v1::CollationFetchingRequest>) -> Self {
Self::V1(req)
}
}

impl From<IncomingRequest<protocol_vstaging::CollationFetchingRequest>>
for VersionedCollationRequest
{
fn from(req: IncomingRequest<protocol_vstaging::CollationFetchingRequest>) -> Self {
Self::VStaging(req)
}
}

impl VersionedCollationRequest {
pub fn para_id(&self) -> ParaId {
match self {
VersionedCollationRequest::V1(req) => req.payload.para_id,
VersionedCollationRequest::VStaging(req) => req.payload.para_id,
}
}

pub fn relay_parent(&self) -> Hash {
match self {
VersionedCollationRequest::V1(req) => req.payload.relay_parent,
VersionedCollationRequest::VStaging(req) => req.payload.relay_parent,
}
}

pub fn peer_id(&self) -> PeerId {
match self {
VersionedCollationRequest::V1(req) => req.peer,
VersionedCollationRequest::VStaging(req) => req.peer,
}
}

pub fn send_outgoing_response(
self,
response: OutgoingResponse<protocol_v1::CollationFetchingResponse>,
) -> Result<(), ()> {
match self {
VersionedCollationRequest::V1(req) => req.send_outgoing_response(response),
VersionedCollationRequest::VStaging(req) => req.send_outgoing_response(response),
}
}
}

pub type ActiveCollationFetches =
FuturesUnordered<BoxFuture<'static, (Hash, CandidateHash, PeerId)>>;
123 changes: 123 additions & 0 deletions node/network/collator-protocol/src/collator_side/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use polkadot_node_subsystem_util::metrics::{self, prometheus};

#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
pub fn on_advertisment_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisements_made.inc();
}
}

pub fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}

pub fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
}
}

/// Provide a timer for `process_msg` which observes on drop.
pub fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
}

/// Provide a timer for `distribute_collation` which observes on drop.
pub fn time_collation_distribution(
&self,
label: &'static str,
) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| {
metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
})
}
}

#[derive(Clone)]
struct MetricsInner {
advertisements_made: prometheus::Counter<prometheus::U64>,
collations_sent: prometheus::Counter<prometheus::U64>,
collations_send_requested: prometheus::Counter<prometheus::U64>,
process_msg: prometheus::Histogram,
collation_distribution_time: prometheus::HistogramVec,
}

impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
advertisements_made: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collation_advertisements_made_total",
"A number of collation advertisements sent to validators.",
)?,
registry,
)?,
collations_send_requested: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collations_sent_requested_total",
"A number of collations requested to be sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collations_sent_total",
"A number of collations sent to validators.",
)?,
registry,
)?,
process_msg: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_collator_protocol_collator_process_msg",
"Time spent within `collator_protocol_collator::process_msg`",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
1.0,
]),
)?,
registry,
)?,
collation_distribution_time: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_collator_protocol_collator_distribution_time",
"Time spent within `collator_protocol_collator::distribute_collation`",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
1.0,
]),
&["state"],
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
}
}
Loading