diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index ce936af93052..f0a47424f3a9 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -25,7 +25,6 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master sp-npos-elections = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "master" } - frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" } frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" } frame-election-provider-support = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/utils/staking-miner/src/main.rs b/utils/staking-miner/src/main.rs index f4bdff73aea3..b63ce3eb168a 100644 --- a/utils/staking-miner/src/main.rs +++ b/utils/staking-miner/src/main.rs @@ -35,6 +35,8 @@ mod prelude; mod rpc; mod signer; +use std::str::FromStr; + pub(crate) use prelude::*; pub(crate) use signer::get_account_info; @@ -45,7 +47,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use remote_externalities::{Builder, Mode, OnlineConfig}; use rpc::{RpcApiClient, SharedRpcClient}; use sp_npos_elections::ExtendedBalance; -use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; +use sp_runtime::{traits::Block as BlockT, DeserializeOwned, Perbill}; use tracing_subscriber::{fmt, EnvFilter}; use std::{ops::Deref, sync::Arc}; @@ -243,6 +245,7 @@ enum Error { IncorrectPhase, AlreadySubmitted, VersionMismatch, + StrategyNotSatisfied, } impl From for Error { @@ -299,6 +302,46 @@ enum Solver { }, } +/// Submission strategy to use. +#[derive(Debug, Copy, Clone)] +#[cfg_attr(test, derive(PartialEq))] +enum SubmissionStrategy { + // Only submit if at the time, we are the best. + IfLeading, + // Always submit. + Always, + // Submit if we are leading, or if the solution that's leading is more that the given `Perbill` + // better than us. This helps detect obviously fake solutions and still combat them. + ClaimBetterThan(Perbill), +} + +/// Custom `impl` to parse `SubmissionStrategy` from CLI. +/// +/// Possible options: +/// * --submission-strategy if-leading: only submit if leading +/// * --submission-strategy always: always submit +/// * --submission-strategy "percent-better ": submit if submission is `n` percent better. +/// +impl FromStr for SubmissionStrategy { + type Err = String; + + fn from_str(s: &str) -> Result { + let s = s.trim(); + + let res = if s == "if-leading" { + Self::IfLeading + } else if s == "always" { + Self::Always + } else if s.starts_with("percent-better ") { + let percent: u32 = s[15..].parse().map_err(|e| format!("{:?}", e))?; + Self::ClaimBetterThan(Perbill::from_percent(percent)) + } else { + return Err(s.into()) + }; + Ok(res) + } +} + frame_support::parameter_types! { /// Number of balancing iterations for a solution algorithm. Set based on the [`Solvers`] CLI /// config. @@ -320,6 +363,18 @@ struct MonitorConfig { /// The solver algorithm to use. #[clap(subcommand)] solver: Solver, + + /// Submission strategy to use. + /// + /// Possible options: + /// + /// `--submission-strategy if-leading`: only submit if leading. + /// + /// `--submission-strategy always`: always submit. + /// + /// `--submission-strategy "percent-better "`: submit if the submission is `n` percent better. + #[clap(long, parse(try_from_str), default_value = "if-leading")] + submission_strategy: SubmissionStrategy, } #[derive(Debug, Clone, Parser)] @@ -665,7 +720,8 @@ mod tests { seed_or_path: "//Alice".to_string(), command: Command::Monitor(MonitorConfig { listen: "head".to_string(), - solver: Solver::SeqPhragmen { iterations: 10 } + solver: Solver::SeqPhragmen { iterations: 10 }, + submission_strategy: SubmissionStrategy::IfLeading, }), } ); @@ -727,4 +783,16 @@ mod tests { } ); } + + #[test] + fn submission_strategy_from_str_works() { + use std::str::FromStr; + + assert_eq!(SubmissionStrategy::from_str("if-leading"), Ok(SubmissionStrategy::IfLeading)); + assert_eq!(SubmissionStrategy::from_str("always"), Ok(SubmissionStrategy::Always)); + assert_eq!( + SubmissionStrategy::from_str(" percent-better 99 "), + Ok(SubmissionStrategy::ClaimBetterThan(Perbill::from_percent(99))) + ); + } } diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 0ad314053875..ffe4e0daf459 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -16,12 +16,16 @@ //! The monitor command. -use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient}; +use crate::{ + prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient, SubmissionStrategy, +}; use codec::Encode; use jsonrpsee::core::Error as RpcError; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; +use sp_runtime::Perbill; use tokio::sync::mpsc; +use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; /// Ensure that now is the signed phase. async fn ensure_signed_phase>( @@ -43,21 +47,70 @@ async fn ensure_signed_phase>( } /// Ensure that our current `us` have not submitted anything previously. -async fn ensure_no_previous_solution< - T: EPM::Config + frame_system::Config, - B: BlockT, ->( - ext: &mut Ext, +async fn ensure_no_previous_solution( + rpc: &SharedRpcClient, + at: Hash, us: &AccountId, +) -> Result<(), Error> +where + T: EPM::Config + frame_system::Config, + B: BlockT, +{ + let indices_key = StorageKey(EPM::SignedSubmissionIndices::::hashed_key().to_vec()); + + let indices: SubmissionIndicesOf = rpc + .get_storage_and_decode(&indices_key, Some(at)) + .await + .map_err::, _>(Into::into)? + .unwrap_or_default(); + + for (_score, idx) in indices { + let key = StorageKey(EPM::SignedSubmissionsMap::::hashed_key_for(idx)); + + if let Some(submission) = rpc + .get_storage_and_decode::>(&key, Some(at)) + .await + .map_err::, _>(Into::into)? + { + if &submission.who == us { + return Err(Error::AlreadySubmitted) + } + } + } + + Ok(()) +} + +/// Reads all current solutions and checks the scores according to the `SubmissionStrategy`. +async fn ensure_no_better_solution( + rpc: &SharedRpcClient, + at: Hash, + score: sp_npos_elections::ElectionScore, + strategy: SubmissionStrategy, ) -> Result<(), Error> { - use EPM::signed::SignedSubmissions; - ext.execute_with(|| { - if >::get().iter().any(|ss| &ss.who == us) { - Err(Error::AlreadySubmitted) - } else { - Ok(()) + let epsilon = match strategy { + // don't care about current scores. + SubmissionStrategy::Always => return Ok(()), + SubmissionStrategy::IfLeading => Perbill::zero(), + SubmissionStrategy::ClaimBetterThan(epsilon) => epsilon, + }; + + let indices_key = StorageKey(EPM::SignedSubmissionIndices::::hashed_key().to_vec()); + + let indices: SubmissionIndicesOf = rpc + .get_storage_and_decode(&indices_key, Some(at)) + .await + .map_err::, _>(Into::into)? + .unwrap_or_default(); + + // BTreeMap is ordered, take last to get the max score. + if let Some(curr_max_score) = indices.into_iter().last().map(|(s, _)| s) { + if !score.strict_threshold_better(curr_max_score, epsilon) { + return Err(Error::StrategyNotSatisfied) } - }) + } + + Ok(()) } macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { @@ -131,39 +184,52 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { config: MonitorConfig, ) { + async fn flatten( + handle: tokio::task::JoinHandle> + ) -> Result { + match handle.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err), + Err(err) => panic!("tokio spawn task failed; kill task: {:?}", err), + } + } + let hash = at.hash(); log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash); - // if the runtime version has changed, terminate. + // block on this because if this fails there is no way to recover from + // that error i.e, upgrade/downgrade required. if let Err(err) = crate::check_versions::(&rpc).await { let _ = tx.send(err.into()); return; } - // we prefer doing this check before fetching anything into a remote-ext. - if ensure_signed_phase::(&rpc, hash).await.is_err() { - log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); + let rpc1 = rpc.clone(); + let rpc2 = rpc.clone(); + let account = signer.account.clone(); + + let signed_phase_fut = tokio::spawn(async move { + ensure_signed_phase::(&rpc1, hash).await + }); + + let no_prev_sol_fut = tokio::spawn(async move { + ensure_no_previous_solution::(&rpc2, hash, &account).await + }); + + // Run the calls in parallel and return once all has completed or any failed. + if let Err(err) = tokio::try_join!(flatten(signed_phase_fut), flatten(no_prev_sol_fut)) { + log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err); return; } - // grab an externalities without staking, just the election snapshot. - let mut ext = match crate::create_election_ext::( - rpc.clone(), - Some(hash), - vec![], - ).await { + let mut ext = match crate::create_election_ext::(rpc.clone(), Some(hash), vec![]).await { Ok(ext) => ext, Err(err) => { - let _ = tx.send(err); + log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err); return; } }; - if ensure_no_previous_solution::(&mut ext, &signer.account).await.is_err() { - log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping."); - return; - } - // mine a solution, and run feasibility check on it as well. let raw_solution = match crate::mine_with::(&config.solver, &mut ext, true) { Ok(r) => r, @@ -173,7 +239,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } }; - log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); + let score = raw_solution.score; + log::info!(target: LOG_TARGET, "mined solution with {:?}", score); let nonce = match crate::get_account_info::(&rpc, &signer.account, Some(hash)).await { Ok(maybe_account) => { @@ -200,6 +267,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let extrinsic = ext.execute_with(|| create_uxt(raw_solution, signer.clone(), nonce, tip, era)); let bytes = sp_core::Bytes(extrinsic.encode()); + let rpc1 = rpc.clone(); + let rpc2 = rpc.clone(); + + let ensure_no_better_fut = tokio::spawn(async move { + ensure_no_better_solution::(&rpc1, hash, score, config.submission_strategy).await + }); + + let ensure_signed_phase_fut = tokio::spawn(async move { + ensure_signed_phase::(&rpc2, hash).await + }); + + // Run the calls in parallel and return once all has completed or any failed. + if tokio::try_join!( + flatten(ensure_no_better_fut), + flatten(ensure_signed_phase_fut), + ).is_err() { + return; + } + let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await { Ok(sub) => sub, Err(RpcError::RestartNeeded(e)) => {