Skip to content

Commit

Permalink
feat(dre): Include node health when resizing or creating a subnet (#801)
Browse files Browse the repository at this point in the history
Co-authored-by: sa-github-api <[email protected]>
  • Loading branch information
sasa-tomic and sa-github-api authored Aug 27, 2024
1 parent 3e1445d commit 3c91b71
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 143 deletions.
1 change: 1 addition & 0 deletions rs/cli/src/commands/subnet/resize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl ExecutableCommand for Resize {
include: self.include.clone().into(),
},
self.motivation.clone(),
&runner.health_of_nodes().await?,
)
.await
}
Expand Down
105 changes: 39 additions & 66 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;

use decentralization::nakamoto::NakamotoScore;
use decentralization::network::AvailableNodesQuerier;
use decentralization::network::DecentralizedSubnet;
use decentralization::network::NetworkHealRequest;
use decentralization::network::SubnetChange;
use decentralization::network::SubnetQuerier;
use decentralization::network::SubnetQueryBy;
use decentralization::network::TopologyManager;
use decentralization::network::{generate_added_node_description, generate_removed_nodes_description};
use decentralization::subnets::NodesRemover;
use decentralization::SubnetChangeResponse;
use futures::TryFutureExt;
Expand Down Expand Up @@ -109,17 +108,27 @@ impl Runner {
Ok(())
}

pub async fn subnet_resize(&self, request: ic_management_types::requests::SubnetResizeRequest, motivation: String) -> anyhow::Result<()> {
pub async fn health_of_nodes(&self) -> anyhow::Result<BTreeMap<PrincipalId, HealthStatus>> {
let health_client = health::HealthClient::new(self.network.clone());
health_client.nodes().await
}

pub async fn subnet_resize(
&self,
request: ic_management_types::requests::SubnetResizeRequest,
motivation: String,
health_of_nodes: &BTreeMap<PrincipalId, HealthStatus>,
) -> anyhow::Result<()> {
let change = self
.registry
.modify_subnet_nodes(SubnetQueryBy::SubnetId(request.subnet))
.await?
.excluding_from_available(request.exclude.clone().unwrap_or_default())
.including_from_available(request.only.clone().unwrap_or_default())
.including_from_available(request.include.clone().unwrap_or_default())
.resize(request.add, request.remove, 0)?;
.resize(request.add, request.remove, 0, health_of_nodes)?;

let change = SubnetChangeResponse::from(&change);
let change = SubnetChangeResponse::from(&change).with_health_of_nodes(health_of_nodes.clone());

if self.verbose {
if let Some(run_log) = &change.run_log {
Expand Down Expand Up @@ -163,6 +172,8 @@ impl Runner {
return Ok(());
}

let health_of_nodes = self.health_of_nodes().await?;

let subnet_creation_data = self
.registry
.create_subnet(
Expand All @@ -171,9 +182,10 @@ impl Runner {
request.include.clone().unwrap_or_default(),
request.exclude.clone().unwrap_or_default(),
request.only.clone().unwrap_or_default(),
&health_of_nodes,
)
.await?;
let subnet_creation_data = SubnetChangeResponse::from(&subnet_creation_data);
let subnet_creation_data = SubnetChangeResponse::from(&subnet_creation_data).with_health_of_nodes(health_of_nodes.clone());

if self.verbose {
if let Some(run_log) = &subnet_creation_data.run_log {
Expand Down Expand Up @@ -216,7 +228,9 @@ impl Runner {
if change.added_with_desc.is_empty() && change.removed_with_desc.is_empty() {
return Ok(());
}
self.run_membership_change(change.clone(), replace_proposal_options(&change)?).await

let options = replace_proposal_options(&change)?;
self.run_membership_change(change, options).await
}

pub async fn prepare_versions_to_retire(&self, release_artifact: &Artifact, edit_summary: bool) -> anyhow::Result<(String, Option<Vec<String>>)> {
Expand Down Expand Up @@ -478,18 +492,12 @@ impl Runner {
})
.map(|(id, subnet)| (*id, subnet.clone()))
.collect::<BTreeMap<_, _>>();
let (available_nodes, healths) = try_join(self.registry.available_nodes().map_err(anyhow::Error::from), health_client.nodes()).await?;

// Remove the unhealthy nodes from the list of available nodes
let available_nodes = available_nodes
.into_iter()
.filter(|n| healths.get(&n.id).map_or(false, |h| h == &HealthStatus::Healthy))
.collect::<Vec<_>>();
let (available_nodes, health_of_nodes) =
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), health_client.nodes()).await?;

let subnets_change_response = NetworkHealRequest::new(subnets_without_proposals)
.heal_and_optimize(available_nodes, healths)
.heal_and_optimize(available_nodes, &health_of_nodes)
.await?;
subnets_change_response.iter().for_each(|change| println!("{}", change));

for change in &subnets_change_response {
let _ = self
Expand All @@ -507,48 +515,6 @@ impl Runner {
Ok(())
}

fn recalc_remove_node_with_desc(
&self,
subnet: &DecentralizedSubnet,
healths: &BTreeMap<PrincipalId, HealthStatus>,
remove_nodes: &[decentralization::network::Node],
) -> Vec<(decentralization::network::Node, String)> {
let mut subnet_nodes: HashMap<PrincipalId, decentralization::network::Node> =
HashMap::from_iter(subnet.nodes.iter().map(|n| (n.id, n.clone())));
let mut result = Vec::new();
for node in remove_nodes {
let node_health = healths.get(&node.id).unwrap_or(&HealthStatus::Unknown).to_string().to_lowercase();
let nakamoto_before = NakamotoScore::new_from_nodes(subnet_nodes.values());
subnet_nodes.remove(&node.id);
let nakamoto_after = NakamotoScore::new_from_nodes(subnet_nodes.values());
let nakamoto_diff = nakamoto_after.describe_difference_from(&nakamoto_before).1;

result.push((node.clone(), format!("health: {}, nakamoto impact: {}", node_health, nakamoto_diff)));
}
result
}

fn recalc_add_node_with_desc(
&self,
subnet: &DecentralizedSubnet,
healths: &BTreeMap<PrincipalId, HealthStatus>,
add_nodes: &[decentralization::network::Node],
) -> Vec<(decentralization::network::Node, String)> {
let mut subnet_nodes: HashMap<PrincipalId, decentralization::network::Node> =
HashMap::from_iter(subnet.nodes.iter().map(|n| (n.id, n.clone())));
let mut result = Vec::new();
for node in add_nodes {
let node_health = healths.get(&node.id).unwrap_or(&HealthStatus::Unknown).to_string().to_lowercase();
let nakamoto_before = NakamotoScore::new_from_nodes(subnet_nodes.values());
subnet_nodes.insert(node.id, node.clone());
let nakamoto_after = NakamotoScore::new_from_nodes(subnet_nodes.values());
let nakamoto_diff = nakamoto_after.describe_difference_from(&nakamoto_before).1;

result.push((node.clone(), format!("health: {}, nakamoto impact: {}", node_health, nakamoto_diff)));
}
result
}

pub async fn decentralization_change(
&self,
change: &ChangeSubnetMembershipPayload,
Expand All @@ -566,19 +532,18 @@ impl Runner {
.map_err(|e| anyhow::anyhow!(e))?,
};
let nodes_before = subnet_before.nodes.clone();
let health_client = health::HealthClient::new(self.network.clone());
let healths = health_client.nodes().await?;
let health_of_nodes = self.health_of_nodes().await?;

// Simulate node removal
let removed_nodes = self.registry.get_decentralized_nodes(&change.get_removed_node_ids()).await?;
let removed_nodes_with_desc = self.recalc_remove_node_with_desc(&subnet_before, &healths, &removed_nodes);
let removed_nodes_with_desc = generate_removed_nodes_description(&subnet_before.nodes, &removed_nodes);
let subnet_mid = subnet_before
.without_nodes(removed_nodes_with_desc.clone())
.map_err(|e| anyhow::anyhow!(e))?;

// Now simulate node addition
let added_nodes = self.registry.get_decentralized_nodes(&change.get_added_node_ids()).await?;
let added_nodes_with_desc = self.recalc_add_node_with_desc(&subnet_mid, &healths, &added_nodes);
let added_nodes_with_desc = generate_added_node_description(&subnet_mid.nodes, &added_nodes);

let subnet_after = subnet_mid.with_nodes(added_nodes_with_desc.clone());

Expand All @@ -590,7 +555,10 @@ impl Runner {
removed_nodes_desc: removed_nodes_with_desc.clone(),
..Default::default()
};
println!("{}", SubnetChangeResponse::from(&subnet_change));
println!(
"{}",
SubnetChangeResponse::from(&subnet_change).with_health_of_nodes(health_of_nodes.clone())
);
Ok(())
}

Expand All @@ -606,8 +574,9 @@ impl Runner {
None => change_request,
};

let change = SubnetChangeResponse::from(&change_request.rescue()?);
println!("{}", change);
let health_of_nodes = self.health_of_nodes().await?;

let change = SubnetChangeResponse::from(&change_request.rescue(&health_of_nodes)?).with_health_of_nodes(health_of_nodes);

if change.added_with_desc.is_empty() && change.removed_with_desc.is_empty() {
return Ok(());
Expand Down Expand Up @@ -760,7 +729,11 @@ pub fn replace_proposal_options(change: &SubnetChangeResponse) -> anyhow::Result
Ok(ic_admin::ProposeOptions {
title: format!("Replace {replace_target} in subnet {subnet_id_short}",).into(),
summary: format!("# Replace {replace_target} in subnet {subnet_id_short}",).into(),
motivation: change.motivation.clone(),
motivation: Some(format!(
"{}\n\n```\n{}\n```\n",
change.motivation.as_ref().unwrap_or(&String::new()),
change
)),
})
}

Expand Down
14 changes: 9 additions & 5 deletions rs/cli/src/subnet_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ impl SubnetManager {
to_be_replaced.extend(subnet_unhealthy_without_included);
}

let change = subnet_change_request.optimize(optimize.unwrap_or(0), &to_be_replaced)?;
let health_client = health::HealthClient::new(self.network.clone());
let health_of_nodes = health_client.nodes().await?;

let change = subnet_change_request.optimize(optimize.unwrap_or(0), &to_be_replaced, &health_of_nodes)?;

for (n, _) in change.removed().iter().filter(|(n, _)| !node_ids_unhealthy.contains(&n.id)) {
motivations.push(format!(
Expand All @@ -165,12 +168,13 @@ impl SubnetManager {
}

let motivation = format!(
"\n{}\n\nNOTE: The information below is provided for your convenience. Please independently verify the decentralization changes rather than relying solely on this summary.\nCode for calculating replacements is at https://github.com/dfinity/dre/blob/79066127f58c852eaf4adda11610e815a426878c/rs/decentralization/src/network.rs#L912\n\n```\n{}\n```\n",
motivations.iter().map(|s| format!(" - {}", s)).collect::<Vec<String>>().join("\n"),
change
"\n{}\n\nNOTE: The information below is provided for your convenience. Please independently verify the decentralization changes rather than relying solely on this summary.\nCode for calculating replacements is at https://github.com/dfinity/dre/blob/79066127f58c852eaf4adda11610e815a426878c/rs/decentralization/src/network.rs#L912",
motivations.iter().map(|s| format!(" - {}", s)).collect::<Vec<String>>().join("\n")
);

let change = SubnetChangeResponse::from(&change).with_motivation(motivation);
let change = SubnetChangeResponse::from(&change)
.with_health_of_nodes(health_of_nodes)
.with_motivation(motivation);

Ok(change)
}
Expand Down
21 changes: 18 additions & 3 deletions rs/decentralization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use ic_base_types::PrincipalId;
use ic_management_types::NodeFeature;
use ic_management_types::{HealthStatus, NodeFeature};
use serde::{self, Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
Expand All @@ -15,6 +15,7 @@ pub struct SubnetChangeResponse {
pub removed_with_desc: Vec<(PrincipalId, String)>,
#[serde(skip_serializing_if = "Option::is_none")]
pub subnet_id: Option<PrincipalId>,
pub health_of_nodes: BTreeMap<PrincipalId, HealthStatus>,
pub score_before: nakamoto::NakamotoScore,
pub score_after: nakamoto::NakamotoScore,
pub motivation: Option<String>,
Expand All @@ -33,6 +34,9 @@ impl SubnetChangeResponse {
..self
}
}
pub fn with_health_of_nodes(self, health_of_nodes: BTreeMap<PrincipalId, HealthStatus>) -> Self {
SubnetChangeResponse { health_of_nodes, ..self }
}
}

impl From<&network::SubnetChange> for SubnetChangeResponse {
Expand All @@ -41,6 +45,7 @@ impl From<&network::SubnetChange> for SubnetChangeResponse {
added_with_desc: change.added().iter().map(|n| (n.0.id, n.1.clone())).collect(),
removed_with_desc: change.removed().iter().map(|n| (n.0.id, n.1.clone())).collect(),
subnet_id: if change.id == Default::default() { None } else { Some(change.id) },
health_of_nodes: BTreeMap::new(),
score_before: nakamoto::NakamotoScore::new_from_nodes(&change.old_nodes),
score_after: nakamoto::NakamotoScore::new_from_nodes(&change.new_nodes),
motivation: None,
Expand Down Expand Up @@ -111,11 +116,21 @@ impl Display for SubnetChangeResponse {

writeln!(f, "Nodes removed:")?;
for (id, desc) in &self.removed_with_desc {
writeln!(f, " --> {} [selected based on {}]", id, desc).expect("write failed");
let health = self
.health_of_nodes
.get(id)
.map(|h| h.to_string().to_lowercase())
.unwrap_or("unknown".to_string());
writeln!(f, " --> {} [health: {}, impact on decentralization: {}]", id, health, desc).expect("write failed");
}
writeln!(f, "\nNodes added:")?;
for (id, desc) in &self.added_with_desc {
writeln!(f, " ++> {} [selected based on {}]", id, desc).expect("write failed");
let health = self
.health_of_nodes
.get(id)
.map(|h| h.to_string().to_lowercase())
.unwrap_or("unknown".to_string());
writeln!(f, " ++> {} [health: {}, impact on decentralization: {}]", id, health, desc).expect("write failed");
}

let rows = self.feature_diff.values().map(|diff| diff.len()).max().unwrap_or(0);
Expand Down
Loading

0 comments on commit 3c91b71

Please sign in to comment.