Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
staking miner: Check the queue one last time before submission (#4819)
Browse files Browse the repository at this point in the history
* staking miner: use config for emergency solution

Fixes #4678

* bump jsonrpsee

* run `monitor_cmd_for` until the connection is closed

* new tokio task for submit_and_watch xt

* re-use header subscription

* update jsonrpsee + simplify code

* revert polkadot runtime changes

* feat: add `ensure_no_better_solution` function

* storage access for submissions and indices

* check ensure_no_previous_solution before remote ext

* fix todos

* grumbles: Perbill::from_percent

* hacky fix

* use modified EPM pallet and various fixes

* diener update --substrate --branch na-epm-pub

* Revert "diener update --substrate --branch na-epm-pub"

This reverts commit b3b9a58.

* update substrate

* tokio spawn on concurrent stuff

* cleanup

* Update utils/staking-miner/src/monitor.rs

* Update utils/staking-miner/src/monitor.rs

* more cleanup

* fix nits

* address grumbles

* only run batch reqs when signed phase

* better help menu for submission strategy CLI

* add tests for submission strategy
  • Loading branch information
niklasad1 authored Mar 4, 2022
1 parent dbfa5bd commit 875dec1
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 33 deletions.
1 change: 0 additions & 1 deletion utils/staking-miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-npos-elections = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "master" }


frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-election-provider-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
72 changes: 70 additions & 2 deletions utils/staking-miner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ mod prelude;
mod rpc;
mod signer;

use std::str::FromStr;

pub(crate) use prelude::*;
pub(crate) use signer::get_account_info;

Expand All @@ -45,7 +47,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use remote_externalities::{Builder, Mode, OnlineConfig};
use rpc::{RpcApiClient, SharedRpcClient};
use sp_npos_elections::ExtendedBalance;
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned, Perbill};
use tracing_subscriber::{fmt, EnvFilter};

use std::{ops::Deref, sync::Arc};
Expand Down Expand Up @@ -243,6 +245,7 @@ enum Error<T: EPM::Config> {
IncorrectPhase,
AlreadySubmitted,
VersionMismatch,
StrategyNotSatisfied,
}

impl<T: EPM::Config> From<sp_core::crypto::SecretStringError> for Error<T> {
Expand Down Expand Up @@ -299,6 +302,46 @@ enum Solver {
},
}

/// Submission strategy to use.
#[derive(Debug, Copy, Clone)]
#[cfg_attr(test, derive(PartialEq))]
enum SubmissionStrategy {
// Only submit if at the time, we are the best.
IfLeading,
// Always submit.
Always,
// Submit if we are leading, or if the solution that's leading is more that the given `Perbill`
// better than us. This helps detect obviously fake solutions and still combat them.
ClaimBetterThan(Perbill),
}

/// Custom `impl` to parse `SubmissionStrategy` from CLI.
///
/// Possible options:
/// * --submission-strategy if-leading: only submit if leading
/// * --submission-strategy always: always submit
/// * --submission-strategy "percent-better <percent>": submit if submission is `n` percent better.
///
impl FromStr for SubmissionStrategy {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim();

let res = if s == "if-leading" {
Self::IfLeading
} else if s == "always" {
Self::Always
} else if s.starts_with("percent-better ") {
let percent: u32 = s[15..].parse().map_err(|e| format!("{:?}", e))?;
Self::ClaimBetterThan(Perbill::from_percent(percent))
} else {
return Err(s.into())
};
Ok(res)
}
}

frame_support::parameter_types! {
/// Number of balancing iterations for a solution algorithm. Set based on the [`Solvers`] CLI
/// config.
Expand All @@ -320,6 +363,18 @@ struct MonitorConfig {
/// The solver algorithm to use.
#[clap(subcommand)]
solver: Solver,

/// Submission strategy to use.
///
/// Possible options:
///
/// `--submission-strategy if-leading`: only submit if leading.
///
/// `--submission-strategy always`: always submit.
///
/// `--submission-strategy "percent-better <percent>"`: submit if the submission is `n` percent better.
#[clap(long, parse(try_from_str), default_value = "if-leading")]
submission_strategy: SubmissionStrategy,
}

#[derive(Debug, Clone, Parser)]
Expand Down Expand Up @@ -665,7 +720,8 @@ mod tests {
seed_or_path: "//Alice".to_string(),
command: Command::Monitor(MonitorConfig {
listen: "head".to_string(),
solver: Solver::SeqPhragmen { iterations: 10 }
solver: Solver::SeqPhragmen { iterations: 10 },
submission_strategy: SubmissionStrategy::IfLeading,
}),
}
);
Expand Down Expand Up @@ -727,4 +783,16 @@ mod tests {
}
);
}

#[test]
fn submission_strategy_from_str_works() {
use std::str::FromStr;

assert_eq!(SubmissionStrategy::from_str("if-leading"), Ok(SubmissionStrategy::IfLeading));
assert_eq!(SubmissionStrategy::from_str("always"), Ok(SubmissionStrategy::Always));
assert_eq!(
SubmissionStrategy::from_str(" percent-better 99 "),
Ok(SubmissionStrategy::ClaimBetterThan(Perbill::from_percent(99)))
);
}
}
146 changes: 116 additions & 30 deletions utils/staking-miner/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

//! The monitor command.
use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient};
use crate::{
prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient, SubmissionStrategy,
};
use codec::Encode;
use jsonrpsee::core::Error as RpcError;
use sc_transaction_pool_api::TransactionStatus;
use sp_core::storage::StorageKey;
use sp_runtime::Perbill;
use tokio::sync::mpsc;
use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf};

/// Ensure that now is the signed phase.
async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
Expand All @@ -43,21 +47,70 @@ async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
}

/// Ensure that our current `us` have not submitted anything previously.
async fn ensure_no_previous_solution<
T: EPM::Config + frame_system::Config<AccountId = AccountId>,
B: BlockT,
>(
ext: &mut Ext,
async fn ensure_no_previous_solution<T, B>(
rpc: &SharedRpcClient,
at: Hash,
us: &AccountId,
) -> Result<(), Error<T>>
where
T: EPM::Config + frame_system::Config<AccountId = AccountId, Hash = Hash>,
B: BlockT,
{
let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());

let indices: SubmissionIndicesOf<T> = rpc
.get_storage_and_decode(&indices_key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
.unwrap_or_default();

for (_score, idx) in indices {
let key = StorageKey(EPM::SignedSubmissionsMap::<T>::hashed_key_for(idx));

if let Some(submission) = rpc
.get_storage_and_decode::<SignedSubmissionOf<T>>(&key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
{
if &submission.who == us {
return Err(Error::AlreadySubmitted)
}
}
}

Ok(())
}

/// Reads all current solutions and checks the scores according to the `SubmissionStrategy`.
async fn ensure_no_better_solution<T: EPM::Config, B: BlockT>(
rpc: &SharedRpcClient,
at: Hash,
score: sp_npos_elections::ElectionScore,
strategy: SubmissionStrategy,
) -> Result<(), Error<T>> {
use EPM::signed::SignedSubmissions;
ext.execute_with(|| {
if <SignedSubmissions<T>>::get().iter().any(|ss| &ss.who == us) {
Err(Error::AlreadySubmitted)
} else {
Ok(())
let epsilon = match strategy {
// don't care about current scores.
SubmissionStrategy::Always => return Ok(()),
SubmissionStrategy::IfLeading => Perbill::zero(),
SubmissionStrategy::ClaimBetterThan(epsilon) => epsilon,
};

let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());

let indices: SubmissionIndicesOf<T> = rpc
.get_storage_and_decode(&indices_key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
.unwrap_or_default();

// BTreeMap is ordered, take last to get the max score.
if let Some(curr_max_score) = indices.into_iter().last().map(|(s, _)| s) {
if !score.strict_threshold_better(curr_max_score, epsilon) {
return Err(Error::StrategyNotSatisfied)
}
})
}

Ok(())
}

macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
Expand Down Expand Up @@ -131,39 +184,52 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
config: MonitorConfig,
) {

async fn flatten<T>(
handle: tokio::task::JoinHandle<Result<T, StakingMinerError>>
) -> Result<T, StakingMinerError> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => panic!("tokio spawn task failed; kill task: {:?}", err),
}
}

let hash = at.hash();
log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash);

// if the runtime version has changed, terminate.
// block on this because if this fails there is no way to recover from
// that error i.e, upgrade/downgrade required.
if let Err(err) = crate::check_versions::<Runtime>(&rpc).await {
let _ = tx.send(err.into());
return;
}

// we prefer doing this check before fetching anything into a remote-ext.
if ensure_signed_phase::<Runtime, Block>(&rpc, hash).await.is_err() {
log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all.");
let rpc1 = rpc.clone();
let rpc2 = rpc.clone();
let account = signer.account.clone();

let signed_phase_fut = tokio::spawn(async move {
ensure_signed_phase::<Runtime, Block>(&rpc1, hash).await
});

let no_prev_sol_fut = tokio::spawn(async move {
ensure_no_previous_solution::<Runtime, Block>(&rpc2, hash, &account).await
});

// Run the calls in parallel and return once all has completed or any failed.
if let Err(err) = tokio::try_join!(flatten(signed_phase_fut), flatten(no_prev_sol_fut)) {
log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err);
return;
}

// grab an externalities without staking, just the election snapshot.
let mut ext = match crate::create_election_ext::<Runtime, Block>(
rpc.clone(),
Some(hash),
vec![],
).await {
let mut ext = match crate::create_election_ext::<Runtime, Block>(rpc.clone(), Some(hash), vec![]).await {
Ok(ext) => ext,
Err(err) => {
let _ = tx.send(err);
log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err);
return;
}
};

if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() {
log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping.");
return;
}

// mine a solution, and run feasibility check on it as well.
let raw_solution = match crate::mine_with::<Runtime>(&config.solver, &mut ext, true) {
Ok(r) => r,
Expand All @@ -173,7 +239,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
}
};

log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score);
let score = raw_solution.score;
log::info!(target: LOG_TARGET, "mined solution with {:?}", score);

let nonce = match crate::get_account_info::<Runtime>(&rpc, &signer.account, Some(hash)).await {
Ok(maybe_account) => {
Expand All @@ -200,6 +267,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
let extrinsic = ext.execute_with(|| create_uxt(raw_solution, signer.clone(), nonce, tip, era));
let bytes = sp_core::Bytes(extrinsic.encode());

let rpc1 = rpc.clone();
let rpc2 = rpc.clone();

let ensure_no_better_fut = tokio::spawn(async move {
ensure_no_better_solution::<Runtime, Block>(&rpc1, hash, score, config.submission_strategy).await
});

let ensure_signed_phase_fut = tokio::spawn(async move {
ensure_signed_phase::<Runtime, Block>(&rpc2, hash).await
});

// Run the calls in parallel and return once all has completed or any failed.
if tokio::try_join!(
flatten(ensure_no_better_fut),
flatten(ensure_signed_phase_fut),
).is_err() {
return;
}

let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await {
Ok(sub) => sub,
Err(RpcError::RestartNeeded(e)) => {
Expand Down

0 comments on commit 875dec1

Please sign in to comment.