From 11a3e0b1e4e031a3049994e5dc7de61478c8c178 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 2 Dec 2024 10:50:12 -0500 Subject: [PATCH 1/2] [oximeter] Don't stop refreshing producers forever if one attempt fails --- oximeter/collector/src/agent.rs | 140 +++++++++++++++++--------------- 1 file changed, 75 insertions(+), 65 deletions(-) diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 8572f7f508..073c91fa59 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -760,8 +760,10 @@ impl OximeterAgent { ) { let mut task = self.refresh_task.lock().unwrap(); if task.is_none() { - let refresh_task = - tokio::spawn(refresh_producer_list(self.clone(), nexus_pool)); + let refresh_task = tokio::spawn(refresh_producer_list_task( + self.clone(), + nexus_pool, + )); *task = Some(refresh_task); } } @@ -1074,7 +1076,7 @@ impl OximeterAgent { } // A task which periodically updates our list of producers from Nexus. -async fn refresh_producer_list( +async fn refresh_producer_list_task( agent: OximeterAgent, nexus_pool: Pool, ) { @@ -1089,77 +1091,85 @@ async fn refresh_producer_list( loop { interval.tick().await; info!(agent.log, "refreshing list of producers from Nexus"); + refresh_producer_list_once(&agent, &nexus_pool).await; + } +} - let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await; - let mut stream = client.cpapi_assigned_producers_list_stream( - &agent.id, - // This is a _total_ limit, not a page size, so `None` means "get - // all entries". - None, - Some(IdSortMode::IdAscending), - ); - let mut expected_producers = BTreeMap::new(); - loop { - match stream.try_next().await { - Err(e) => { - // TODO-robustness: Some errors here may not be "fatal", in - // the sense that we can continue to process the list we - // receive from Nexus. It's not clear which ones though, - // since most of these are either impossible (pre-hook - // errors) or indicate we've made a serious programming - // error or updated to incompatible versions of Nexus / - // Oximeter. One that we might be able to handle is a - // communication error, say failing to fetch the last page - // when we've already fetched the first few. But for now, - // we'll simply keep the list we have and try again on the - // next refresh. - // - // For now, let's just avoid doing anything at all here, on - // the theory that if we hit this, we should just continue - // collecting from the last known-good set of producers. - error!( - agent.log, - "error fetching next assigned producer, \ - abandoning this refresh attempt"; - "err" => ?e, - ); - return; - } - Ok(Some(p)) => { - let endpoint = match ProducerEndpoint::try_from(p) { - Ok(ep) => ep, - Err(e) => { - error!( - agent.log, - "failed to convert producer description \ - from Nexus, skipping producer"; - "err" => e - ); - continue; - } - }; - let old = expected_producers.insert(endpoint.id, endpoint); - if let Some(ProducerEndpoint { id, .. }) = old { +// Run a single "producer refresh from Nexus" operation (which may require +// multiple requests to Nexus to fetch multiple pages of producers). +async fn refresh_producer_list_once( + agent: &OximeterAgent, + nexus_pool: &Pool, +) { + let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await; + let mut stream = client.cpapi_assigned_producers_list_stream( + &agent.id, + // This is a _total_ limit, not a page size, so `None` means "get + // all entries". + None, + Some(IdSortMode::IdAscending), + ); + let mut expected_producers = BTreeMap::new(); + loop { + match stream.try_next().await { + Err(e) => { + // TODO-robustness: Some errors here may not be "fatal", in + // the sense that we can continue to process the list we + // receive from Nexus. It's not clear which ones though, + // since most of these are either impossible (pre-hook + // errors) or indicate we've made a serious programming + // error or updated to incompatible versions of Nexus / + // Oximeter. One that we might be able to handle is a + // communication error, say failing to fetch the last page + // when we've already fetched the first few. But for now, + // we'll simply keep the list we have and try again on the + // next refresh. + // + // For now, let's just avoid doing anything at all here, on + // the theory that if we hit this, we should just continue + // collecting from the last known-good set of producers. + error!( + agent.log, + "error fetching next assigned producer, \ + abandoning this refresh attempt"; + "err" => ?e, + ); + return; + } + Ok(Some(p)) => { + let endpoint = match ProducerEndpoint::try_from(p) { + Ok(ep) => ep, + Err(e) => { error!( agent.log, - "Nexus appears to have sent duplicate producer info"; - "producer_id" => %id, + "failed to convert producer description \ + from Nexus, skipping producer"; + "err" => e ); + continue; } + }; + let old = expected_producers.insert(endpoint.id, endpoint); + if let Some(ProducerEndpoint { id, .. }) = old { + error!( + agent.log, + "Nexus appears to have sent duplicate producer info"; + "producer_id" => %id, + ); } - Ok(None) => break, } + Ok(None) => break, } - let n_current_tasks = expected_producers.len(); - let n_pruned_tasks = agent.ensure_producers(expected_producers).await; - *agent.last_refresh_time.lock().unwrap() = Some(Utc::now()); - info!( - agent.log, - "refreshed list of producers from Nexus"; - "n_pruned_tasks" => n_pruned_tasks, - "n_current_tasks" => n_current_tasks, - ); } + let n_current_tasks = expected_producers.len(); + let n_pruned_tasks = agent.ensure_producers(expected_producers).await; + *agent.last_refresh_time.lock().unwrap() = Some(Utc::now()); + info!( + agent.log, + "refreshed list of producers from Nexus"; + "n_pruned_tasks" => n_pruned_tasks, + "n_current_tasks" => n_current_tasks, + ); } async fn claim_nexus_with_backoff( From 86522dc9102036feccd3de257415fc594cbc98e1 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 2 Dec 2024 10:58:32 -0500 Subject: [PATCH 2/2] use slog-error-chain to get more details on errors --- Cargo.lock | 1 + oximeter/collector/Cargo.toml | 1 + oximeter/collector/src/agent.rs | 19 +++++++++++-------- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8a1d33a6b..71ead293b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7839,6 +7839,7 @@ dependencies = [ "slog", "slog-async", "slog-dtrace", + "slog-error-chain", "slog-term", "strum", "subprocess", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 87c82d1183..10ea4e6a9d 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -32,6 +32,7 @@ semver.workspace = true serde.workspace = true slog.workspace = true slog-async.workspace = true +slog-error-chain.workspace = true slog-dtrace.workspace = true slog-term.workspace = true strum.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 073c91fa59..6fa8c01c56 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -32,6 +32,7 @@ use slog::o; use slog::trace; use slog::warn; use slog::Logger; +use slog_error_chain::InlineErrorChain; use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::net::SocketAddrV6; @@ -126,7 +127,7 @@ async fn perform_collection( warn!( log, "failed to collect results from producer"; - "error" => ?e, + InlineErrorChain::new(&e), ); Err(self_stats::FailureReason::Deserialization) } @@ -144,7 +145,7 @@ async fn perform_collection( error!( log, "failed to send collection request to producer"; - "error" => ?e + InlineErrorChain::new(&e), ); Err(self_stats::FailureReason::Unreachable) } @@ -238,7 +239,7 @@ async fn inner_collection_loop( log, "failed to receive on producer update \ watch channel, exiting"; - "error" => ?e, + InlineErrorChain::new(&e), ); return; } @@ -548,7 +549,7 @@ async fn results_printer( error!( log, "received error from a producer"; - "err" => ?e, + InlineErrorChain::new(&e), ); } } @@ -1028,7 +1029,7 @@ impl OximeterAgent { self.log, "failed to shut down collection task"; "producer_id" => %id, - "error" => ?e, + InlineErrorChain::new(&e), ), } Ok(()) @@ -1132,7 +1133,7 @@ async fn refresh_producer_list_once( agent.log, "error fetching next assigned producer, \ abandoning this refresh attempt"; - "err" => ?e, + InlineErrorChain::new(&e), ); return; } @@ -1144,7 +1145,8 @@ async fn refresh_producer_list_once( agent.log, "failed to convert producer description \ from Nexus, skipping producer"; - "err" => e + // No `InlineErrorChain` here: `e` is a string + "error" => e, ); continue; } @@ -1181,7 +1183,8 @@ async fn claim_nexus_with_backoff( log, "failed to lookup Nexus IP, will retry"; "delay" => ?delay, - "error" => ?error, + // No `InlineErrorChain` here: `error` is a string + "error" => error, ); }; let do_lookup = || async {