diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 15122674..73b8a289 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -183,12 +183,21 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { .build()?; let gcp_service = rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?; + + let mut rpc_client = near_fetch::Client::new(&near_rpc); + if let Some(referer_param) = client_header_referer { + let client_headers = rpc_client.inner_mut().headers_mut(); + client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); + } + tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); + let (indexer_handle, indexer) = indexer::run( &indexer_options, &mpc_contract_id, &account_id, &sign_queue, &gcp_service, + rpc_client.clone(), &rt, )?; @@ -212,13 +221,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let (sender, receiver) = mpsc::channel(16384); tracing::info!(%my_address, "address detected"); - let mut rpc_client = near_fetch::Client::new(&near_rpc); - if let Some(referer_param) = client_header_referer { - let client_headers = rpc_client.inner_mut().headers_mut(); - client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); - } - tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( my_address, diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 9335e4bf..d8f092ce 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -1,6 +1,7 @@ use crate::gcp::error::DatastoreStorageError; use crate::gcp::GcpService; use crate::protocol::{SignQueue, SignRequest}; +use crate::rpc_client; use crate::types::LatestBlockHeight; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; @@ -53,6 +54,14 @@ pub struct Options { /// The threshold in seconds to check if the indexer needs to be restarted due to it stalling. #[clap(long, env("MPC_INDEXER_RUNNING_THRESHOLD"), default_value = "300")] pub running_threshold: u64, + + /// The threshold in block height lag to check if the indexer has caught up. + #[clap( + long, + env("MPC_INDEXER_BLOCK_HEIGHT_LAG_THRESHOLD"), + default_value = "50" + )] + pub block_height_lag_threshold: u64, } impl Options { @@ -68,6 +77,8 @@ impl Options { self.behind_threshold.to_string(), "--running-threshold".to_string(), self.running_threshold.to_string(), + "--block-height-lag-threshold".to_string(), + self.block_height_lag_threshold.to_string(), ]; if let Some(s3_url) = self.s3_url { @@ -105,10 +116,16 @@ pub struct Indexer { last_updated_timestamp: Arc>, running_threshold: Duration, behind_threshold: Duration, + block_height_lag_threshold: u64, + rpc_client: near_fetch::Client, } impl Indexer { - fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self { + fn new( + latest_block_height: LatestBlockHeight, + options: &Options, + rpc_client: near_fetch::Client, + ) -> Self { tracing::info!( "creating new indexer, latest block height: {}", latest_block_height.block_height @@ -118,6 +135,8 @@ impl Indexer { last_updated_timestamp: Arc::new(RwLock::new(Instant::now())), running_threshold: Duration::from_secs(options.running_threshold), behind_threshold: Duration::from_secs(options.behind_threshold), + block_height_lag_threshold: options.block_height_lag_threshold, + rpc_client, } } @@ -126,7 +145,7 @@ impl Indexer { self.latest_block_height.read().await.block_height } - /// Check whether the indexer is on track with the latest block height from the chain. + /// Check whether the indexer block height has been updated recently. pub async fn is_on_track(&self) -> bool { self.last_updated_timestamp.read().await.elapsed() <= self.behind_threshold } @@ -138,7 +157,17 @@ impl Indexer { /// Check whether the indexer is behind with the latest block height from the chain. pub async fn is_behind(&self) -> bool { - self.last_updated_timestamp.read().await.elapsed() > self.behind_threshold + let network_latest_height = rpc_client::fetch_latest_block_height(&self.rpc_client).await; + if let Ok(network_latest_height) = network_latest_height { + self.latest_block_height().await + < network_latest_height - self.block_height_lag_threshold + } else { + false + } + } + + pub async fn is_stable(&self) -> bool { + !self.is_behind().await && self.is_on_track().await } async fn update_block_height( @@ -287,6 +316,7 @@ pub fn run( node_account_id: &AccountId, queue: &Arc>, gcp_service: &crate::gcp::GcpService, + rpc_client: near_fetch::Client, rt: &tokio::runtime::Runtime, ) -> anyhow::Result<(JoinHandle>, Indexer)> { tracing::info!( @@ -311,7 +341,7 @@ pub fn run( } }); - let indexer = Indexer::new(latest_block_height, options); + let indexer = Indexer::new(latest_block_height, options, rpc_client); let context = Context { mpc_contract_id: mpc_contract_id.clone(), node_account_id: node_account_id.clone(), diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 44f1c5a3..075f70cf 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; -use near_primitives::types::BlockHeight; use tokio::sync::RwLock; use url::Url; @@ -162,61 +161,14 @@ impl Pool { self.potential_connections.read().await.clone() } - async fn max_block_height_among_participants(&self) -> BlockHeight { - self.status - .read() - .await - .values() - .filter_map(|state| { - if let StateView::Running { - latest_block_height, - .. - } = state - { - Some(*latest_block_height) - } else { - None - } - }) - .max() - .unwrap_or(0) - } - - pub async fn is_participant_indexer_progressing(&self, participant: &Participant) -> bool { + pub async fn is_participant_stable(&self, participant: &Participant) -> bool { self.status .read() .await .get(participant) .map_or(false, |state| match state { - StateView::Running { - is_indexer_progressing, - .. - } => *is_indexer_progressing, + StateView::Running { is_stable, .. } => *is_stable, _ => false, }) } - - pub async fn is_participant_indexer_caught_up(&self, participant: &Participant) -> bool { - let max_block_height = self.max_block_height_among_participants().await; - - if max_block_height == 0 { - return false; - } - - let my_block_height = self - .status - .read() - .await - .get(participant) - .and_then(|state| match state { - StateView::Running { - latest_block_height, - .. - } => Some(*latest_block_height), - _ => None, - }) - .unwrap_or(0); - - (max_block_height - my_block_height) < 50 - } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index c35330c3..56b7b9ac 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -53,15 +53,7 @@ impl Mesh { pub async fn stable_participants(&self) -> Participants { let mut stable = Participants::default(); for (participant, info) in self.active_participants().iter() { - if self - .connections - .is_participant_indexer_progressing(participant) - .await - && self - .connections - .is_participant_indexer_caught_up(participant) - .await - { + if self.connections.is_participant_stable(participant).await { stable.insert(participant, info.clone()); } } diff --git a/chain-signatures/node/src/rpc_client.rs b/chain-signatures/node/src/rpc_client.rs index 6917af9b..b0298261 100644 --- a/chain-signatures/node/src/rpc_client.rs +++ b/chain-signatures/node/src/rpc_client.rs @@ -4,6 +4,7 @@ use crate::protocol::ProtocolState; use near_account_id::AccountId; use near_crypto::InMemorySigner; +use near_primitives::types::BlockHeight; use serde_json::json; pub async fn fetch_mpc_contract_state( @@ -99,3 +100,19 @@ pub async fn vote_reshared( Ok(result) } + +pub async fn fetch_latest_block_height( + rpc_client: &near_fetch::Client, +) -> anyhow::Result { + let latest_block_height: BlockHeight = rpc_client + .view_block() + .await + .map_err(|e| { + tracing::warn!(%e, "failed to fetch latest block"); + e + })? + .header + .height; + tracing::debug!(latest_block_height, "latest block height"); + Ok(latest_block_height) +} diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 8e3f9d50..3e045cc6 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -112,13 +112,13 @@ pub enum StateView { presignature_mine_count: usize, presignature_potential_count: usize, latest_block_height: BlockHeight, - is_indexer_progressing: bool, + is_stable: bool, }, Resharing { old_participants: Vec, new_participants: Vec, latest_block_height: BlockHeight, - is_indexer_progressing: bool, + is_stable: bool, }, Joining { participants: Vec, @@ -131,7 +131,7 @@ pub enum StateView { async fn state(Extension(state): Extension>) -> Result> { tracing::debug!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; - let is_indexer_progressing = state.indexer.is_on_track().await; + let is_stable = state.indexer.is_stable().await; let protocol_state = state.protocol_state.read().await; match &*protocol_state { @@ -155,7 +155,7 @@ async fn state(Extension(state): Extension>) -> Result { @@ -165,7 +165,7 @@ async fn state(Extension(state): Extension>) -> Result {