Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[oximeter] Don't stop refreshing producers forever if one attempt fails #7191

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions oximeter/collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ semver.workspace = true
serde.workspace = true
slog.workspace = true
slog-async.workspace = true
slog-error-chain.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
strum.workspace = true
Expand Down
155 changes: 84 additions & 71 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use slog::o;
use slog::trace;
use slog::warn;
use slog::Logger;
use slog_error_chain::InlineErrorChain;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddrV6;
Expand Down Expand Up @@ -126,7 +127,7 @@ async fn perform_collection(
warn!(
log,
"failed to collect results from producer";
"error" => ?e,
InlineErrorChain::new(&e),
);
Err(self_stats::FailureReason::Deserialization)
}
Expand All @@ -144,7 +145,7 @@ async fn perform_collection(
error!(
log,
"failed to send collection request to producer";
"error" => ?e
InlineErrorChain::new(&e),
);
Err(self_stats::FailureReason::Unreachable)
}
Expand Down Expand Up @@ -238,7 +239,7 @@ async fn inner_collection_loop(
log,
"failed to receive on producer update \
watch channel, exiting";
"error" => ?e,
InlineErrorChain::new(&e),
);
return;
}
Expand Down Expand Up @@ -548,7 +549,7 @@ async fn results_printer(
error!(
log,
"received error from a producer";
"err" => ?e,
InlineErrorChain::new(&e),
);
}
}
Expand Down Expand Up @@ -760,8 +761,10 @@ impl OximeterAgent {
) {
let mut task = self.refresh_task.lock().unwrap();
if task.is_none() {
let refresh_task =
tokio::spawn(refresh_producer_list(self.clone(), nexus_pool));
let refresh_task = tokio::spawn(refresh_producer_list_task(
self.clone(),
nexus_pool,
));
*task = Some(refresh_task);
}
}
Expand Down Expand Up @@ -1026,7 +1029,7 @@ impl OximeterAgent {
self.log,
"failed to shut down collection task";
"producer_id" => %id,
"error" => ?e,
InlineErrorChain::new(&e),
),
}
Ok(())
Expand Down Expand Up @@ -1074,7 +1077,7 @@ impl OximeterAgent {
}

// A task which periodically updates our list of producers from Nexus.
async fn refresh_producer_list(
async fn refresh_producer_list_task(
agent: OximeterAgent,
nexus_pool: Pool<NexusClient>,
) {
Expand All @@ -1089,77 +1092,86 @@ async fn refresh_producer_list(
loop {
interval.tick().await;
info!(agent.log, "refreshing list of producers from Nexus");
refresh_producer_list_once(&agent, &nexus_pool).await;
}
}

let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
let mut stream = client.cpapi_assigned_producers_list_stream(
&agent.id,
// This is a _total_ limit, not a page size, so `None` means "get
// all entries".
None,
Some(IdSortMode::IdAscending),
);
let mut expected_producers = BTreeMap::new();
loop {
match stream.try_next().await {
Err(e) => {
// TODO-robustness: Some errors here may not be "fatal", in
// the sense that we can continue to process the list we
// receive from Nexus. It's not clear which ones though,
// since most of these are either impossible (pre-hook
// errors) or indicate we've made a serious programming
// error or updated to incompatible versions of Nexus /
// Oximeter. One that we might be able to handle is a
// communication error, say failing to fetch the last page
// when we've already fetched the first few. But for now,
// we'll simply keep the list we have and try again on the
// next refresh.
//
// For now, let's just avoid doing anything at all here, on
// the theory that if we hit this, we should just continue
// collecting from the last known-good set of producers.
error!(
agent.log,
"error fetching next assigned producer, \
abandoning this refresh attempt";
"err" => ?e,
);
return;
}
Ok(Some(p)) => {
let endpoint = match ProducerEndpoint::try_from(p) {
Ok(ep) => ep,
Err(e) => {
error!(
agent.log,
"failed to convert producer description \
from Nexus, skipping producer";
"err" => e
);
continue;
}
};
let old = expected_producers.insert(endpoint.id, endpoint);
if let Some(ProducerEndpoint { id, .. }) = old {
// Run a single "producer refresh from Nexus" operation (which may require
// multiple requests to Nexus to fetch multiple pages of producers).
//
// On any error talking to Nexus, this abandons the current refresh attempt
// and returns, keeping the last known-good set of producers; the caller's
// periodic loop will retry on its next tick.
async fn refresh_producer_list_once(
    agent: &OximeterAgent,
    nexus_pool: &Pool<NexusClient>,
) {
    let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
    let mut stream = client.cpapi_assigned_producers_list_stream(
        &agent.id,
        // This is a _total_ limit, not a page size, so `None` means "get
        // all entries".
        None,
        Some(IdSortMode::IdAscending),
    );
    // Collect the full set of producers Nexus believes we should be
    // collecting from, keyed by producer ID.
    let mut expected_producers = BTreeMap::new();
    loop {
        match stream.try_next().await {
            Err(e) => {
                // TODO-robustness: Some errors here may not be "fatal", in
                // the sense that we can continue to process the list we
                // receive from Nexus. It's not clear which ones though,
                // since most of these are either impossible (pre-hook
                // errors) or indicate we've made a serious programming
                // error or updated to incompatible versions of Nexus /
                // Oximeter. One that we might be able to handle is a
                // communication error, say failing to fetch the last page
                // when we've already fetched the first few. But for now,
                // we'll simply keep the list we have and try again on the
                // next refresh.
                //
                // For now, let's just avoid doing anything at all here, on
                // the theory that if we hit this, we should just continue
                // collecting from the last known-good set of producers.
                error!(
                    agent.log,
                    "error fetching next assigned producer, \
                     abandoning this refresh attempt";
                    InlineErrorChain::new(&e),
                );
                return;
            }
            Ok(Some(p)) => {
                let endpoint = match ProducerEndpoint::try_from(p) {
                    Ok(ep) => ep,
                    Err(e) => {
                        error!(
                            agent.log,
                            "failed to convert producer description \
                             from Nexus, skipping producer";
                            // No `InlineErrorChain` here: `e` is a string
                            "error" => e,
                        );
                        continue;
                    }
                };
                let old = expected_producers.insert(endpoint.id, endpoint);
                if let Some(ProducerEndpoint { id, .. }) = old {
                    error!(
                        agent.log,
                        "Nexus appears to have sent duplicate producer info";
                        "producer_id" => %id,
                    );
                }
            }
            Ok(None) => break,
        }
    }
    let n_current_tasks = expected_producers.len();
    let n_pruned_tasks = agent.ensure_producers(expected_producers).await;
    *agent.last_refresh_time.lock().unwrap() = Some(Utc::now());
    info!(
        agent.log,
        "refreshed list of producers from Nexus";
        "n_pruned_tasks" => n_pruned_tasks,
        "n_current_tasks" => n_current_tasks,
    );
}

async fn claim_nexus_with_backoff(
Expand All @@ -1171,7 +1183,8 @@ async fn claim_nexus_with_backoff(
log,
"failed to lookup Nexus IP, will retry";
"delay" => ?delay,
"error" => ?error,
// No `InlineErrorChain` here: `error` is a string
"error" => error,
);
};
let do_lookup = || async {
Expand Down
Loading