From fcc513773130acc650987979591cce2215f940c3 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Sun, 5 Sep 2021 16:45:44 +0000 Subject: [PATCH 1/4] pow: add Version for quick-check of metadata state and refactor lock handling --- client/consensus/common/src/block_import.rs | 2 +- client/consensus/pow/src/lib.rs | 18 +- client/consensus/pow/src/worker.rs | 198 +++++++++++++------- 3 files changed, 142 insertions(+), 76 deletions(-) diff --git a/client/consensus/common/src/block_import.rs b/client/consensus/common/src/block_import.rs index 6d411dd9afbf1..d828e54bc7e3e 100644 --- a/client/consensus/common/src/block_import.rs +++ b/client/consensus/common/src/block_import.rs @@ -74,7 +74,7 @@ impl ImportResult { &self, hash: &B::Hash, number: NumberFor, - justification_sync_link: &mut dyn JustificationSyncLink, + justification_sync_link: &dyn JustificationSyncLink, ) where B: BlockT, { diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 1f5781434ef71..c6ae28a04cb84 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -41,13 +41,12 @@ mod worker; -pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker}; +pub use crate::worker::{MiningBuild, MiningMetadata, MiningHandle}; use crate::worker::UntilImportedOrTimeout; use codec::{Decode, Encode}; use futures::{Future, StreamExt}; use log::*; -use parking_lot::Mutex; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; use sc_consensus::{ @@ -525,7 +524,7 @@ pub fn start_mining_worker( build_time: Duration, can_author_with: CAW, ) -> ( - Arc>::Proof>>>, + MiningHandle>::Proof>, impl Future, ) where @@ -543,12 +542,7 @@ where CAW: CanAuthorWith + Clone + Send + 'static, { let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); - let worker = Arc::new(Mutex::new(MiningWorker { - build: None, - algorithm: algorithm.clone(), - block_import, - justification_sync_link, - })); + let worker = MiningHandle::new(algorithm.clone(), block_import, justification_sync_link); let worker_ret = worker.clone(); let task = async move { @@ -559,7 +553,7 @@ where if sync_oracle.is_major_syncing() { debug!(target: "pow", "Skipping proposal due to sync."); - worker.lock().on_major_syncing(); + worker.on_major_syncing(); continue } @@ -587,7 +581,7 @@ where continue } - if worker.lock().best_hash() == Some(best_hash) { + if worker.best_hash() == Some(best_hash) { continue } @@ -682,7 +676,7 @@ where proposal, }; - worker.lock().on_build(build); + worker.on_build(build); } }; diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs index c0ca16ccad3aa..0117228140de2 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -30,7 +30,8 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, DigestItem, }; -use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration}; +use parking_lot::Mutex; +use std::{borrow::Cow, sync::Arc, sync::atomic::{AtomicUsize, Ordering}, collections::HashMap, pin::Pin, time::Duration}; use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID}; @@ -60,21 +61,26 @@ pub struct MiningBuild< pub proposal: Proposal, Proof>, } +/// Version of the mining worker. +#[derive(Eq, PartialEq, Clone, Copy)] +pub struct Version(usize); + /// Mining worker that exposes structs to query the current mining build and submit mined blocks. -pub struct MiningWorker< +pub struct MiningHandle< Block: BlockT, Algorithm: PowAlgorithm, C: sp_api::ProvideRuntimeApi, L: sc_consensus::JustificationSyncLink, Proof, > { - pub(crate) build: Option>, - pub(crate) algorithm: Algorithm, - pub(crate) block_import: BoxBlockImport>, - pub(crate) justification_sync_link: L, + version: Arc, + algorithm: Arc, + justification_sync_link: Arc, + build: Arc>>>, + block_import: Arc>>>, } -impl MiningWorker +impl MiningHandle where Block: BlockT, C: sp_api::ProvideRuntimeApi, @@ -83,35 +89,65 @@ where L: sc_consensus::JustificationSyncLink, sp_api::TransactionFor: Send + 'static, { - /// Get the current best hash. `None` if the worker has just started or the client is doing - /// major syncing. - pub fn best_hash(&self) -> Option { - self.build.as_ref().map(|b| b.metadata.best_hash) + fn increment_version(&self) { + self.version.fetch_add(1, Ordering::SeqCst); + } + + pub(crate) fn new( + algorithm: Algorithm, + block_import: BoxBlockImport>, + justification_sync_link: L + ) -> Self { + Self { + version: Arc::new(AtomicUsize::new(0)), + algorithm: Arc::new(algorithm), + justification_sync_link: Arc::new(justification_sync_link), + build: Arc::new(Mutex::new(None)), + block_import: Arc::new(Mutex::new(block_import)), + } } - pub(crate) fn on_major_syncing(&mut self) { - self.build = None; + pub(crate) fn on_major_syncing(&self) { + let mut build = self.build.lock(); + *build = None; + self.increment_version(); } - pub(crate) fn on_build(&mut self, build: MiningBuild) { - self.build = Some(build); + pub(crate) fn on_build(&self, value: MiningBuild) { + let mut build = self.build.lock(); + *build = Some(value); + self.increment_version(); + } + + /// Get the version of the mining worker. + /// + /// This returns type `Version` which can only compare equality. If `Version` is unchanged, then + /// it can be certain that `best_hash` and `metadata` does not changed. + pub fn version(&self) -> Version { + Version(self.version.load(Ordering::SeqCst)) + } + + /// Get the current best hash. `None` if the worker has just started or the client is doing + /// major syncing. + pub fn best_hash(&self) -> Option { + self.build.lock().as_ref().map(|b| b.metadata.best_hash) } /// Get a copy of the current mining metadata, if available. pub fn metadata(&self) -> Option> { - self.build.as_ref().map(|b| b.metadata.clone()) + self.build.lock().as_ref().map(|b| b.metadata.clone()) } /// Submit a mined seal. The seal will be validated again. Returns true if the submission is /// successful. pub async fn submit(&mut self, seal: Seal) -> bool { - if let Some(build) = self.build.take() { + if let Some(metadata) = self.metadata() { match self.algorithm.verify( - &BlockId::Hash(build.metadata.best_hash), - &build.metadata.pre_hash, - build.metadata.pre_runtime.as_ref().map(|v| &v[..]), + &BlockId::Hash(metadata.best_hash), + &metadata.pre_hash, + metadata.pre_runtime.as_ref().map(|v| &v[..]), &seal, - build.metadata.difficulty, + metadata.difficulty, ) { Ok(true) => (), Ok(false) => { @@ -130,55 +166,91 @@ where return false }, } + } else { + warn!( + target: "pow", + "Unable to import mined block: metadata does not exist", + ); + return false + } - let seal = DigestItem::Seal(POW_ENGINE_ID, seal); - let (header, body) = build.proposal.block.deconstruct(); - - let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); - import_block.post_digests.push(seal); - import_block.body = Some(body); - import_block.state_action = - StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes)); - - let intermediate = PowIntermediate:: { - difficulty: Some(build.metadata.difficulty), - }; - - import_block - .intermediates - .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>); - - let header = import_block.post_header(); - match self.block_import.import_block(import_block, HashMap::default()).await { - Ok(res) => { - res.handle_justification( - &header.hash(), - *header.number(), - &mut self.justification_sync_link, - ); - - info!( - target: "pow", - "✅ Successfully mined block on top of: {}", - build.metadata.best_hash - ); - true - }, - Err(err) => { - warn!( - target: "pow", - "Unable to import mined block: {:?}", - err, - ); - false - }, + let build = if let Some(build) = { + let mut build = self.build.lock(); + let value = build.take(); + if value.is_some() { + self.increment_version(); } + value + } { + build } else { warn!( target: "pow", "Unable to import mined block: build does not exist", ); - false + return false + }; + + let seal = DigestItem::Seal(POW_ENGINE_ID, seal); + let (header, body) = build.proposal.block.deconstruct(); + + let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); + import_block.post_digests.push(seal); + import_block.body = Some(body); + import_block.state_action = + StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes)); + + let intermediate = PowIntermediate:: { + difficulty: Some(build.metadata.difficulty), + }; + + import_block + .intermediates + .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>); + + let header = import_block.post_header(); + let mut block_import = self.block_import.lock(); + + match block_import.import_block(import_block, HashMap::default()).await { + Ok(res) => { + res.handle_justification( + &header.hash(), + *header.number(), + &self.justification_sync_link, + ); + + info!( + target: "pow", + "✅ Successfully mined block on top of: {}", + build.metadata.best_hash + ); + true + }, + Err(err) => { + warn!( + target: "pow", + "Unable to import mined block: {:?}", + err, + ); + false + }, + } + } +} + +impl Clone for MiningHandle where + Block: BlockT, + Algorithm: PowAlgorithm, + C: sp_api::ProvideRuntimeApi, + L: sc_consensus::JustificationSyncLink, +{ + fn clone(&self) -> Self { + Self { + version: self.version.clone(), + algorithm: self.algorithm.clone(), + justification_sync_link: self.justification_sync_link.clone(), + build: self.build.clone(), + block_import: self.block_import.clone(), } } } From f8540cbd35bf219b6f2e9ecef7aa90f5c7ffc2a8 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Sun, 5 Sep 2021 16:51:20 +0000 Subject: [PATCH 2/4] typo: mut self -> self --- client/consensus/pow/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs index 0117228140de2..2706ea1637cda 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -140,7 +140,7 @@ where /// Submit a mined seal. The seal will be validated again. Returns true if the submission is /// successful. - pub async fn submit(&mut self, seal: Seal) -> bool { + pub async fn submit(&self, seal: Seal) -> bool { if let Some(metadata) = self.metadata() { match self.algorithm.verify( &BlockId::Hash(metadata.best_hash), From 237c872ad7981b9ff814150bc6c8b531f47d3f90 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Sun, 5 Sep 2021 17:59:23 +0000 Subject: [PATCH 3/4] Run rustfmt --- client/consensus/pow/src/lib.rs | 2 +- client/consensus/pow/src/worker.rs | 18 ++++++++++++++---- client/utils/src/metrics.rs | 1 - 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index c6ae28a04cb84..7b1012888e869 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -41,7 +41,7 @@ mod worker; -pub use crate::worker::{MiningBuild, MiningMetadata, MiningHandle}; +pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata}; use crate::worker::UntilImportedOrTimeout; use codec::{Decode, Encode}; diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs index 2706ea1637cda..c9011b210a8b2 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -22,6 +22,7 @@ use futures::{ }; use futures_timer::Delay; use log::*; +use parking_lot::Mutex; use sc_client_api::ImportNotifications; use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges}; use sp_consensus::{BlockOrigin, Proposal}; @@ -30,8 +31,16 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, DigestItem, }; -use parking_lot::Mutex; -use std::{borrow::Cow, sync::Arc, sync::atomic::{AtomicUsize, Ordering}, collections::HashMap, pin::Pin, time::Duration}; +use std::{ + borrow::Cow, + collections::HashMap, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID}; @@ -96,7 +105,7 @@ where pub(crate) fn new( algorithm: Algorithm, block_import: BoxBlockImport>, - justification_sync_link: L + justification_sync_link: L, ) -> Self { Self { version: Arc::new(AtomicUsize::new(0)), @@ -238,7 +247,8 @@ where } } -impl Clone for MiningHandle where +impl Clone for MiningHandle +where Block: BlockT, Algorithm: PowAlgorithm, C: sp_api::ProvideRuntimeApi, diff --git a/client/utils/src/metrics.rs b/client/utils/src/metrics.rs index 8df8e65962474..85ccce626bc25 100644 --- a/client/utils/src/metrics.rs +++ b/client/utils/src/metrics.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . - //! Metering primitives and globals use lazy_static::lazy_static; From 60ba48eae65b7e8ed1d3cb2dc56856bd754eb3a6 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Sun, 5 Sep 2021 18:02:05 +0000 Subject: [PATCH 4/4] typo: grammar --- client/consensus/pow/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs index c9011b210a8b2..3faa18ece3188 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -131,7 +131,7 @@ where /// Get the version of the mining worker. /// /// This returns type `Version` which can only compare equality. If `Version` is unchanged, then - /// it can be certain that `best_hash` and `metadata` does not changed. + /// it can be certain that `best_hash` and `metadata` were not changed. pub fn version(&self) -> Version { Version(self.version.load(Ordering::SeqCst)) }