Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist definitions to disk on add/remove #192

Merged
merged 1 commit into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 47 additions & 31 deletions rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,18 @@ pub(super) struct DefinitionsSupervisor {
rt: tokio::runtime::Handle,
pub(super) definitions: Arc<Mutex<BTreeMap<String, RunningDefinition>>>,
allow_mercury_deletion: bool,
networks_state_file: Option<PathBuf>,
log: Logger,
}

impl DefinitionsSupervisor {
pub(crate) fn new(rt: tokio::runtime::Handle, allow_mercury_deletion: bool) -> Self {
pub(crate) fn new(rt: tokio::runtime::Handle, allow_mercury_deletion: bool, networks_state_file: Option<PathBuf>, log: Logger) -> Self {
DefinitionsSupervisor {
rt,
definitions: Arc::new(Mutex::new(BTreeMap::new())),
allow_mercury_deletion,
networks_state_file,
log
}
}

Expand All @@ -466,43 +470,52 @@ impl DefinitionsSupervisor {
// definitions, no action should be taken on this MSD.
pub(crate) async fn load_or_create_defs(
&self,
networks_state_file: PathBuf,
metrics: RunningDefinitionsMetrics,
) -> Result<(), Box<dyn Error>> {
if networks_state_file.exists() {
let file_content = fs::read_to_string(networks_state_file)?;
let initial_definitions: Vec<FSDefinition> = serde_json::from_str(&file_content)?;
self.start(
initial_definitions.into_iter().map(|def| def.into()).collect(),
StartMode::AddToDefinitions,
metrics,
)
.await?;
if let Some(networks_state_file) = self.networks_state_file.clone() {
if networks_state_file.exists() {
let file_content = fs::read_to_string(networks_state_file.clone())?;
let initial_definitions: Vec<FSDefinition> = serde_json::from_str(&file_content)?;
let names = initial_definitions.iter().map(|def| def.name.clone()).collect::<Vec<_>>();
info!(
self.log,
"Definitions loaded from {:?}:\n{:?}",
networks_state_file.as_path(),
names
);
self.start(
initial_definitions.into_iter().map(|def| def.into()).collect(),
StartMode::AddToDefinitions,
metrics,
)
.await?;
}
}
Ok(())
}

// FIXME: if the file contents on disk are the same as the contents about to
// be persisted, then the file should not be overwritten because it was
// already updated by another MSD sharing the same directory.
pub(crate) async fn persist_defs(&self, networks_state_file: PathBuf) -> Result<(), Box<dyn Error>> {
let existing = self.definitions.lock().await;
retry::retry(retry::delay::Exponential::from_millis(10).take(5), || {
std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(networks_state_file.as_path())
.and_then(|mut file| {
let fs_def: Vec<FSDefinition> = existing
.values()
.cloned()
.map(|running_def| running_def.definition.into())
.collect::<Vec<_>>();

file.write_all(serde_json::to_string(&fs_def)?.as_bytes()).map(|_| file)
})
.and_then(|mut file| file.flush())
})?;
pub(crate) async fn persist_defs(&self, existing: &mut BTreeMap<String, RunningDefinition>) -> Result<(), Box<dyn Error>> {
if let Some(networks_state_file) = self.networks_state_file.clone() {
retry::retry(retry::delay::Exponential::from_millis(10).take(5), || {
std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(networks_state_file.as_path())
.and_then(|mut file| {
let fs_def: Vec<FSDefinition> = existing
.values()
.cloned()
.map(|running_def| running_def.definition.into())
.collect::<Vec<_>>();

file.write_all(serde_json::to_string(&fs_def)?.as_bytes()).map(|_| file)
})
.and_then(|mut file| file.flush())
})?;
}
Ok(())
}

Expand Down Expand Up @@ -567,15 +580,18 @@ impl DefinitionsSupervisor {
// End them and join them all.
join_all(defs_to_end.iter_mut().map(|def| async { def.end().await })).await;
drop(defs_to_end);
drop(ic_names_to_end);

drop(ic_names_to_end);
// Now we add the incoming definitions.
for definition in definitions.into_iter() {
existing.insert(
definition.name.clone(),
definition.run(self.rt.clone(), metrics.clone()).await,
);
}
// Now we rewrite definitions to disk.
if let Err(e) = self.persist_defs(existing).await {
warn!(self.log, "Error while peristing definitions to disk '{}'", e);
}
Ok(())
}

Expand Down
24 changes: 10 additions & 14 deletions rs/ic-observability/multiservice-discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,23 @@ fn main() {
print!("{}", text);
}
} else {
let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet);
let supervisor = DefinitionsSupervisor::new(
rt.handle().clone(),
cli_args.start_without_mainnet,
cli_args.networks_state_file.clone(),
make_logger(),
);
let (server_stop, server_stop_receiver) = oneshot::channel();

// Initialize the metrics layer because in the build method the `global::provider`
// is set. We can use global::meter only after that call.
let metrics_layer = HttpMetricsLayerBuilder::new().build();
let metrics = MSDMetrics::new();

if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(
supervisor.load_or_create_defs(networks_state_file, metrics.running_definition_metrics.clone()),
)
.unwrap();
}
rt.block_on(
supervisor.load_or_create_defs(metrics.running_definition_metrics.clone()),
)
.unwrap();

// First check if we should start the mainnet definition so we can
// serve it right after the server starts.
Expand Down Expand Up @@ -117,13 +120,6 @@ fn main() {
// Signal server to stop. Stop happens in parallel with supervisor stop.
server_stop.send(()).unwrap();

// Persist definitions to disk before ending the supervisor.
// FIXME: persistence should happen when the definitions structure
// changes, not just on end. E.g. when a public key is updated.
if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(supervisor.persist_defs(networks_state_file)).unwrap();
}

//Stop all definitions. End happens in parallel with server stop.
rt.block_on(supervisor.end());

Expand Down
Loading