Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistence to MSD definitions #169

Merged
merged 5 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "86820685790dc151c444b969cb6cb2ce2835933116a79bd69d9979b0df78d0db",
"checksum": "2bda21c3422f1e2a634bfe6e3d48e0041bd5dbb73f9d63b191e5393c2e338c19",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
Expand Down Expand Up @@ -28157,23 +28157,29 @@
],
"crate_features": {
"common": [
"elf",
"errno",
"general",
"ioctl",
"no_std"
],
"selects": {
"aarch64-unknown-linux-gnu": [
"elf",
"errno",
"std"
],
"arm-unknown-linux-gnueabi": [
"elf",
"errno",
"std"
],
"armv7-unknown-linux-gnueabi": [
"elf",
"errno",
"std"
],
"i686-unknown-linux-gnu": [
"elf",
"errno",
"std"
],
"powerpc-unknown-linux-gnu": [
Expand All @@ -28183,6 +28189,8 @@
"std"
],
"x86_64-unknown-linux-gnu": [
"elf",
"errno",
"std"
]
}
Expand Down Expand Up @@ -29397,6 +29405,10 @@
"id": "regex 1.10.3",
"target": "regex"
},
{
"id": "retry 2.0.0",
"target": "retry"
},
{
"id": "serde 1.0.196",
"target": "serde"
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/ic-observability/multiservice-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio = { workspace = true }
url = { workspace = true }
futures.workspace = true
axum = "0.7.4"
retry = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
83 changes: 82 additions & 1 deletion rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crossbeam_channel::Sender;
use futures_util::future::join_all;
use ic_management_types::Network;
use ic_registry_client::client::ThresholdSigPublicKey;
use serde::Deserialize;
use serde::Serialize;
use service_discovery::job_types::JobType;
use service_discovery::registry_sync::Interrupted;
use service_discovery::IcServiceDiscovery;
Expand All @@ -16,6 +18,8 @@ use std::collections::HashSet;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::{Display, Error as FmtError, Formatter};
use std::fs;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{
Expand All @@ -25,6 +29,33 @@ use std::{
use tokio::sync::Mutex;
use url::Url;

use crate::make_logger;

#[derive(Clone, Serialize, Deserialize)]
pub struct FSDefinition {
pub nns_urls: Vec<Url>,
pub registry_path: PathBuf,
pub name: String,
pub public_key: Option<ThresholdSigPublicKey>,
pub poll_interval: Duration,
pub registry_query_timeout: Duration,
pub boundary_nodes: Vec<BoundaryNode>,
}

impl From<Definition> for FSDefinition {
fn from(definition: Definition) -> Self {
Self {
nns_urls: definition.nns_urls,
registry_path: definition.registry_path,
name: definition.name,
public_key: definition.public_key,
poll_interval: definition.poll_interval,
registry_query_timeout: definition.registry_query_timeout,
boundary_nodes: definition.boundary_nodes
}
}
}

#[derive(Clone)]
pub struct Definition {
pub nns_urls: Vec<Url>,
Expand All @@ -38,6 +69,20 @@ pub struct Definition {
pub boundary_nodes: Vec<BoundaryNode>,
}

impl From<FSDefinition> for Definition {
fn from(fs_definition: FSDefinition) -> Self {
Definition::new(
fs_definition.nns_urls,
fs_definition.registry_path,
fs_definition.name,
make_logger(),
fs_definition.public_key,
fs_definition.poll_interval,
fs_definition.registry_query_timeout
)
}
}

impl Debug for Definition {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
write!(
Expand Down Expand Up @@ -300,7 +345,7 @@ impl RunningDefinition {
}
}

#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub struct BoundaryNode {
pub name: String,
pub targets: BTreeSet<SocketAddr>,
Expand Down Expand Up @@ -394,6 +439,42 @@ impl DefinitionsSupervisor {
}
}

pub(crate) async fn load_or_create_defs(&self, networks_state_file: PathBuf) -> Result<(), Box<dyn Error>> {
if networks_state_file.exists() {
let file_content = fs::read_to_string(networks_state_file)?;
let initial_definitions: Vec<FSDefinition> = serde_json::from_str(&file_content)?;
self.start(
initial_definitions.into_iter().map(|def| def.into()).collect(),
StartMode::AddToDefinitions,
)
.await?;
}
Ok(())
}

pub(crate) async fn persist_defs(&self, networks_state_file: PathBuf) -> Result<(), Box<dyn Error>> {
let existing = self.definitions.lock().await;
retry::retry(retry::delay::Exponential::from_millis(10).take(5), || {
std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&networks_state_file.as_path())
.and_then(|mut file| {
let fs_def: Vec<FSDefinition> = existing
.values()
.cloned()
.into_iter()
.map(|running_def| running_def.definition.into())
.collect::<Vec<_>>();

file.write_all(serde_json::to_string(&fs_def)?.as_bytes())
.map(|_| file)
})
.and_then(|mut file| file.flush())
})?;
Ok(())
}

async fn start_inner(
&self,
existing: &mut BTreeMap<String, RunningDefinition>,
Expand Down
20 changes: 20 additions & 0 deletions rs/ic-observability/multiservice-discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn main() {
}
} else {
let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet);
if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(supervisor.load_or_create_defs(networks_state_file)).unwrap();
}

let (server_stop, server_stop_receiver) = oneshot::channel();

//Configure server
Expand Down Expand Up @@ -98,6 +102,11 @@ fn main() {
// Signal server to stop. Stop happens in parallel with supervisor stop.
server_stop.send(()).unwrap();

// Persist definitions to disk before ending the supervisor.
if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(supervisor.persist_defs(networks_state_file)).unwrap();
}

//Stop all definitions. End happens in parallel with server stop.
rt.block_on(supervisor.end());

Expand Down Expand Up @@ -180,4 +189,15 @@ the Prometheus targets of mainnet as a JSON structure on stdout.
"#
)]
render_prom_targets_to_stdout: bool,

#[clap(
long = "networks-state-file",
default_value = None,
action,
help = r#"
Preload networks definitions from file path. In case the file does not
exist, it will be created.
"#
)]
networks_state_file: Option<PathBuf>,
}
Loading