Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Migrate from MQCs in persisted validation data to merkle proofs #317

Merged
merged 7 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
601 changes: 318 additions & 283 deletions Cargo.lock

Large diffs are not rendered by default.

37 changes: 36 additions & 1 deletion collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,31 @@ where
})
.ok()?;

let ingress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_ingress_channel_index(
self.para_id,
))
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot obtain the hrmp ingress channel index: {:?}",
e,
)
})
.ok()?;
let ingress_channels = ingress_channels
.map(|raw| <Vec<ParaId>>::decode(&mut &raw[..]))
.transpose()
.map_err(|e| {
error!(
target: LOG_TARGET,
"Cannot decode the hrmp ingress channel index: {:?}",
e,
)
})
.ok()?
.unwrap_or_default();

let egress_channels = relay_parent_state_backend
.storage(&relay_well_known_keys::hrmp_egress_channel_index(
self.para_id,
Expand Down Expand Up @@ -246,12 +271,22 @@ where

let mut relevant_keys = vec![];
relevant_keys.push(relay_well_known_keys::ACTIVE_CONFIG.to_vec());
relevant_keys.push(relay_well_known_keys::dmq_mqc_head(self.para_id));
relevant_keys.push(relay_well_known_keys::relay_dispatch_queue_size(
self.para_id,
));
relevant_keys.push(relay_well_known_keys::hrmp_ingress_channel_index(
self.para_id,
));
relevant_keys.push(relay_well_known_keys::hrmp_egress_channel_index(
self.para_id,
));
relevant_keys.extend(ingress_channels.into_iter().map(|sender| {
relay_well_known_keys::hrmp_channels(HrmpChannelId {
sender,
recipient: self.para_id,
})
}));
relevant_keys.extend(egress_channels.into_iter().map(|recipient| {
relay_well_known_keys::hrmp_channels(HrmpChannelId {
sender: self.para_id,
Expand Down Expand Up @@ -586,7 +621,7 @@ where
);

let collation =
self.build_collation(b, block_hash, validation_data.block_number)?;
self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
let pov_hash = collation.proof_of_validity.hash();

self.wait_to_announce
Expand Down
134 changes: 62 additions & 72 deletions parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,18 @@ decl_module! {
// which means we can put the initialization logic here to remove the
// sequencing problem.
if let Some((apply_block, validation_function)) = PendingValidationFunction::get() {
if vfp.block_number >= apply_block {
if vfp.relay_parent_number >= apply_block {
PendingValidationFunction::kill();
LastUpgrade::put(&apply_block);
Self::put_parachain_code(&validation_function);
Self::deposit_event(Event::ValidationFunctionApplied(vfp.block_number));
Self::deposit_event(Event::ValidationFunctionApplied(vfp.relay_parent_number));
}
}

let (host_config, relevant_messaging_state) =
relay_state_snapshot::extract_from_proof(
T::SelfParaId::get(),
vfp.relay_storage_root,
vfp.relay_parent_storage_root,
relay_chain_state
)
.map_err(|err| {
Expand All @@ -200,13 +200,19 @@ decl_module! {

storage::unhashed::put(VALIDATION_DATA, &vfp);
DidUpdateValidationData::put(true);
RelevantMessagingState::put(relevant_messaging_state);
RelevantMessagingState::put(relevant_messaging_state.clone());
HostConfiguration::put(host_config);

<T::OnValidationData as OnValidationData>::on_validation_data(&vfp);

Self::process_inbound_downward_messages(&vfp, downward_messages)?;
Self::process_inbound_horizontal_messages(&vfp, horizontal_messages)?;
Self::process_inbound_downward_messages(
relevant_messaging_state.dmq_mqc_head,
downward_messages,
)?;
Self::process_inbound_horizontal_messages(
&relevant_messaging_state.ingress_channels,
horizontal_messages,
)?;

Ok(())
}
Expand Down Expand Up @@ -437,7 +443,7 @@ impl<T: Config> Module<T> {
/// Checks if the sequence of the messages is valid, dispatches them and communicates the number
/// of processed messages to the collator via a storage update.
fn process_inbound_downward_messages(
vfp: &PersistedValidationData,
expected_dmq_mqc_head: relay_chain::Hash,
downward_messages: Vec<InboundDownwardMessage>,
) -> DispatchResult {
let dm_count = downward_messages.len() as u32;
Expand All @@ -453,7 +459,7 @@ impl<T: Config> Module<T> {
// After hashing each message in the message queue chain submitted by the collator, we should
// arrive to the MQC head provided by the relay chain.
ensure!(
result_mqc_head == vfp.dmq_mqc_head,
result_mqc_head == expected_dmq_mqc_head,
Error::<T>::DmpMqcMismatch
);

Expand All @@ -469,14 +475,14 @@ impl<T: Config> Module<T> {
/// This is similar to [`process_inbound_downward_messages`], but works on multiple inbound
/// channels.
fn process_inbound_horizontal_messages(
vfp: &PersistedValidationData,
ingress_channels: &[(ParaId, cumulus_primitives::AbridgedHrmpChannel)],
horizontal_messages: BTreeMap<ParaId, Vec<InboundHrmpMessage>>,
) -> DispatchResult {
// First, check that all submitted messages are sent from channels that exist. The channel
// exists if its MQC head is present in `vfp.hrmp_mqc_heads`.
for sender in horizontal_messages.keys() {
ensure!(
vfp.hrmp_mqc_heads
ingress_channels
.binary_search_by_key(sender, |&(s, _)| s)
.is_ok(),
Error::<T>::HrmpNoMqc,
Expand Down Expand Up @@ -533,13 +539,14 @@ impl<T: Config> Module<T> {
// `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel
// it won't get into next block's `last_mqc_heads` and thus will be all zeros, which
// would corrupt the message queue chain.
for &(ref sender, ref target_head) in &vfp.hrmp_mqc_heads {
for &(ref sender, ref channel) in ingress_channels {
let cur_head = running_mqc_heads
.entry(*sender)
.or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default())
.head();
let target_head = channel.mqc_head.unwrap_or_default();

ensure!(&cur_head == target_head, Error::<T>::HrmpMqcMismatch);
ensure!(cur_head == target_head, Error::<T>::HrmpMqcMismatch);
}

LastHrmpMqcHeads::put(running_mqc_heads);
Expand Down Expand Up @@ -592,15 +599,15 @@ impl<T: Config> Module<T> {
}

let relay_blocks_since_last_upgrade = vfp
.block_number
.relay_parent_number
.saturating_sub(LastUpgrade::get());

if relay_blocks_since_last_upgrade <= cfg.validation_upgrade_frequency {
// The cooldown after the last upgrade hasn't elapsed yet. Upgrade is not allowed.
return None;
}

Some(vfp.block_number + cfg.validation_upgrade_delay)
Some(vfp.relay_parent_number + cfg.validation_upgrade_delay)
}

/// The implementation of the runtime upgrade scheduling.
Expand Down Expand Up @@ -1072,6 +1079,7 @@ mod tests {
self
}

#[allow(dead_code)] // might come in handy in future. If now is future and it still hasn't - feel free.
fn with_validation_data<F>(mut self, f: F) -> Self
where
F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData),
Expand Down Expand Up @@ -1117,11 +1125,11 @@ mod tests {
if let Some(ref hook) = self.relay_sproof_builder_hook {
hook(self, *n as RelayChainBlockNumber, &mut sproof_builder);
}
let (relay_storage_root, relay_chain_state) =
let (relay_parent_storage_root, relay_chain_state) =
sproof_builder.into_state_root_and_proof();
let mut vfp = PersistedValidationData {
block_number: *n as RelayChainBlockNumber,
relay_storage_root,
relay_parent_number: *n as RelayChainBlockNumber,
relay_parent_storage_root,
..Default::default()
};
if let Some(ref hook) = self.persisted_validation_data_hook {
Expand Down Expand Up @@ -1612,11 +1620,11 @@ mod tests {
}

BlockTests::new()
.with_validation_data(
|_, relay_block_num, validation_data| match relay_block_num {
.with_relay_sproof_builder(
|_, relay_block_num, sproof| match relay_block_num {
1 => {
validation_data.dmq_mqc_head =
MessageQueueChain::default().extend_downward(&MSG).head();
sproof.dmq_mqc_head =
Some(MessageQueueChain::default().extend_downward(&MSG).head());
}
_ => unreachable!(),
},
Expand Down Expand Up @@ -1661,39 +1669,31 @@ mod tests {
}

BlockTests::new()
.with_validation_data(
|_, relay_block_num, validation_data| match relay_block_num {
.with_relay_sproof_builder(
|_, relay_block_num, sproof| match relay_block_num {
1 => {
// 200 - doesn't exist yet
// 300 - one new message
validation_data.hrmp_mqc_heads.push((
ParaId::from(300),
MessageQueueChain::default().extend_hrmp(&MSG_1).head(),
));
sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head());
}
2 => {
// 200 - two new messages
// 300 - now present with one message.
validation_data.hrmp_mqc_heads.push((
ParaId::from(200),
MessageQueueChain::default().extend_hrmp(&MSG_4).head(),
));
validation_data.hrmp_mqc_heads.push((
ParaId::from(300),
MessageQueueChain::default()
sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head());
sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head =
Some(MessageQueueChain::default()
.extend_hrmp(&MSG_1)
.extend_hrmp(&MSG_2)
.extend_hrmp(&MSG_3)
.head(),
));
.head());
}
3 => {
// 200 - no new messages
// 300 - is gone
validation_data.hrmp_mqc_heads.push((
ParaId::from(200),
MessageQueueChain::default().extend_hrmp(&MSG_4).head(),
));
sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head());
}
_ => unreachable!(),
},
Expand Down Expand Up @@ -1747,21 +1747,17 @@ mod tests {
#[test]
fn receive_hrmp_empty_channel() {
BlockTests::new()
.with_validation_data(
|_, relay_block_num, validation_data| match relay_block_num {
1 => {
// no channels
}
2 => {
// one new channel
validation_data.hrmp_mqc_heads.push((
ParaId::from(300),
MessageQueueChain::default().head(),
));
}
_ => unreachable!(),
},
)
.with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num {
1 => {
// no channels
}
2 => {
// one new channel
sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head =
Some(MessageQueueChain::default().head());
}
_ => unreachable!(),
})
.add(1, || {})
.add(2, || {});
}
Expand All @@ -1783,30 +1779,24 @@ mod tests {
const ALICE: ParaId = ParaId::new(300);

BlockTests::new()
.with_validation_data(
|_, relay_block_num, validation_data| match relay_block_num {
.with_relay_sproof_builder(
|_, relay_block_num, sproof| match relay_block_num {
1 => {
validation_data.hrmp_mqc_heads.push((
ALICE,
MessageQueueChain::default().extend_hrmp(&MSG_1).head(),
));
sproof.upsert_inbound_channel(ALICE).mqc_head
= Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head());
}
2 => {
// 300 - no new messages, mqc stayed the same.
validation_data.hrmp_mqc_heads.push((
ALICE,
MessageQueueChain::default().extend_hrmp(&MSG_1).head(),
));
sproof.upsert_inbound_channel(ALICE).mqc_head
= Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head());
}
3 => {
// 300 - new message.
validation_data.hrmp_mqc_heads.push((
ALICE,
MessageQueueChain::default()
.extend_hrmp(&MSG_1)
.extend_hrmp(&MSG_2)
.head(),
));
sproof.upsert_inbound_channel(ALICE).mqc_head
= Some(MessageQueueChain::default()
.extend_hrmp(&MSG_1)
.extend_hrmp(&MSG_2)
.head());
}
_ => unreachable!(),
},
Expand Down
Loading