This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

staking miner: Check the queue one last time before submission #4819

Merged 37 commits on Mar 4, 2022
Changes from 32 commits

Commits (37)
feb0a7b
staking miner: use config for emergency solution
niklasad1 Jan 11, 2022
633ee73
bump jsonrpsee
niklasad1 Jan 11, 2022
4c2dbd6
Merge remote-tracking branch 'origin/master' into na-staking-miner-pl…
niklasad1 Jan 13, 2022
4d9644e
run `monitor_cmd_for` until the connection is closed
niklasad1 Jan 13, 2022
21546b1
new tokio task for submit_and_watch xt
niklasad1 Jan 13, 2022
00ff014
re-use header subscription
niklasad1 Jan 14, 2022
54f3bf1
Merge remote-tracking branch 'origin/master' into HEAD
niklasad1 Jan 24, 2022
ecfe004
update jsonrpsee + simplify code
niklasad1 Jan 24, 2022
5fddbbe
revert polkadot runtime changes
niklasad1 Jan 24, 2022
b760a30
feat: add `ensure_no_better_solution` function
niklasad1 Jan 24, 2022
1d08111
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Jan 31, 2022
69a3096
storage access for submissions and indices
niklasad1 Jan 31, 2022
32c877a
check ensure_no_previous_solution before remote ext
niklasad1 Jan 31, 2022
9ea9f99
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Feb 4, 2022
e87406d
fix todos
niklasad1 Feb 4, 2022
1a5d18b
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Feb 7, 2022
c94aed1
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Feb 18, 2022
92de8a0
grumbles: Perbill::from_percent
niklasad1 Feb 21, 2022
756694a
hacky fix
niklasad1 Feb 21, 2022
5488de1
use modified EPM pallet and various fixes
niklasad1 Feb 21, 2022
b3b9a58
diener update --substrate --branch na-epm-pub
niklasad1 Feb 21, 2022
b3740f5
Revert "diener update --substrate --branch na-epm-pub"
niklasad1 Feb 22, 2022
359a469
update substrate
niklasad1 Feb 22, 2022
fabfd6e
tokio spawn on concurrent stuff
niklasad1 Feb 25, 2022
c633e44
cleanup
niklasad1 Feb 25, 2022
2e638d5
Update utils/staking-miner/src/monitor.rs
niklasad1 Feb 25, 2022
5946494
Update utils/staking-miner/src/monitor.rs
niklasad1 Feb 25, 2022
0d8aca7
more cleanup
niklasad1 Feb 25, 2022
be2f3fc
Merge remote-tracking branch 'origin/na-fix-3740' into na-fix-3740
niklasad1 Feb 25, 2022
51c4140
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Feb 25, 2022
deeee60
fix nits
niklasad1 Feb 25, 2022
e42aca8
address grumbles
niklasad1 Mar 1, 2022
19fc667
only run batch reqs when signed phase
niklasad1 Mar 1, 2022
75d9698
better help menu for submission strategy CLI
niklasad1 Mar 1, 2022
e3ebf7c
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Mar 3, 2022
64c4a33
Merge remote-tracking branch 'origin/master' into na-fix-3740
niklasad1 Mar 4, 2022
35f26ff
add tests for submission strategy
niklasad1 Mar 4, 2022
1 change: 0 additions & 1 deletion utils/staking-miner/Cargo.toml
@@ -25,7 +25,6 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-npos-elections = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "master" }


frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-election-provider-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
45 changes: 44 additions & 1 deletion utils/staking-miner/src/main.rs
@@ -35,6 +35,8 @@ mod prelude;
mod rpc;
mod signer;

use std::str::FromStr;

pub(crate) use prelude::*;
pub(crate) use signer::get_account_info;

@@ -45,7 +47,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use remote_externalities::{Builder, Mode, OnlineConfig};
use rpc::{RpcApiClient, SharedRpcClient};
use sp_npos_elections::ExtendedBalance;
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned, Perbill};
use tracing_subscriber::{fmt, EnvFilter};

use std::{ops::Deref, sync::Arc};
@@ -244,6 +246,7 @@ enum Error<T: EPM::Config> {
IncorrectPhase,
AlreadySubmitted,
VersionMismatch,
StrategyNotSatisfied,
}

impl<T: EPM::Config> From<sp_core::crypto::SecretStringError> for Error<T> {
@@ -298,6 +301,42 @@ enum Solvers {
},
}

#[derive(Debug, Copy, Clone)]
enum SubmissionStrategy {
// Only submit if at the time, we are the best.
IfLeading,
// Always submit.
Always,
// Submit if we are leading, or if the solution that's leading is more than the given `Perbill`
// better than us. This helps detect obviously fake solutions and still combat them.
ClaimBetterThan(Perbill),
}

/// Custom `impl` to parse `SubmissionStrategy` from CLI.
///
/// Possible options:
/// * --submission-strategy if-leading: only submit if leading
/// * --submission-strategy always: always submit
/// * --submission-strategy "percent-better <percent>": submit if submission is `n` percent better.
///
impl FromStr for SubmissionStrategy {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let res = if s == "if-leading" {
Self::IfLeading
} else if s == "always" {
Self::Always
} else if s.starts_with("percent-better ") {
let percent: u32 = s[15..].parse().map_err(|e| format!("{:?}", e))?;
Self::ClaimBetterThan(Perbill::from_percent(percent))
} else {
return Err(s.into())
};
Ok(res)
}
}
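For illustration, a minimal sketch of how the values listed in the doc comment are expected to parse; this is not part of the diff (the PR adds its own tests in commit 35f26ff) and assumes only the `FromStr` impl and the `Perbill` import shown in this file:

#[test]
fn submission_strategy_parses_as_documented() {
    assert!(matches!("if-leading".parse::<SubmissionStrategy>(), Ok(SubmissionStrategy::IfLeading)));
    assert!(matches!("always".parse::<SubmissionStrategy>(), Ok(SubmissionStrategy::Always)));
    assert!(matches!(
        "percent-better 5".parse::<SubmissionStrategy>(),
        Ok(SubmissionStrategy::ClaimBetterThan(p)) if p == Perbill::from_percent(5)
    ));
    assert!("bogus".parse::<SubmissionStrategy>().is_err());
}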

frame_support::parameter_types! {
/// Number of balancing iterations for a solution algorithm. Set based on the [`Solvers`] CLI
/// config.
@@ -318,6 +357,10 @@ struct MonitorConfig {
/// The solver algorithm to use.
#[clap(subcommand)]
solver: Solvers,

/// Submission strategy to use.
#[clap(long, parse(try_from_str), default_value = "if-leading")]
submission_strategy: SubmissionStrategy,
}
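As a rough, hypothetical usage sketch (not part of the diff): any clap 3 parser can pick up the new flag with the same wiring, assuming the derive feature and the `SubmissionStrategy` type above; the `DemoOpts` name and values here are made up.

use clap::Parser;

#[derive(Debug, Parser)]
struct DemoOpts {
    /// Same wiring as `MonitorConfig::submission_strategy` above.
    #[clap(long, parse(try_from_str), default_value = "if-leading")]
    submission_strategy: SubmissionStrategy,
}

fn main() {
    // e.g. `--submission-strategy "percent-better 2"` would yield ClaimBetterThan(2%).
    let opts = DemoOpts::parse_from(["demo", "--submission-strategy", "always"]);
    println!("{:?}", opts.submission_strategy);
}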

#[derive(Debug, Clone, Parser)]
158 changes: 125 additions & 33 deletions utils/staking-miner/src/monitor.rs
@@ -16,12 +16,16 @@

//! The monitor command.

use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient};
use crate::{
prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient, SubmissionStrategy,
};
use codec::Encode;
use jsonrpsee::core::Error as RpcError;
use sc_transaction_pool_api::TransactionStatus;
use sp_core::storage::StorageKey;
use sp_runtime::Perbill;
use tokio::sync::mpsc;
use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf};

/// Ensure that now is the signed phase.
async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
@@ -43,21 +47,70 @@ async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
}

/// Ensure that our current `us` have not submitted anything previously.
async fn ensure_no_previous_solution<
T: EPM::Config + frame_system::Config<AccountId = AccountId>,
B: BlockT,
>(
ext: &mut Ext,
async fn ensure_no_previous_solution<T, B>(
rpc: &SharedRpcClient,
at: Hash,
us: &AccountId,
) -> Result<(), Error<T>>
where
T: EPM::Config + frame_system::Config<AccountId = AccountId, Hash = Hash>,
B: BlockT,
{
let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());

let indices: SubmissionIndicesOf<T> = rpc
.get_storage_and_decode(&indices_key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
.unwrap_or_default();

for (_score, idx) in indices {
Contributor:

I kinda wanted to rewrite this with .any(), but I realized async code in closures is still a PITA, right?

Member Author (niklasad1):

It's possible, but then we would need to convert the futures to a stream or batch them together.

Something like FuturesOrdered or a JSON-RPC batch request might make this a little bit nicer to read.
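A minimal sketch of the FuturesOrdered alternative mentioned above; this is not from the PR, assumes the futures crate is available, and reuses the same `rpc.get_storage_and_decode` helper and types as this function:

use futures::stream::{FuturesOrdered, StreamExt};

// Issue one storage lookup per queued submission; the lookups run concurrently
// and the results come back in submission-index order.
let mut lookups: FuturesOrdered<_> = indices
    .into_iter()
    .map(|(_score, idx)| {
        let key = StorageKey(EPM::SignedSubmissionsMap::<T>::hashed_key_for(idx));
        async move { rpc.get_storage_and_decode::<SignedSubmissionOf<T>>(&key, Some(at)).await }
    })
    .collect();

while let Some(maybe_submission) = lookups.next().await {
    if let Some(submission) = maybe_submission.map_err::<Error<T>, _>(Into::into)? {
        if &submission.who == us {
            return Err(Error::AlreadySubmitted)
        }
    }
}

Ok(())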

let key = StorageKey(EPM::SignedSubmissionsMap::<T>::hashed_key_for(idx));

if let Some(submission) = rpc
.get_storage_and_decode::<SignedSubmissionOf<T>>(&key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
{
if &submission.who == us {
return Err(Error::AlreadySubmitted)
}
}
}

Ok(())
}

/// Reads all current solutions and checks the scores according to the `SubmissionStrategy`.
async fn ensure_no_better_solution<T: EPM::Config, B: BlockT>(
rpc: &SharedRpcClient,
at: Hash,
score: sp_npos_elections::ElectionScore,
strategy: SubmissionStrategy,
) -> Result<(), Error<T>> {
use EPM::signed::SignedSubmissions;
ext.execute_with(|| {
if <SignedSubmissions<T>>::get().iter().any(|ss| &ss.who == us) {
Err(Error::AlreadySubmitted)
} else {
Ok(())
let epsilon = match strategy {
// don't care about current scores.
SubmissionStrategy::Always => return Ok(()),
SubmissionStrategy::IfLeading => Perbill::zero(),
SubmissionStrategy::ClaimBetterThan(epsilon) => epsilon,
};

let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());

let indices: SubmissionIndicesOf<T> = rpc
.get_storage_and_decode(&indices_key, Some(at))
.await
.map_err::<Error<T>, _>(Into::into)?
.unwrap_or_default();

// BTreeMap is ordered, take last to get the max score.
if let Some(curr_max_score) = indices.into_iter().last().map(|(s, _)| s) {
if !score.strict_threshold_better(curr_max_score, epsilon) {
return Err(Error::StrategyNotSatisfied)
}
})
}

Ok(())
}

macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
@@ -131,39 +184,58 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
config: MonitorConfig,
) {

async fn flatten<T>(
handle: tokio::task::JoinHandle<Result<T, StakingMinerError>>
) -> Result<T, StakingMinerError> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => panic!("tokio spawn task failed; kill task: {:?}", err),
}
}
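For readers unfamiliar with this idiom, here is a self-contained sketch of the `flatten` + `tokio::try_join!` pattern used below; it is illustrative only, independent of the staking-miner types, and assumes tokio with the rt-multi-thread and macros features:

use tokio::task::JoinHandle;

// Unwrap the JoinHandle layer so `try_join!` can short-circuit on the inner error.
async fn flatten<T, E>(handle: JoinHandle<Result<T, E>>) -> Result<T, E> {
    match handle.await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(err)) => Err(err),
        Err(join_err) => panic!("spawned task failed: {:?}", join_err),
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Both tasks run concurrently; `try_join!` resolves once both succeed or either fails.
    let a = tokio::spawn(async { Ok::<_, String>(1u32) });
    let b = tokio::spawn(async { Ok::<_, String>(2u32) });
    let (x, y) = tokio::try_join!(flatten(a), flatten(b))?;
    assert_eq!(x + y, 3);
    Ok(())
}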

let hash = at.hash();
log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash);

// if the runtime version has changed, terminate.
// block on this because if this fails there is no way to recover from
// that error i.e, upgrade/downgrade required.
if let Err(err) = crate::check_versions::<Runtime>(&rpc).await {
let _ = tx.send(err.into());
return;
}

// we prefer doing this check before fetching anything into a remote-ext.
if ensure_signed_phase::<Runtime, Block>(&rpc, hash).await.is_err() {
log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all.");
return;
}
let rpc1 = rpc.clone();
let rpc2 = rpc.clone();
let rpc3 = rpc.clone();
let account = signer.account.clone();

let signed_phase_fut = tokio::spawn(async move {
Contributor:

I assume we do this here to prevent heavy network usage from create_election_ext, and then again right before submitting to make sure it's not pointless to submit?

I think it's fine for now, but in the future I think we should look into only doing it right before submitting. Even if it ends up with a lot of wasted network usage it should be fine, as long as we are not executing this function right before the signed phase, since that is when we want to be really fast to get the solution in. But if the fear is that this will be executing at the end of the signed phase, I think it's fine because we will have a while until the next signed phase, so the extra network usage won't slow itself down. (Assuming it's just traffic to a node on the local network.)

Member Author (niklasad1, Mar 3, 2022):

> I assume we do this here to prevent heavy network usage from create_election_ext, and then again right before submitting to make sure it's not pointless to submit?

Yeah.

> Even if it ends up with a lot of wasted network usage it should be fine, as long as we are not executing this function right before the signed phase, since that is when we want to be really fast to get the solution in.

I'm just afraid that it will flood the JSON-RPC client and that it won't keep up with all the pending calls, because fetching the entire pallet data is quite large (even if the future is dropped, the client has to process the responses to check whether it's a valid call). This will occur on every block if we try to do everything in parallel.

Thus, I think we could try to subscribe to the Phase and only spawn these tasks if it's Phase::Signed. I'd be okay with spawning the huge calls in parallel, but we could try to benchmark this just to be on the safe side. I could be wrong :)
ensure_signed_phase::<Runtime, Block>(&rpc1, hash).await
});

let no_prev_sol_fut = tokio::spawn(async move {
ensure_no_previous_solution::<Runtime, Block>(&rpc2, hash, &account).await
});

let ext_fut = tokio::spawn(async move {
crate::create_election_ext::<Runtime, Block>(rpc3, Some(hash), vec![]).await
});

// Run the calls in parallel and return once all has completed or any failed.
let res = tokio::try_join!(
Member Author (niklasad1):

cc @emostov: now these calls are performed in parallel; however, the code is a bit nasty with the clones.

Member Author (niklasad1, Mar 1, 2022):

I'm going to revert the create_election_ext change because it increases network usage significantly on average; even if we drop the requests, the server will answer them anyway.

In addition, we only need to fetch these in the signed phase, and there are still no performance issues that I'm aware of.

flatten(signed_phase_fut),
flatten(no_prev_sol_fut),
flatten(ext_fut),
);

// grab an externalities without staking, just the election snapshot.
let mut ext = match crate::create_election_ext::<Runtime, Block>(
rpc.clone(),
Some(hash),
vec![],
).await {
Ok(ext) => ext,
let mut ext = match res {
Ok((_, _, ext)) => ext,
Err(err) => {
let _ = tx.send(err);
log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err);
return;
}
};

if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() {
log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping.");
return;
}

// mine a solution, and run feasibility check on it as well.
let (raw_solution, witness) = match crate::mine_with::<Runtime>(&config.solver, &mut ext, true) {
Ok(r) => r,
@@ -173,7 +245,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
}
};

log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score);
let score = raw_solution.score;
log::info!(target: LOG_TARGET, "mined solution with {:?}", score);

let nonce = match crate::get_account_info::<Runtime>(&rpc, &signer.account, Some(hash)).await {
Ok(maybe_account) => {
@@ -200,6 +273,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era));
let bytes = sp_core::Bytes(extrinsic.encode());

let rpc1 = rpc.clone();
let rpc2 = rpc.clone();

let ensure_no_better_fut = tokio::spawn(async move {
ensure_no_better_solution::<Runtime, Block>(&rpc1, hash, score, config.submission_strategy).await
});

let ensure_signed_phase_fut = tokio::spawn(async move {
ensure_signed_phase::<Runtime, Block>(&rpc2, hash).await
});

// Run the calls in parallel and return once all has completed or any failed.
if tokio::try_join!(
flatten(ensure_no_better_fut),
flatten(ensure_signed_phase_fut),
).is_err() {
return;
}

let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await {
Ok(sub) => sub,
Err(RpcError::RestartNeeded(e)) => {