From 7a42e853b84c6b5389e073d16a973518101152d0 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Wed, 7 Dec 2022 12:19:46 +0200 Subject: [PATCH] Mmr persist state (#12822) client/mmr: persisting gadget state across runs Fixes #12780 * client/mmr: on init do canonicalization catch-up * client/mmr: add more tests * client/mmr: persist gadget progress in aux db * client/mmr: add more tests * client/mmr: replace async_std with tokio * remove leftover comment * address review comments Signed-off-by: acatangiu --- Cargo.lock | 2 + client/merkle-mountain-range/Cargo.toml | 4 +- .../merkle-mountain-range/src/aux_schema.rs | 228 ++++++++++++++++++ client/merkle-mountain-range/src/lib.rs | 64 +++-- .../merkle-mountain-range/src/offchain_mmr.rs | 163 +++++++++++-- .../merkle-mountain-range/src/test_utils.rs | 98 +++++--- 6 files changed, 477 insertions(+), 82 deletions(-) create mode 100644 client/merkle-mountain-range/src/aux_schema.rs diff --git a/Cargo.lock b/Cargo.lock index 1ba2462a24caa..c4f06732a94b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4137,6 +4137,7 @@ dependencies = [ "futures", "log", "parity-scale-codec", + "parking_lot 0.12.1", "sc-block-builder", "sc-client-api", "sc-offchain", @@ -4148,6 +4149,7 @@ dependencies = [ "sp-io", "sp-mmr-primitives", "sp-runtime", + "sp-tracing", "substrate-test-runtime-client", "tokio", ] diff --git a/client/merkle-mountain-range/Cargo.toml b/client/merkle-mountain-range/Cargo.toml index e32764eff1d63..4fb423cee83bc 100644 --- a/client/merkle-mountain-range/Cargo.toml +++ b/client/merkle-mountain-range/Cargo.toml @@ -26,6 +26,8 @@ sc-offchain = { version = "4.0.0-dev", path = "../offchain" } sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" } [dev-dependencies] -tokio = "1.17.0" +parking_lot = "0.12.1" sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } +sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" } substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } +tokio = "1.17.0" diff --git a/client/merkle-mountain-range/src/aux_schema.rs b/client/merkle-mountain-range/src/aux_schema.rs new file mode 100644 index 0000000000000..907deb0bde239 --- /dev/null +++ b/client/merkle-mountain-range/src/aux_schema.rs @@ -0,0 +1,228 @@ +// This file is part of Substrate. + +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Schema for MMR-gadget state persisted in the aux-db. + +use crate::LOG_TARGET; +use codec::{Decode, Encode}; +use log::{info, trace}; +use sc_client_api::backend::AuxStore; +use sp_blockchain::{Error as ClientError, Result as ClientResult}; +use sp_runtime::traits::{Block, NumberFor}; + +const VERSION_KEY: &[u8] = b"mmr_auxschema_version"; +const GADGET_STATE: &[u8] = b"mmr_gadget_state"; + +const CURRENT_VERSION: u32 = 1; +pub(crate) type PersistedState = NumberFor; + +pub(crate) fn write_current_version(backend: &B) -> ClientResult<()> { + info!(target: LOG_TARGET, "write aux schema version {:?}", CURRENT_VERSION); + AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[]) +} + +/// Write gadget state. +pub(crate) fn write_gadget_state( + backend: &BE, + state: &PersistedState, +) -> ClientResult<()> { + trace!(target: LOG_TARGET, "persisting {:?}", state); + backend.insert_aux(&[(GADGET_STATE, state.encode().as_slice())], &[]) +} + +fn load_decode(backend: &B, key: &[u8]) -> ClientResult> { + match backend.get_aux(key)? { + None => Ok(None), + Some(t) => T::decode(&mut &t[..]) + .map_err(|e| ClientError::Backend(format!("MMR aux DB is corrupted: {}", e))) + .map(Some), + } +} + +/// Load or initialize persistent data from backend. +pub(crate) fn load_persistent(backend: &BE) -> ClientResult>> +where + B: Block, + BE: AuxStore, +{ + let version: Option = load_decode(backend, VERSION_KEY)?; + + match version { + None => (), + Some(1) => return load_decode::<_, PersistedState>(backend, GADGET_STATE), + other => + return Err(ClientError::Backend(format!("Unsupported MMR aux DB version: {:?}", other))), + } + + // No persistent state found in DB. + Ok(None) +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::test_utils::{ + run_test_with_mmr_gadget_pre_post_using_client, MmrBlock, MockClient, OffchainKeyType, + }; + use parking_lot::Mutex; + use sp_core::offchain::{DbExternalities, StorageKind}; + use sp_mmr_primitives::utils::NodesUtils; + use sp_runtime::generic::BlockId; + use std::{sync::Arc, time::Duration}; + use substrate_test_runtime_client::{runtime::Block, Backend}; + + #[test] + fn should_load_persistent_sanity_checks() { + let client = MockClient::new(); + let backend = &*client.backend; + + // version not available in db -> None + assert_eq!(load_persistent::(backend).unwrap(), None); + + // populate version in db + write_current_version(backend).unwrap(); + // verify correct version is retrieved + assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION)); + + // version is available in db but state isn't -> None + assert_eq!(load_persistent::(backend).unwrap(), None); + } + + #[test] + fn should_persist_progress_across_runs() { + sp_tracing::try_init_simple(); + + let client = Arc::new(MockClient::new()); + let backend = client.backend.clone(); + + // version not available in db -> None + assert_eq!(load_decode::>(&*backend, VERSION_KEY).unwrap(), None); + // state not available in db -> None + assert_eq!(load_persistent::(&*backend).unwrap(), None); + // run the gadget while importing and finalizing 3 blocks + run_test_with_mmr_gadget_pre_post_using_client( + client.clone(), + |_| async {}, + |client| async move { + let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await; + let a2 = client.import_block(&BlockId::Number(1), b"a2", Some(1)).await; + let a3 = client.import_block(&BlockId::Number(2), b"a3", Some(2)).await; + client.finalize_block(a3.hash(), Some(3)); + tokio::time::sleep(Duration::from_millis(200)).await; + // a1, a2, a3 were canonicalized + client.assert_canonicalized(&[&a1, &a2, &a3]); + }, + ); + + // verify previous progress was persisted and run the gadget again + run_test_with_mmr_gadget_pre_post_using_client( + client.clone(), + |client| async move { + let backend = &*client.backend; + // check there is both version and best canon available in db before running gadget + assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION)); + assert_eq!(load_persistent::(backend).unwrap(), Some(3)); + }, + |client| async move { + let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await; + let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await; + let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await; + client.finalize_block(a6.hash(), Some(6)); + tokio::time::sleep(Duration::from_millis(200)).await; + + // a4, a5, a6 were canonicalized + client.assert_canonicalized(&[&a4, &a5, &a6]); + // check persisted best canon was updated + assert_eq!(load_persistent::(&*client.backend).unwrap(), Some(6)); + }, + ); + } + + #[test] + fn should_resume_from_persisted_state() { + sp_tracing::try_init_simple(); + + let client = Arc::new(MockClient::new()); + let blocks = Arc::new(Mutex::new(Vec::::new())); + let blocks_clone = blocks.clone(); + + // run the gadget while importing and finalizing 3 blocks + run_test_with_mmr_gadget_pre_post_using_client( + client.clone(), + |_| async {}, + |client| async move { + let mut blocks = blocks_clone.lock(); + blocks.push(client.import_block(&BlockId::Number(0), b"a1", Some(0)).await); + blocks.push(client.import_block(&BlockId::Number(1), b"a2", Some(1)).await); + blocks.push(client.import_block(&BlockId::Number(2), b"a3", Some(2)).await); + client.finalize_block(blocks.last().unwrap().hash(), Some(3)); + tokio::time::sleep(Duration::from_millis(200)).await; + // a1, a2, a3 were canonicalized + let slice: Vec<&MmrBlock> = blocks.iter().collect(); + client.assert_canonicalized(&slice); + + // now manually move them back to non-canon/temp location + let mut offchain_db = client.offchain_db(); + for mmr_block in slice { + for node in NodesUtils::right_branch_ending_in_leaf(mmr_block.leaf_idx.unwrap()) + { + let canon_key = mmr_block.get_offchain_key(node, OffchainKeyType::Canon); + let val = offchain_db + .local_storage_get(StorageKind::PERSISTENT, &canon_key) + .unwrap(); + offchain_db.local_storage_clear(StorageKind::PERSISTENT, &canon_key); + + let temp_key = mmr_block.get_offchain_key(node, OffchainKeyType::Temp); + offchain_db.local_storage_set(StorageKind::PERSISTENT, &temp_key, &val); + } + } + }, + ); + + let blocks_clone = blocks.clone(); + // verify new gadget continues from block 4 and ignores 1, 2, 3 based on persisted state + run_test_with_mmr_gadget_pre_post_using_client( + client.clone(), + |client| async move { + let blocks = blocks_clone.lock(); + let slice: Vec<&MmrBlock> = blocks.iter().collect(); + + // verify persisted state says a1, a2, a3 were canonicalized, + assert_eq!(load_persistent::(&*client.backend).unwrap(), Some(3)); + // but actually they are NOT canon (we manually reverted them earlier). + client.assert_not_canonicalized(&slice); + }, + |client| async move { + let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await; + let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await; + let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await; + client.finalize_block(a6.hash(), Some(6)); + tokio::time::sleep(Duration::from_millis(200)).await; + + let block_1_to_3 = blocks.lock(); + let slice: Vec<&MmrBlock> = block_1_to_3.iter().collect(); + // verify a1, a2, a3 are still NOT canon (skipped by gadget based on data in aux db) + client.assert_not_canonicalized(&slice); + // but a4, a5, a6 were canonicalized + client.assert_canonicalized(&[&a4, &a5, &a6]); + // check persisted best canon was updated + assert_eq!(load_persistent::(&*client.backend).unwrap(), Some(6)); + }, + ); + } +} diff --git a/client/merkle-mountain-range/src/lib.rs b/client/merkle-mountain-range/src/lib.rs index 59f26b4265708..401a5d5d4d56b 100644 --- a/client/merkle-mountain-range/src/lib.rs +++ b/client/merkle-mountain-range/src/lib.rs @@ -37,15 +37,15 @@ #![warn(missing_docs)] +mod aux_schema; mod offchain_mmr; #[cfg(test)] pub mod test_utils; -use std::{marker::PhantomData, sync::Arc}; - +use crate::offchain_mmr::OffchainMmr; +use beefy_primitives::MmrRootHash; use futures::StreamExt; -use log::{error, trace, warn}; - +use log::{debug, error, trace, warn}; use sc_client_api::{Backend, BlockchainEvents, FinalityNotifications}; use sc_offchain::OffchainDb; use sp_api::ProvideRuntimeApi; @@ -55,50 +55,75 @@ use sp_runtime::{ generic::BlockId, traits::{Block, Header, NumberFor}, }; - -use crate::offchain_mmr::OffchainMMR; -use beefy_primitives::MmrRootHash; -use sp_core::offchain::OffchainStorage; +use std::{marker::PhantomData, sync::Arc}; /// Logging target for the mmr gadget. pub const LOG_TARGET: &str = "mmr"; -struct OffchainMmrBuilder { +struct OffchainMmrBuilder, C> { + backend: Arc, client: Arc, - offchain_db: OffchainDb, + offchain_db: OffchainDb, indexing_prefix: Vec, _phantom: PhantomData, } -impl OffchainMmrBuilder +impl OffchainMmrBuilder where B: Block, + BE: Backend, C: ProvideRuntimeApi + HeaderBackend + HeaderMetadata, C::Api: MmrApi>, - S: OffchainStorage, { async fn try_build( self, finality_notifications: &mut FinalityNotifications, - ) -> Option> { + ) -> Option> { while let Some(notification) = finality_notifications.next().await { let best_block = *notification.header.number(); match self.client.runtime_api().mmr_leaf_count(&BlockId::number(best_block)) { Ok(Ok(mmr_leaf_count)) => { + debug!( + target: LOG_TARGET, + "pallet-mmr detected at block {:?} with mmr size {:?}", + best_block, + mmr_leaf_count + ); match utils::first_mmr_block_num::(best_block, mmr_leaf_count) { Ok(first_mmr_block) => { - let mut offchain_mmr = OffchainMMR { + debug!( + target: LOG_TARGET, + "pallet-mmr genesis computed at block {:?}", first_mmr_block, + ); + let best_canonicalized = + match offchain_mmr::load_or_init_best_canonicalized::( + &*self.backend, + first_mmr_block, + ) { + Ok(best) => best, + Err(e) => { + error!( + target: LOG_TARGET, + "Error loading state from aux db: {:?}", e + ); + return None + }, + }; + let mut offchain_mmr = OffchainMmr { + backend: self.backend, client: self.client, offchain_db: self.offchain_db, indexing_prefix: self.indexing_prefix, first_mmr_block, - - _phantom: Default::default(), + best_canonicalized, }; + // We need to make sure all blocks leading up to current notification + // have also been canonicalized. + offchain_mmr.canonicalize_catch_up(¬ification); // We have to canonicalize and prune the blocks in the finality // notification that lead to building the offchain-mmr as well. - offchain_mmr.canonicalize_and_prune(¬ification); + offchain_mmr.canonicalize_and_prune(notification); return Some(offchain_mmr) }, Err(e) => { @@ -143,14 +168,14 @@ where C: BlockchainEvents + HeaderBackend + HeaderMetadata + ProvideRuntimeApi, C::Api: MmrApi>, { - async fn run(mut self, builder: OffchainMmrBuilder) { + async fn run(mut self, builder: OffchainMmrBuilder) { let mut offchain_mmr = match builder.try_build(&mut self.finality_notifications).await { Some(offchain_mmr) => offchain_mmr, None => return, }; while let Some(notification) = self.finality_notifications.next().await { - offchain_mmr.canonicalize_and_prune(¬ification); + offchain_mmr.canonicalize_and_prune(notification); } } @@ -174,6 +199,7 @@ where }; mmr_gadget .run(OffchainMmrBuilder { + backend, client, offchain_db, indexing_prefix, diff --git a/client/merkle-mountain-range/src/offchain_mmr.rs b/client/merkle-mountain-range/src/offchain_mmr.rs index 1cdd3810b4c52..988b3ffef882a 100644 --- a/client/merkle-mountain-range/src/offchain_mmr.rs +++ b/client/merkle-mountain-range/src/offchain_mmr.rs @@ -21,33 +21,57 @@ #![warn(missing_docs)] -use std::{marker::PhantomData, sync::Arc}; - -use log::{debug, error, warn}; - -use sc_client_api::FinalityNotification; +use crate::{aux_schema, LOG_TARGET}; +use log::{debug, error, info, warn}; +use sc_client_api::{AuxStore, Backend, FinalityNotification}; use sc_offchain::OffchainDb; use sp_blockchain::{CachedHeaderMetadata, ForkBackend, HeaderBackend, HeaderMetadata}; -use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind}; +use sp_core::offchain::{DbExternalities, StorageKind}; use sp_mmr_primitives::{utils, utils::NodesUtils, NodeIndex}; -use sp_runtime::traits::{Block, Header}; - -use crate::LOG_TARGET; +use sp_runtime::{ + traits::{Block, NumberFor, One}, + Saturating, +}; +use std::{collections::VecDeque, sync::Arc}; + +pub(crate) fn load_or_init_best_canonicalized( + backend: &BE, + first_mmr_block: NumberFor, +) -> sp_blockchain::Result> +where + BE: AuxStore, + B: Block, +{ + // Initialize gadget best_canon from AUX DB or from pallet genesis. + if let Some(best) = aux_schema::load_persistent::(backend)? { + info!(target: LOG_TARGET, "Loading MMR best canonicalized state from db: {:?}.", best); + Ok(best) + } else { + let best = first_mmr_block.saturating_sub(One::one()); + info!( + target: LOG_TARGET, + "Loading MMR from pallet genesis on what appears to be the first startup: {:?}.", best + ); + aux_schema::write_current_version(backend)?; + aux_schema::write_gadget_state::(backend, &best)?; + Ok(best) + } +} /// `OffchainMMR` exposes MMR offchain canonicalization and pruning logic. -pub struct OffchainMMR { +pub struct OffchainMmr, C> { + pub backend: Arc, pub client: Arc, - pub offchain_db: OffchainDb, + pub offchain_db: OffchainDb, pub indexing_prefix: Vec, - pub first_mmr_block: ::Number, - - pub _phantom: PhantomData, + pub first_mmr_block: NumberFor, + pub best_canonicalized: NumberFor, } -impl OffchainMMR +impl OffchainMmr where C: HeaderBackend + HeaderMetadata, - S: OffchainStorage, + BE: Backend, B: Block, { fn node_temp_offchain_key(&self, pos: NodeIndex, parent_hash: B::Hash) -> Vec { @@ -77,7 +101,7 @@ where fn right_branch_ending_in_block_or_log( &self, - block_num: ::Number, + block_num: NumberFor, action: &str, ) -> Option> { match utils::block_num_to_leaf_index::(block_num, self.first_mmr_block) { @@ -128,9 +152,9 @@ where } } - fn canonicalize_branch(&mut self, block_hash: &B::Hash) { + fn canonicalize_branch(&mut self, block_hash: B::Hash) { let action = "canonicalize"; - let header = match self.header_metadata_or_log(*block_hash, action) { + let header = match self.header_metadata_or_log(block_hash, action) { Some(header) => header, _ => return, }; @@ -148,6 +172,7 @@ where None => { // If we can't convert the block number to a leaf index, the chain state is probably // corrupted. We only log the error, hoping that the chain state will be fixed. + self.best_canonicalized = header.number; return }, }; @@ -174,16 +199,58 @@ where ); } } + if self.best_canonicalized != header.number.saturating_sub(One::one()) { + warn!( + target: LOG_TARGET, + "Detected canonicalization skip: best {:?} current {:?}.", + self.best_canonicalized, + header.number, + ); + } + self.best_canonicalized = header.number; + } + + /// In case of missed finality notifications (node restarts for example), + /// make sure to also canon everything leading up to `notification.tree_route`. + pub fn canonicalize_catch_up(&mut self, notification: &FinalityNotification) { + let first = notification.tree_route.first().unwrap_or(¬ification.hash); + if let Some(mut header) = self.header_metadata_or_log(*first, "canonicalize") { + let mut to_canon = VecDeque::<::Hash>::new(); + // Walk up the chain adding all blocks newer than `self.best_canonicalized`. + loop { + header = match self.header_metadata_or_log(header.parent, "canonicalize") { + Some(header) => header, + _ => break, + }; + if header.number <= self.best_canonicalized { + break + } + to_canon.push_front(header.hash); + } + // Canonicalize all blocks leading up to current finality notification. + for hash in to_canon.drain(..) { + self.canonicalize_branch(hash); + } + if let Err(e) = + aux_schema::write_gadget_state::(&*self.backend, &self.best_canonicalized) + { + debug!(target: LOG_TARGET, "error saving state: {:?}", e); + } + } } /// Move leafs and nodes added by finalized blocks in offchain db from _fork-aware key_ to /// _canonical key_. /// Prune leafs and nodes added by stale blocks in offchain db from _fork-aware key_. - pub fn canonicalize_and_prune(&mut self, notification: &FinalityNotification) { + pub fn canonicalize_and_prune(&mut self, notification: FinalityNotification) { // Move offchain MMR nodes for finalized blocks to canonical keys. - for block_hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash)) + for hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash)) { + self.canonicalize_branch(*hash); + } + if let Err(e) = + aux_schema::write_gadget_state::(&*self.backend, &self.best_canonicalized) { - self.canonicalize_branch(block_hash); + debug!(target: LOG_TARGET, "error saving state: {:?}", e); } // Remove offchain MMR nodes for stale forks. @@ -201,9 +268,10 @@ where #[cfg(test)] mod tests { - use crate::test_utils::run_test_with_mmr_gadget; + use crate::test_utils::{run_test_with_mmr_gadget, run_test_with_mmr_gadget_pre_post}; + use parking_lot::Mutex; use sp_runtime::generic::BlockId; - use std::time::Duration; + use std::{sync::Arc, time::Duration}; #[test] fn canonicalize_and_prune_works_correctly() { @@ -243,4 +311,51 @@ mod tests { client.assert_pruned(&[&b1, &b2, &b3, &a4]); }) } + + #[test] + fn canonicalize_catchup_works_correctly() { + let mmr_blocks = Arc::new(Mutex::new(vec![])); + let mmr_blocks_ref = mmr_blocks.clone(); + run_test_with_mmr_gadget_pre_post( + |client| async move { + // G -> A1 -> A2 + // | | + // | | -> finalized without gadget (missed notification) + // | + // | -> first mmr block + + let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await; + let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await; + + client.finalize_block(a2.hash(), Some(2)); + + { + let mut mmr_blocks = mmr_blocks_ref.lock(); + mmr_blocks.push(a1); + mmr_blocks.push(a2); + } + }, + |client| async move { + // G -> A1 -> A2 -> A3 -> A4 + // | | | | + // | | | | -> finalized after starting gadget + // | | | + // | | | -> gadget start + // | | + // | | -> finalized before starting gadget (missed notification) + // | + // | -> first mmr block + let blocks = mmr_blocks.lock(); + let a1 = blocks[0].clone(); + let a2 = blocks[1].clone(); + let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await; + let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await; + + client.finalize_block(a4.hash(), Some(4)); + tokio::time::sleep(Duration::from_millis(200)).await; + // expected finalized heads: a1, a2 _and_ a3, a4. + client.assert_canonicalized(&[&a1, &a2, &a3, &a4]); + }, + ) + } } diff --git a/client/merkle-mountain-range/src/test_utils.rs b/client/merkle-mountain-range/src/test_utils.rs index b854686b2dc86..f345fb52578ab 100644 --- a/client/merkle-mountain-range/src/test_utils.rs +++ b/client/merkle-mountain-range/src/test_utils.rs @@ -16,12 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{ - future::Future, - sync::{Arc, Mutex}, - time::Duration, -}; - +use crate::MmrGadget; +use parking_lot::Mutex; use sc_block_builder::BlockBuilderProvider; use sc_client_api::{ Backend as BackendT, BlockchainEvents, FinalityNotifications, ImportNotifications, @@ -41,33 +37,34 @@ use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as HeaderT}, }; +use std::{future::Future, sync::Arc, time::Duration}; use substrate_test_runtime_client::{ runtime::{Block, BlockNumber, Hash, Header}, Backend, BlockBuilderExt, Client, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, }; - -use crate::MmrGadget; +use tokio::runtime::Runtime; type MmrHash = H256; -struct MockRuntimeApiData { - num_blocks: BlockNumber, +pub(crate) struct MockRuntimeApiData { + pub(crate) num_blocks: BlockNumber, } #[derive(Clone)] -pub struct MockRuntimeApi { - data: Arc>, +pub(crate) struct MockRuntimeApi { + pub(crate) data: Arc>, } impl MockRuntimeApi { - pub const INDEXING_PREFIX: &'static [u8] = b"mmr_test"; + pub(crate) const INDEXING_PREFIX: &'static [u8] = b"mmr_test"; } -pub struct MmrBlock { - block: Block, - leaf_idx: Option, - leaf_data: Vec, +#[derive(Clone, Debug)] +pub(crate) struct MmrBlock { + pub(crate) block: Block, + pub(crate) leaf_idx: Option, + pub(crate) leaf_data: Vec, } #[derive(Clone, Copy)] @@ -90,7 +87,7 @@ impl MmrBlock { OffchainKeyType::Temp => NodesUtils::node_temp_offchain_key::
( MockRuntimeApi::INDEXING_PREFIX, node, - *self.block.header.parent_hash(), + self.parent_hash(), ), OffchainKeyType::Canon => NodesUtils::node_canon_offchain_key(MockRuntimeApi::INDEXING_PREFIX, node), @@ -98,14 +95,14 @@ impl MmrBlock { } } -pub struct MockClient { - client: Mutex>, - backend: Arc, - runtime_api_params: Arc>, +pub(crate) struct MockClient { + pub(crate) client: Mutex>, + pub(crate) backend: Arc, + pub(crate) runtime_api_params: Arc>, } impl MockClient { - fn new() -> Self { + pub(crate) fn new() -> Self { let client_builder = TestClientBuilder::new().enable_offchain_indexing_api(); let (client, backend) = client_builder.build_with_backend(); MockClient { @@ -115,7 +112,7 @@ impl MockClient { } } - fn offchain_db(&self) -> OffchainDb<>::OffchainStorage> { + pub(crate) fn offchain_db(&self) -> OffchainDb<>::OffchainStorage> { OffchainDb::new(self.backend.offchain_storage().unwrap()) } @@ -125,7 +122,7 @@ impl MockClient { name: &[u8], maybe_leaf_idx: Option, ) -> MmrBlock { - let mut client = self.client.lock().unwrap(); + let mut client = self.client.lock(); let mut block_builder = client.new_block_at(at, Default::default(), false).unwrap(); // Make sure the block has a different hash than its siblings @@ -157,9 +154,9 @@ impl MockClient { } pub fn finalize_block(&self, hash: Hash, maybe_num_mmr_blocks: Option) { - let client = self.client.lock().unwrap(); + let client = self.client.lock(); if let Some(num_mmr_blocks) = maybe_num_mmr_blocks { - self.runtime_api_params.lock().unwrap().num_blocks = num_mmr_blocks; + self.runtime_api_params.lock().num_blocks = num_mmr_blocks; } client.finalize_block(hash, None).unwrap(); @@ -216,7 +213,7 @@ impl HeaderMetadata for MockClient { type Error = as HeaderMetadata>::Error; fn header_metadata(&self, hash: Hash) -> Result, Self::Error> { - self.client.lock().unwrap().header_metadata(hash) + self.client.lock().header_metadata(hash) } fn insert_header_metadata(&self, _hash: Hash, _header_metadata: CachedHeaderMetadata) { @@ -230,23 +227,23 @@ impl HeaderMetadata for MockClient { impl HeaderBackend for MockClient { fn header(&self, id: BlockId) -> sc_client_api::blockchain::Result> { - self.client.lock().unwrap().header(&id) + self.client.lock().header(&id) } fn info(&self) -> Info { - self.client.lock().unwrap().info() + self.client.lock().info() } fn status(&self, id: BlockId) -> sc_client_api::blockchain::Result { - self.client.lock().unwrap().status(id) + self.client.lock().status(id) } fn number(&self, hash: Hash) -> sc_client_api::blockchain::Result> { - self.client.lock().unwrap().number(hash) + self.client.lock().number(hash) } fn hash(&self, number: BlockNumber) -> sc_client_api::blockchain::Result> { - self.client.lock().unwrap().hash(number) + self.client.lock().hash(number) } } @@ -256,7 +253,7 @@ impl BlockchainEvents for MockClient { } fn finality_notification_stream(&self) -> FinalityNotifications { - self.client.lock().unwrap().finality_notification_stream() + self.client.lock().finality_notification_stream() } fn storage_changes_notification_stream( @@ -283,7 +280,7 @@ sp_api::mock_impl_runtime_apis! { } fn mmr_leaf_count(&self) -> Result { - Ok(self.data.lock().unwrap().num_blocks) + Ok(self.data.lock().num_blocks) } fn generate_proof( @@ -310,13 +307,38 @@ sp_api::mock_impl_runtime_apis! { } } -pub fn run_test_with_mmr_gadget(f: F) +pub(crate) fn run_test_with_mmr_gadget(post_gadget: F) where F: FnOnce(Arc) -> Fut + 'static, Fut: Future, { - let runtime = tokio::runtime::Runtime::new().unwrap(); + run_test_with_mmr_gadget_pre_post(|_| async {}, post_gadget); +} + +pub(crate) fn run_test_with_mmr_gadget_pre_post(pre_gadget: F, post_gadget: G) +where + F: FnOnce(Arc) -> RetF + 'static, + G: FnOnce(Arc) -> RetG + 'static, + RetF: Future, + RetG: Future, +{ let client = Arc::new(MockClient::new()); + run_test_with_mmr_gadget_pre_post_using_client(client, pre_gadget, post_gadget) +} + +pub(crate) fn run_test_with_mmr_gadget_pre_post_using_client( + client: Arc, + pre_gadget: F, + post_gadget: G, +) where + F: FnOnce(Arc) -> RetF + 'static, + G: FnOnce(Arc) -> RetG + 'static, + RetF: Future, + RetG: Future, +{ + let client_clone = client.clone(); + let runtime = Runtime::new().unwrap(); + runtime.block_on(async move { pre_gadget(client_clone).await }); let client_clone = client.clone(); runtime.spawn(async move { @@ -327,6 +349,6 @@ where runtime.block_on(async move { tokio::time::sleep(Duration::from_millis(200)).await; - f(client).await + post_gadget(client).await }); }