Skip to content

Commit

Permalink
feat: Allow direct replacement of a node in subnet (#3377)
Browse files Browse the repository at this point in the history
### Summary
This PR resolves a long-standing issue faced by many node providers,
that a node cannot be replaced if it's currently in a subnet.
However, if the IP address of the node matches, this means that the new
node is a direct replacement for the old node, which also means that the
subnet decentralization is unaffected. Therefore, we now allow a direct
replacement of nodes that are in a subnet. Direct removal of such nodes
is still not allowed though, since that would change the subnet
decentralization.

### Changes
- **Node Addition**: 
- If there is exactly one node with the same IP, prepare node remove
mutations `make_remove_or_replace_node_mutations`.
- Atomically remove and add nodes (change in behavior compared to
before).
  - Adjusts node allowance based on nodes removed and added.

- **Node Removal & Replacement**:
- Implements `do_replace_node_with_another` method for replacing nodes
within a subnet, for testing purposes.
  - Adjusts node operator's node allowance after node mutations.

- **Tests**:
  - Enhances test coverage to verify node replacement in subnets.
  - Adds tests to validate allowance update during node replacements.

### Miscellaneous
- Internal private methods adapted to support new logic without breaking
existing functionality.
  • Loading branch information
sasa-tomic authored Jan 17, 2025
1 parent 1187a89 commit de11e79
Show file tree
Hide file tree
Showing 4 changed files with 475 additions and 67 deletions.
60 changes: 59 additions & 1 deletion rs/registry/canister/src/common/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ use ic_nns_test_utils::registry::{
use ic_protobuf::registry::crypto::v1::PublicKey;
use ic_protobuf::registry::node::v1::IPv4InterfaceConfig;
use ic_protobuf::registry::node::v1::NodeRecord;
use ic_protobuf::registry::node_operator::v1::NodeOperatorRecord;
use ic_protobuf::registry::subnet::v1::SubnetListRecord;
use ic_protobuf::registry::subnet::v1::SubnetRecord;
use ic_registry_keys::make_node_operator_record_key;
use ic_registry_keys::make_subnet_list_record_key;
use ic_registry_keys::make_subnet_record_key;
use ic_registry_transport::pb::v1::{
registry_mutation::Type, RegistryAtomicMutateRequest, RegistryMutation,
};
use ic_registry_transport::upsert;
use ic_registry_transport::{insert, upsert};
use ic_test_utilities_types::ids::subnet_test_id;
use ic_types::ReplicaVersion;
use prost::Message;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -148,3 +151,58 @@ pub fn prepare_registry_with_nodes_and_node_operator_id(
};
(mutate_request, node_ids_and_dkg_pks)
}

pub fn registry_create_subnet_with_nodes(
registry: &mut Registry,
node_ids_and_dkg_pks: &BTreeMap<NodeId, PublicKey>,
node_offsets: &[usize],
) -> ic_types::SubnetId {
let node_ids: Vec<NodeId> = node_ids_and_dkg_pks.keys().cloned().collect();

// Create a subnet with the specified nodes
let subnet_id = subnet_test_id(1000);
let mut subnet_list_record = registry.get_subnet_list_record();
let subnet_record: SubnetRecord =
get_invariant_compliant_subnet_record(node_offsets.iter().map(|&i| node_ids[i]).collect());
let subnet_nodes = node_offsets
.iter()
.map(|&i| (node_ids[i], node_ids_and_dkg_pks[&node_ids[i]].clone()))
.collect();
registry.maybe_apply_mutation_internal(add_fake_subnet(
subnet_id,
&mut subnet_list_record,
subnet_record,
&subnet_nodes,
));

subnet_id
}

pub fn registry_add_node_operator_for_node(
registry: &mut Registry,
node_id: NodeId,
node_allowance: u64,
) -> PrincipalId {
let node_operator_id =
PrincipalId::try_from(registry.get_node_or_panic(node_id).node_operator_id).unwrap();
let node_operator_record_key = make_node_operator_record_key(node_operator_id);

if registry
.get(
node_operator_record_key.as_bytes(),
registry.latest_version(),
)
.is_none()
{
let node_operator_record = NodeOperatorRecord {
node_allowance,
..Default::default()
};

registry.maybe_apply_mutation_internal(vec![insert(
node_operator_record_key,
node_operator_record.encode_to_vec(),
)]);
};
node_operator_id
}
223 changes: 195 additions & 28 deletions rs/registry/canister/src/mutations/node_management/do_add_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ impl Registry {
let mut node_operator_record = get_node_operator_record(self, caller_id)
.map_err(|err| format!("{}do_add_node: Aborting node addition: {}", LOG_PREFIX, err))?;

// 1. Clear out any nodes that already exist at this IP.
// 1. Validate keys and get the node id
let (node_id, valid_pks) = valid_keys_from_payload(&payload)
.map_err(|err| format!("{}do_add_node: {}", LOG_PREFIX, err))?;

println!("{}do_add_node: The node id is {:?}", LOG_PREFIX, node_id);

// 2. Clear out any nodes that already exist at this IP.
// This will only succeed if:
// - the same NO was in control of the original nodes.
// - the nodes are no longer in subnets.
Expand All @@ -57,26 +63,47 @@ impl Registry {
// release dashboard.)
let http_endpoint = connection_endpoint_from_string(&payload.http_endpoint);
let nodes_with_same_ip = scan_for_nodes_by_ip(self, &http_endpoint.ip_addr);
let mut mutations = Vec::new();
let num_removed_nodes = nodes_with_same_ip.len() as u64;
if !nodes_with_same_ip.is_empty() {
for node_id in nodes_with_same_ip {
self.do_remove_node(RemoveNodeDirectlyPayload { node_id }, caller_id);
if nodes_with_same_ip.len() == 1 {
mutations = self.make_remove_or_replace_node_mutations(
RemoveNodeDirectlyPayload {
node_id: nodes_with_same_ip[0],
},
caller_id,
Some(node_id),
);
} else {
// In the unlikely situation that multiple nodes share the same IP address as the new node,
// this will remove the existing nodes.
// While the situation is unexpected, the behavior is backwards compatible.
// This may happen only if there is a bug in the registry code and the registry invariant isn't enforced,
// due to which the node id was not properly removed.
for previous_node_id in nodes_with_same_ip {
mutations.extend(self.make_remove_or_replace_node_mutations(
RemoveNodeDirectlyPayload {
node_id: previous_node_id,
},
caller_id,
// If there are multiple nodes with the same IP, then each of them could in principle be in a (different) subnet.
// In that case replacing all different node ids with the same new node isn't an option.
// To cover for this corner case, we don't replace the node id but just remove the node and potentially fail.
None,
));
}
}

// Update the NO record, as the available allowance may have changed.
node_operator_record = get_node_operator_record(self, caller_id).map_err(|err| {
format!("{}do_add_node: Aborting node addition: {}", LOG_PREFIX, err)
})?
}

// 2. Check if adding one more node will get us over the cap for the Node Operator
if node_operator_record.node_allowance == 0 {
// 3. Check if adding one more node will get us over the cap for the Node Operator
if node_operator_record.node_allowance + num_removed_nodes == 0 {
return Err(format!(
"{}do_add_node: Node allowance for this Node Operator is exhausted",
LOG_PREFIX
));
}

// 3. Get valid type if type is in request
// 4. Get valid type if type is in request
let node_reward_type = payload
.node_reward_type
.as_ref()
Expand All @@ -91,10 +118,6 @@ impl Registry {
.transpose()?
.map(|node_reward_type| node_reward_type as i32);

// 4. Validate keys and get the node id
let (node_id, valid_pks) = valid_keys_from_payload(&payload)
.map_err(|err| format!("{}do_add_node: {}", LOG_PREFIX, err))?;

// 5. Validate the domain is valid
let domain: Option<String> = payload
.domain
Expand Down Expand Up @@ -127,8 +150,6 @@ impl Registry {
}
}

println!("{}do_add_node: The node id is {:?}", LOG_PREFIX, node_id);

// 7. Create the Node Record
let node_record = NodeRecord {
xnet: Some(connection_endpoint_from_string(&payload.xnet_endpoint)),
Expand All @@ -142,17 +163,22 @@ impl Registry {
};

// 8. Insert node, public keys, and crypto keys
let mut mutations = make_add_node_registry_mutations(node_id, node_record, valid_pks);
mutations.extend(make_add_node_registry_mutations(
node_id,
node_record,
valid_pks,
));

// 9. Update the Node Operator record
node_operator_record.node_allowance -= 1;
node_operator_record.node_allowance =
node_operator_record.node_allowance + num_removed_nodes - 1;

let update_node_operator_record =
make_update_node_operator_mutation(caller_id, &node_operator_record);

mutations.push(update_node_operator_record);

// 10. Check invariants before applying mutations
// 10. Check invariants and then apply mutations
self.maybe_apply_mutation_internal(mutations);

println!("{}do_add_node finished: {:?}", LOG_PREFIX, payload);
Expand Down Expand Up @@ -287,21 +313,23 @@ fn now() -> Result<Time, String> {

#[cfg(test)]
mod tests {
use std::str::FromStr;

use crate::{
common::test_helpers::invariant_compliant_registry, mutations::common::test::TEST_NODE_ID,
};

use super::*;
use crate::common::test_helpers::{
invariant_compliant_registry, prepare_registry_with_nodes,
registry_add_node_operator_for_node, registry_create_subnet_with_nodes,
};
use crate::mutations::common::test::TEST_NODE_ID;
use ic_base_types::{NodeId, PrincipalId};
use ic_config::crypto::CryptoConfig;
use ic_crypto_node_key_generation::generate_node_keys_once;
use ic_protobuf::registry::node_operator::v1::NodeOperatorRecord;
use ic_registry_canister_api::IPv4Config;
use ic_registry_keys::{make_node_operator_record_key, make_node_record_key};
use ic_registry_transport::insert;
use itertools::Itertools;
use lazy_static::lazy_static;
use prost::Message;
use std::str::FromStr;

/// Prepares the payload to add a new node, for tests.
pub fn prepare_add_node_payload(mutation_id: u8) -> (AddNodePayload, ValidNodePublicKeys) {
Expand All @@ -326,8 +354,8 @@ mod tests {
ni_dkg_dealing_encryption_pk,
transport_tls_cert,
idkg_dealing_encryption_pk: Some(idkg_dealing_encryption_pk),
xnet_endpoint: format!("128.0.{mutation_id}.1:1234"),
http_endpoint: format!("128.0.{mutation_id}.1:4321"),
xnet_endpoint: format!("128.0.{mutation_id}.100:1234"),
http_endpoint: format!("128.0.{mutation_id}.100:4321"),
chip_id: None,
public_ipv4_config: None,
domain: Some("api-example.com".to_string()),
Expand Down Expand Up @@ -737,4 +765,143 @@ mod tests {
.unwrap_err();
assert!(e.contains("do_add_node: There is already another node with the same IPv4 address"));
}

#[test]
fn should_add_node_and_replace_existing_node_in_subnet() {
// This test verifies that adding a new node replaces an existing node in a subnet
let mut registry = invariant_compliant_registry(0);

// Add nodes to the registry
let (mutate_request, node_ids_and_dkg_pks) = prepare_registry_with_nodes(1, 6);
registry.maybe_apply_mutation_internal(mutate_request.mutations);
let node_ids: Vec<NodeId> = node_ids_and_dkg_pks.keys().cloned().collect();
let node_operator_id = registry_add_node_operator_for_node(&mut registry, node_ids[0], 0);

// Create a subnet with the first 4 nodes
let subnet_id =
registry_create_subnet_with_nodes(&mut registry, &node_ids_and_dkg_pks, &[0, 1, 2, 3]);
let subnet_record = registry.get_subnet_or_panic(subnet_id);
let subnet_membership = subnet_record
.membership
.iter()
.map(|bytes| NodeId::from(PrincipalId::try_from(bytes).unwrap()))
.collect::<Vec<NodeId>>();
let expected_remove_node_id = node_ids[1]; // same offset as the subnet membership vector
let expected_remove_node = registry.get_node(subnet_membership[1]).unwrap();

println!(
"Original subnet membership (node ids): {:?}",
subnet_membership
);

// Add a new node with the same IP address and port as an existing node, which should replace the existing node
let (mut payload, _valid_pks) = prepare_add_node_payload(2);
let http = expected_remove_node.http.unwrap();
payload
.http_endpoint
.clone_from(&format!("[{}]:{}", http.ip_addr, http.port));
let new_node_id = registry
.do_add_node_(payload.clone(), node_operator_id)
.expect("failed to add a node");

// Verify the subnet record is updated with the new node
let subnet_record = registry.get_subnet_or_panic(subnet_id);
let mut expected_membership = subnet_membership.clone();
expected_membership[1] = new_node_id;
expected_membership.sort();
let actual_membership: Vec<NodeId> = subnet_record
.membership
.iter()
.map(|bytes| NodeId::from(PrincipalId::try_from(bytes).unwrap()))
.sorted()
.collect();
assert_eq!(actual_membership, expected_membership);

// Verify the old node is removed from the registry
assert!(registry.get_node(expected_remove_node_id).is_none());

// Verify the new node is present in the registry
assert!(registry.get_node(new_node_id).is_some());

// Verify node operator allowance is unchanged
let updated_operator = get_node_operator_record(&registry, node_operator_id).unwrap();
assert_eq!(updated_operator.node_allowance, 0);
}

#[test]
fn should_add_node_with_no_subnet_conflict() {
let mut registry = invariant_compliant_registry(0);

// Add nodes to the registry
let (mutate_request, node_ids_and_dkg_pks) = prepare_registry_with_nodes(1, 4);
registry.maybe_apply_mutation_internal(mutate_request.mutations);
let node_ids: Vec<NodeId> = node_ids_and_dkg_pks.keys().cloned().collect();
let node_operator_id = registry_add_node_operator_for_node(&mut registry, node_ids[0], 1);

// Prepare payload to add a new node
let (payload, _valid_pks) = prepare_add_node_payload(2);

// Add the new node
let new_node_id = registry
.do_add_node_(payload.clone(), node_operator_id)
.expect("failed to add a node");

// Verify the new node is present in the registry
assert!(registry.get_node(new_node_id).is_some());

// Verify node operator allowance is decremented
let updated_operator = get_node_operator_record(&registry, node_operator_id).unwrap();
assert_eq!(updated_operator.node_allowance, 0);

// Verify all nodes are in the registry
for node_id in node_ids {
assert!(registry.get_node(node_id).is_some());
}
}

#[test]
#[should_panic(expected = "Node allowance for this Node Operator is exhausted")]
fn should_panic_if_node_allowance_is_exhausted() {
let mut registry = invariant_compliant_registry(0);

// Add nodes to the registry
let (mutate_request, node_ids_and_dkg_pks) = prepare_registry_with_nodes(1, 1);
registry.maybe_apply_mutation_internal(mutate_request.mutations);
let node_ids: Vec<NodeId> = node_ids_and_dkg_pks.keys().cloned().collect();
let node_operator_id = registry_add_node_operator_for_node(&mut registry, node_ids[0], 0);

// Prepare payload to add a new node
let (payload, _valid_pks) = prepare_add_node_payload(2);

// Attempt to add the new node, which should panic due to exhausted allowance
registry
.do_add_node_(payload.clone(), node_operator_id)
.unwrap();
}

#[test]
fn should_add_node_and_update_allowance() {
let mut registry = invariant_compliant_registry(0);

// Add nodes to the registry
let (mutate_request, node_ids_and_dkg_pks) = prepare_registry_with_nodes(1, 1);
registry.maybe_apply_mutation_internal(mutate_request.mutations);
let node_ids: Vec<NodeId> = node_ids_and_dkg_pks.keys().cloned().collect();
let node_operator_id = registry_add_node_operator_for_node(&mut registry, node_ids[0], 1);

// Prepare payload to add a new node
let (payload, _valid_pks) = prepare_add_node_payload(2);

// Add the new node
let new_node_id = registry
.do_add_node_(payload.clone(), node_operator_id)
.expect("failed to add a node");

// Verify the new node is present in the registry
assert!(registry.get_node(new_node_id).is_some());

// Verify node operator allowance is decremented
let updated_operator = get_node_operator_record(&registry, node_operator_id).unwrap();
assert_eq!(updated_operator.node_allowance, 0);
}
}
Loading

0 comments on commit de11e79

Please sign in to comment.