From 345d34f6a5c414442c64f3baedf5ff5b5cb4ee11 Mon Sep 17 00:00:00 2001 From: Pietro Date: Wed, 7 Feb 2024 11:21:35 +0100 Subject: [PATCH 1/3] Add networks-state-file arg to persist definitions --- Cargo.Bazel.lock | 31 ++++++- Cargo.lock | 1 + .../multiservice-discovery/Cargo.toml | 3 +- .../multiservice-discovery/src/definition.rs | 83 ++++++++++++++++++- .../multiservice-discovery/src/main.rs | 20 +++++ 5 files changed, 132 insertions(+), 6 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 44932973d..242d37461 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "a3372bb894f86634c80e5bf284230dda73a380e8d62210533b5e46d3f1a4d718", + "checksum": "0fe4d19b9ba0853d6402afb62b8b05080ede570eb67c7c7336921d3eff40874e", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -27583,13 +27583,32 @@ ], "crate_features": { "common": [ - "elf", - "errno", "general", "ioctl", "no_std" ], - "selects": {} + "selects": { + "aarch64-unknown-linux-gnu": [ + "elf", + "errno" + ], + "arm-unknown-linux-gnueabi": [ + "elf", + "errno" + ], + "armv7-unknown-linux-gnueabi": [ + "elf", + "errno" + ], + "i686-unknown-linux-gnu": [ + "elf", + "errno" + ], + "x86_64-unknown-linux-gnu": [ + "elf", + "errno" + ] + } }, "edition": "2021", "version": "0.4.13" @@ -28801,6 +28820,10 @@ "id": "regex 1.10.3", "target": "regex" }, + { + "id": "retry 2.0.0", + "target": "retry" + }, { "id": "serde 1.0.196", "target": "serde" diff --git a/Cargo.lock b/Cargo.lock index 6fd48035d..77d8558aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5793,6 +5793,7 @@ dependencies = [ "ic-utils 0.9.0", "multiservice-discovery-shared", "regex", + "retry", "serde", "serde_json", "service-discovery", diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index dd39a5123..2e5625c93 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -30,4 +30,5 @@ slog-term = { workspace = true } tokio = { workspace = true } url = { workspace = true } futures.workspace = true -axum = "0.7.4" \ No newline at end of file +axum = "0.7.4" +retry = { workspace = true } diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index c10199bed..23deb9285 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -3,6 +3,8 @@ use crossbeam_channel::Sender; use futures_util::future::join_all; use ic_management_types::Network; use ic_registry_client::client::ThresholdSigPublicKey; +use serde::Deserialize; +use serde::Serialize; use service_discovery::job_types::JobType; use service_discovery::registry_sync::Interrupted; use service_discovery::IcServiceDiscovery; @@ -16,6 +18,8 @@ use std::collections::HashSet; use std::error::Error; use std::fmt::Debug; use std::fmt::{Display, Error as FmtError, Formatter}; +use std::fs; +use std::io::Write; use std::net::SocketAddr; use std::sync::Arc; use std::{ @@ -25,6 +29,33 @@ use std::{ use tokio::sync::Mutex; use url::Url; +use crate::make_logger; + +#[derive(Clone, Serialize, Deserialize)] +pub struct FSDefinition { + pub nns_urls: Vec, + pub registry_path: PathBuf, + pub name: String, + pub public_key: Option, + pub poll_interval: Duration, + pub registry_query_timeout: Duration, + pub boundary_nodes: Vec, +} + +impl From for FSDefinition { + fn from(definition: Definition) -> Self { + Self { + nns_urls: definition.nns_urls, + registry_path: definition.registry_path, + name: definition.name, + public_key: definition.public_key, + poll_interval: definition.poll_interval, + registry_query_timeout: definition.registry_query_timeout, + boundary_nodes: definition.boundary_nodes + } + } +} + #[derive(Clone)] pub struct Definition { pub nns_urls: Vec, @@ -38,6 +69,20 @@ pub struct Definition { pub boundary_nodes: Vec, } +impl From for Definition { + fn from(fs_definition: FSDefinition) -> Self { + Definition::new( + fs_definition.nns_urls, + fs_definition.registry_path, + fs_definition.name, + make_logger(), + fs_definition.public_key, + fs_definition.poll_interval, + fs_definition.registry_query_timeout + ) + } +} + impl Debug for Definition { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { write!( @@ -300,7 +345,7 @@ impl RunningDefinition { } } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct BoundaryNode { pub name: String, pub targets: BTreeSet, @@ -394,6 +439,42 @@ impl DefinitionsSupervisor { } } + pub(crate) async fn load_or_create_defs(&self, networks_state_file: PathBuf) -> Result<(), Box> { + if networks_state_file.exists() { + let file_content = fs::read_to_string(networks_state_file)?; + let initial_definitions: Vec = serde_json::from_str(&file_content)?; + self.start( + initial_definitions.into_iter().map(|def| def.into()).collect(), + StartMode::AddToDefinitions, + ) + .await?; + } + Ok(()) + } + + pub(crate) async fn persist_defs(&self, networks_state_file: PathBuf) -> Result<(), Box> { + let existing = self.definitions.lock().await; + retry::retry(retry::delay::Exponential::from_millis(10).take(5), || { + std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&networks_state_file.as_path()) + .and_then(|mut file| { + let fs_def: Vec = existing + .values() + .cloned() + .into_iter() + .map(|running_def| running_def.definition.into()) + .collect::>(); + + file.write_all(serde_json::to_string(&fs_def)?.as_bytes()) + .map(|_| file) + }) + .and_then(|mut file| file.flush()) + })?; + Ok(()) + } + async fn start_inner( &self, existing: &mut BTreeMap, diff --git a/rs/ic-observability/multiservice-discovery/src/main.rs b/rs/ic-observability/multiservice-discovery/src/main.rs index a9178df81..5fe3110d7 100644 --- a/rs/ic-observability/multiservice-discovery/src/main.rs +++ b/rs/ic-observability/multiservice-discovery/src/main.rs @@ -67,6 +67,10 @@ fn main() { } } else { let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet); + if let Some(networks_state_file) = cli_args.networks_state_file.clone() { + rt.block_on(supervisor.load_or_create_defs(networks_state_file)).unwrap(); + } + let (server_stop, server_stop_receiver) = oneshot::channel(); //Configure server @@ -98,6 +102,11 @@ fn main() { // Signal server to stop. Stop happens in parallel with supervisor stop. server_stop.send(()).unwrap(); + // Persist definitions to disk before ending the supervisor. + if let Some(networks_state_file) = cli_args.networks_state_file.clone() { + rt.block_on(supervisor.persist_defs(networks_state_file)).unwrap(); + } + //Stop all definitions. End happens in parallel with server stop. rt.block_on(supervisor.end()); @@ -180,4 +189,15 @@ the Prometheus targets of mainnet as a JSON structure on stdout. "# )] render_prom_targets_to_stdout: bool, + + #[clap( + long = "networks-state-file", + default_value = None, + action, + help = r#" +Preload networks definitions from file path. In case the file does not +exist, it will be created. +"# + )] + networks_state_file: Option, } From 10f59a14f65f553307e837be984d1c0341637e2c Mon Sep 17 00:00:00 2001 From: Pietro Date: Wed, 7 Feb 2024 13:26:35 +0100 Subject: [PATCH 2/3] repin --- Cargo.Bazel.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index b7a2c17af..419536a5d 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "0fe4d19b9ba0853d6402afb62b8b05080ede570eb67c7c7336921d3eff40874e", + "checksum": "0f4ce627fa1b7db78ee1d8b03f24a795ae82e98a64544ca23fb300913ca0347b", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", From 2a6c7ade5acf12e5a287147eddaddf4979fd963e Mon Sep 17 00:00:00 2001 From: Pietro Date: Wed, 7 Feb 2024 14:03:59 +0100 Subject: [PATCH 3/3] repin --- Cargo.Bazel.lock | 12 +++++++++++- Cargo.lock | 1 + .../multiservice-discovery/Cargo.toml | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 17ba6b109..f61606d84 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "86820685790dc151c444b969cb6cb2ce2835933116a79bd69d9979b0df78d0db", + "checksum": "2bda21c3422f1e2a634bfe6e3d48e0041bd5dbb73f9d63b191e5393c2e338c19", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -28163,15 +28163,23 @@ ], "selects": { "aarch64-unknown-linux-gnu": [ + "elf", + "errno", "std" ], "arm-unknown-linux-gnueabi": [ + "elf", + "errno", "std" ], "armv7-unknown-linux-gnueabi": [ + "elf", + "errno", "std" ], "i686-unknown-linux-gnu": [ + "elf", + "errno", "std" ], "powerpc-unknown-linux-gnu": [ @@ -28181,6 +28189,8 @@ "std" ], "x86_64-unknown-linux-gnu": [ + "elf", + "errno", "std" ] } diff --git a/Cargo.lock b/Cargo.lock index 6810b66bb..79b38f594 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5909,6 +5909,7 @@ dependencies = [ "multiservice-discovery-shared", "regex", "reqwest", + "retry", "serde", "serde_json", "service-discovery", diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index 132ee4506..85b65433a 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -31,6 +31,7 @@ tokio = { workspace = true } url = { workspace = true } futures.workspace = true axum = "0.7.4" +retry = { workspace = true } [dev-dependencies] tempfile = { workspace = true }