Skip to content

Commit

Permalink
Add node rewards endpoint to Node Metrics canister (#705)
Browse files Browse the repository at this point in the history
  • Loading branch information
pietrodimarco-dfinity authored Aug 13, 2024
1 parent 13fd109 commit 661bdb9
Show file tree
Hide file tree
Showing 7 changed files with 544 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use candid::Principal;
use ic_cdk_macros::*;
use itertools::Itertools;
use std::time::Duration;
use types::{SubnetNodeMetricsArgs, SubnetNodeMetricsResponse};
use std::collections::{self, btree_map::Entry, BTreeMap};
use types::{
DailyNodeMetrics, NodeMetrics, NodeMetricsStoredKey, NodeRewardsArgs, NodeRewardsResponse, SubnetNodeMetricsArgs, SubnetNodeMetricsResponse,
};
mod metrics_manager;
mod rewards_manager;
mod stable_memory;
pub mod types;

Expand All @@ -21,8 +25,11 @@ async fn update_metrics_task() {
}

/// Registers the timers that drive `update_metrics_task`.
///
/// A one-shot timer with a zero-second delay spawns the task through `ic_cdk::spawn`, and an
/// interval timer spawns it again every `TIMER_INTERVAL_SEC` seconds.
fn setup_timers() {
// NOTE(review): the one-shot/interval pair appears twice below, once through the bare
// `Duration` name and once through the fully qualified `std::time::Duration`. This looks like
// the removed and added sides of a diff rather than four intended registrations — confirm
// against the real file.
ic_cdk_timers::set_timer(Duration::from_secs(0), || ic_cdk::spawn(update_metrics_task()));
ic_cdk_timers::set_timer_interval(Duration::from_secs(TIMER_INTERVAL_SEC), || ic_cdk::spawn(update_metrics_task()));
ic_cdk_timers::set_timer(std::time::Duration::from_secs(0), || ic_cdk::spawn(update_metrics_task()));
ic_cdk_timers::set_timer_interval(
std::time::Duration::from_secs(TIMER_INTERVAL_SEC),
|| ic_cdk::spawn(update_metrics_task()),
);
}

#[init]
Expand All @@ -38,24 +45,81 @@ fn post_upgrade() {
/// Query endpoint returning stored node metrics grouped by `(timestamp, subnet)`.
///
/// `args.ts` is the lower timestamp bound; when it is `None` the type's default value is used.
/// `args.subnet_id`, when set, restricts the result to records assigned to that subnet.
/// The only return visible here is `Ok(result)`.
///
/// NOTE(review): the body below mixes statements from two implementations — one built on
/// `stable_memory::get_metrics` with a `flat_map` and a trailing subnet filter, and one built on
/// `stable_memory::get_metrics_range` with a `BTreeMap` grouping. This looks like the removed
/// and added sides of a diff; confirm against the real file before relying on any single line.
#[query]
fn subnet_node_metrics(args: SubnetNodeMetricsArgs) -> Result<Vec<SubnetNodeMetricsResponse>, String> {
let from_ts = args.ts.unwrap_or_default();
// Accumulator keyed by (timestamp, subnet principal); each value collects that subnet's nodes.
let mut subnet_node_metrics: BTreeMap<(u64, Principal), Vec<NodeMetrics>> = BTreeMap::new();

let metrics = stable_memory::get_metrics(from_ts);
// `None` as the second argument leaves the range without an upper bound argument.
let node_metrics: Vec<(NodeMetricsStoredKey, types::NodeMetricsStored)> = stable_memory::get_metrics_range(from_ts, None);

let metrics_flat = metrics
for ((ts, node_id), node_metrics_value) in node_metrics {
// Skip records whose assigned subnet differs from the requested one, if one was requested.
if let Some(subnet_id) = args.subnet_id {
if subnet_id != node_metrics_value.subnet_assigned {
continue;
}
}

let result_key = (ts, node_metrics_value.subnet_assigned);
// Only the cumulative totals are copied into the response value.
let result_value: NodeMetrics = NodeMetrics {
node_id,
num_blocks_proposed_total: node_metrics_value.num_blocks_proposed_total,
num_blocks_failures_total: node_metrics_value.num_blocks_failures_total,
};

// Append to the existing group for this key, or start a new one.
match subnet_node_metrics.entry(result_key) {
Entry::Occupied(mut entry) => {
let v: &mut Vec<NodeMetrics> = entry.get_mut();
v.push(result_value)
}
Entry::Vacant(entry) => {
entry.insert(vec![result_value]);
}
}
}

// Turn each ((ts, subnet_id), nodes) group into one response item, in key order.
let result = subnet_node_metrics
.into_iter()
.flat_map(|(ts, subnets)| {
subnets.into_iter().map(move |subnet_node_metrics| SubnetNodeMetricsResponse {
ts,
subnet_id: subnet_node_metrics.subnet_id,
node_metrics: subnet_node_metrics.node_metrics,
})
})
.map(|((ts, subnet_id), node_metrics)| SubnetNodeMetricsResponse { ts, subnet_id, node_metrics })
.collect_vec();

let result = match args.subnet_id {
Some(subnet_id) => metrics_flat.into_iter().filter(|metrics| metrics.subnet_id == subnet_id).collect_vec(),
None => metrics_flat,
};

Ok(result)
}

/// Query endpoint computing rewards for every node with stored metrics in a period.
///
/// Reads the stored records between `args.from_ts` and `args.to_ts` via
/// `stable_memory::get_metrics_range`, collects them per node as [`DailyNodeMetrics`], and
/// returns one [`NodeRewardsResponse`] per node carrying both reward figures from
/// `rewards_manager` together with the daily metrics they were computed from.
#[query]
fn node_rewards(args: NodeRewardsArgs) -> Vec<NodeRewardsResponse> {
    let stored: Vec<(NodeMetricsStoredKey, types::NodeMetricsStored)> = stable_memory::get_metrics_range(args.from_ts, Some(args.to_ts));

    // Bucket the per-timestamp records by node, keeping the order in which they were returned.
    let mut per_node: BTreeMap<_, Vec<DailyNodeMetrics>> = BTreeMap::new();
    for ((ts, node_id), stored_metrics) in stored {
        let daily = DailyNodeMetrics::new(
            ts,
            stored_metrics.subnet_assigned,
            stored_metrics.num_blocks_proposed,
            stored_metrics.num_blocks_failed,
        );
        per_node.entry(node_id).or_default().push(daily);
    }

    per_node
        .into_iter()
        .map(|(node_id, daily_node_metrics)| NodeRewardsResponse {
            node_id,
            // Both reward figures borrow the metrics before they are moved into the response.
            rewards_no_penalty: rewards_manager::rewards_no_penalty(&daily_node_metrics),
            rewards_with_penalty: rewards_manager::rewards_with_penalty(&daily_node_metrics),
            daily_node_metrics,
        })
        .collect_vec()
}
Original file line number Diff line number Diff line change
@@ -1,76 +1,69 @@
use std::collections::{btree_map::Entry, BTreeMap};
use std::collections::BTreeMap;

use anyhow::Ok;
use dfn_core::api::PrincipalId;
use futures::FutureExt;
use ic_management_canister_types::NodeMetrics as ICManagementNodeMetrics;
use ic_management_canister_types::{NodeMetricsHistoryArgs, NodeMetricsHistoryResponse};
use ic_protobuf::registry::subnet::v1::SubnetListRecord;
use itertools::Itertools;

use crate::types::{NodeMetricsGrouped, NodeMetricsStored, NodeMetricsStoredKey};
use crate::{
stable_memory,
types::{NodeMetrics, PrincipalNodeMetricsHistory, SubnetNodeMetrics, TimestampNanos},
types::{SubnetNodeMetricsHistory, TimestampNanos},
};

impl SubnetNodeMetrics {
    /// Builds the per-subnet record from the management canister's node metrics.
    ///
    /// `subnet_id` is unwrapped to its inner principal and every entry of `subnet_metrics`
    /// is converted with `Into` to the element type of the `node_metrics` field.
    pub fn new(subnet_id: PrincipalId, subnet_metrics: Vec<ICManagementNodeMetrics>) -> Self {
        Self {
            subnet_id: subnet_id.0,
            node_metrics: subnet_metrics.into_iter().map(Into::into).collect_vec(),
        }
    }
}
/// Node metrics storable
///
/// Computes daily proposed/failed blocks from a vector of node metrics.
///
/// The grouped metrics are sorted by timestamp, then each entry is turned into a
/// `((ts, principal), NodeMetricsStored)` pair holding both the cumulative totals and the
/// per-entry deltas returned by `calculate_daily_metrics`. `initial_proposed_total` and
/// `initial_failed_total` seed the "previous" totals used for the first entry.
///
/// NOTE(review): this span also contains `impl From<ICManagementNodeMetrics> for NodeMetrics`,
/// `store_results` and `transform_metrics` interleaved with the function body. This looks like
/// the removed side of a diff; confirm against the real file.
fn node_metrics_storable(
node_id: PrincipalId,
node_metrics_grouped: Vec<NodeMetricsGrouped>,
initial_proposed_total: u64,
initial_failed_total: u64,
) -> Vec<(NodeMetricsStoredKey, NodeMetricsStored)> {
// Deltas are computed against the preceding entry, so process in timestamp order.
let mut metrics_ordered = node_metrics_grouped;
metrics_ordered.sort_by_key(|(ts, _, _)| *ts);

let principal = node_id.0;
let mut node_metrics_storable = Vec::new();

let mut previous_proposed_total = initial_proposed_total;
let mut previous_failed_total = initial_failed_total;

for (ts, subnet_assigned, metrics) in metrics_ordered {
let key = (ts, principal);
let current_proposed_total = metrics.num_blocks_proposed_total;
let current_failed_total = metrics.num_block_failures_total;

// Difference between this entry's totals and the previous entry's totals.
let (daily_proposed, daily_failed) = calculate_daily_metrics(
previous_proposed_total,
previous_failed_total,
metrics.num_blocks_proposed_total,
metrics.num_block_failures_total,
);

let node_metrics_stored = NodeMetricsStored {
subnet_assigned: subnet_assigned.0,
num_blocks_proposed_total: current_proposed_total,
num_blocks_failures_total: current_failed_total,
num_blocks_proposed: daily_proposed,
num_blocks_failed: daily_failed,
};

impl From<ICManagementNodeMetrics> for NodeMetrics {
fn from(node_metrics: ICManagementNodeMetrics) -> Self {
Self {
node_id: node_metrics.node_id.0,
num_block_failures_total: node_metrics.num_block_failures_total,
num_blocks_proposed_total: node_metrics.num_blocks_proposed_total,
}
}
}
node_metrics_storable.push((key, node_metrics_stored));

fn store_results(results: BTreeMap<u64, Vec<SubnetNodeMetrics>>) {
for (timestamp, storable) in results {
stable_memory::insert(timestamp, storable)
// The current totals become the baseline for the next entry.
previous_proposed_total = current_proposed_total;
previous_failed_total = current_failed_total;
}
}

/// Transform metrics
///
/// Groups the metrics received by timestamp to fit the "storable" format
fn transform_metrics(subnets_metrics: Vec<PrincipalNodeMetricsHistory>) -> BTreeMap<TimestampNanos, Vec<SubnetNodeMetrics>> {
let mut results = BTreeMap::new();

for (subnet, subnet_metrics) in subnets_metrics {
for ts_node_metrics in subnet_metrics {
let ts: TimestampNanos = ts_node_metrics.timestamp_nanos;

let subnet_metrics_storable = SubnetNodeMetrics::new(subnet, ts_node_metrics.node_metrics);

// Append to the existing bucket for this timestamp, or start a new one.
match results.entry(ts) {
Entry::Occupied(mut entry) => {
let v: &mut Vec<SubnetNodeMetrics> = entry.get_mut();
v.push(subnet_metrics_storable)
}
Entry::Vacant(entry) => {
entry.insert(vec![subnet_metrics_storable]);
}
}
}
}
results
node_metrics_storable
}

/// Fetch metrics
///
/// Calls to the node_metrics_history endpoint of the management canister for all the subnets
/// to get updated metrics since refresh_ts.
async fn fetch_metrics(subnets: Vec<PrincipalId>, refresh_ts: TimestampNanos) -> anyhow::Result<Vec<PrincipalNodeMetricsHistory>> {
async fn fetch_metrics(subnets: Vec<PrincipalId>, refresh_ts: TimestampNanos) -> anyhow::Result<Vec<SubnetNodeMetricsHistory>> {
let mut subnets_node_metrics = Vec::new();

for subnet_id in subnets {
Expand Down Expand Up @@ -117,23 +110,62 @@ async fn fetch_subnets() -> anyhow::Result<Vec<PrincipalId>> {
Ok(subnets)
}

/// Calculates the proposed and failed block counts accumulated since the last reading.
///
/// Returns `(proposed, failed)`. When neither counter went backwards, each value is the
/// difference between the current and the last total. If either current total is lower than
/// its last total (the case when a node gets redeployed), the current totals are returned
/// unchanged instead.
fn calculate_daily_metrics(last_proposed_total: u64, last_failed_total: u64, current_proposed_total: u64, current_failed_total: u64) -> (u64, u64) {
    // `checked_sub` yields `None` exactly when the last total exceeds the current one.
    let proposed_delta = current_proposed_total.checked_sub(last_proposed_total);
    let failed_delta = current_failed_total.checked_sub(last_failed_total);

    match (proposed_delta, failed_delta) {
        (Some(proposed), Some(failed)) => (proposed, failed),
        _ => (current_proposed_total, current_failed_total),
    }
}

/// Regroups per-subnet metric histories into one list per node.
///
/// Each output entry is `(timestamp_nanos, subnet_id, metrics)`, appended under the key
/// `metrics.node_id` in the order the input is traversed: subnet by subnet, then history
/// response by history response, then node by node.
fn grouped_by_node(subnet_metrics: Vec<(PrincipalId, Vec<NodeMetricsHistoryResponse>)>) -> BTreeMap<PrincipalId, Vec<NodeMetricsGrouped>> {
    // Flatten the three nesting levels into a single stream of (node, grouped entry) pairs.
    let flattened = subnet_metrics.into_iter().flat_map(|(subnet_id, history)| {
        history.into_iter().flat_map(move |response| {
            let ts = response.timestamp_nanos;
            response
                .node_metrics
                .into_iter()
                .map(move |metrics| (metrics.node_id, (ts, subnet_id, metrics)))
        })
    });

    let mut by_node: BTreeMap<PrincipalId, Vec<NodeMetricsGrouped>> = BTreeMap::new();
    for (node_id, grouped) in flattened {
        by_node.entry(node_id).or_default().push(grouped);
    }
    by_node
}

/// Persists every `(key, metrics)` pair through `stable_memory::insert`, in input order.
fn store_metrics(node_metrics_storable: Vec<((u64, candid::Principal), NodeMetricsStored)>) {
    node_metrics_storable
        .into_iter()
        .for_each(|(key, node_metrics)| stable_memory::insert(key, node_metrics));
}

/// Fetches node metrics newer than what is already persisted and stores them.
///
/// Steps visible here: list the subnets, read the latest persisted timestamp (falling back to
/// the default when none exists), fetch metrics starting at that timestamp plus one, group
/// them per node, and store the per-node records derived by `node_metrics_storable`.
///
/// # Errors
/// Propagates errors from `fetch_subnets` and `fetch_metrics` through `?`.
///
/// # Panics
/// `expect("node_metrics empty")` panics if a node's grouped list has no entries.
///
/// NOTE(review): statements from two implementations are interleaved below
/// (`latest_key` / `latest_ts`, two log format strings, and `transform_metrics` / `store_results`
/// next to the per-node flow). This looks like the removed and added sides of a diff; confirm
/// against the real file.
pub async fn update_metrics() -> anyhow::Result<()> {
let subnets = fetch_subnets().await?;
let latest_ts = stable_memory::latest_key().unwrap_or_default();
let latest_ts = stable_memory::latest_ts().unwrap_or_default();
// Request only metrics strictly newer than the last persisted timestamp.
let refresh_ts = latest_ts + 1;

ic_cdk::println!(
"Updating node metrics for {} subnets:\nLatest timestamp persisted: {}\nRefreshing metrics from timestamp {}",
"Updating node metrics for {} subnets: Latest timestamp persisted: {} Refreshing metrics from timestamp {}",
subnets.len(),
latest_ts,
refresh_ts
);
let subnet_metrics: Vec<(PrincipalId, Vec<NodeMetricsHistoryResponse>)> = fetch_metrics(subnets, refresh_ts).await?;
let grouped_by_node: BTreeMap<PrincipalId, Vec<NodeMetricsGrouped>> = grouped_by_node(subnet_metrics);

let metrics = fetch_metrics(subnets, refresh_ts).await?;
for (node_id, node_metrics_grouped) in grouped_by_node {
// NOTE(review): `first()` is taken in insertion order, while the list is only sorted by
// timestamp later inside `node_metrics_storable` — confirm the first entry is always the
// earliest, otherwise the baseline lookup below may use the wrong timestamp.
let first_ts = node_metrics_grouped.first().expect("node_metrics empty").0;
let metrics_before = stable_memory::metrics_before_ts(node_id.0, first_ts);

let results = transform_metrics(metrics);
// With no earlier stored record for this node, both baselines start from 0.
let initial_proposed_total = metrics_before.as_ref().map(|(_, metrics)| metrics.num_blocks_proposed_total).unwrap_or(0);
let initial_failed_total = metrics_before.as_ref().map(|(_, metrics)| metrics.num_blocks_failures_total).unwrap_or(0);

store_results(results);
let node_metrics_storable = node_metrics_storable(node_id, node_metrics_grouped, initial_proposed_total, initial_failed_total);
store_metrics(node_metrics_storable);
}

Ok(())
}
Loading

0 comments on commit 661bdb9

Please sign in to comment.