diff --git a/common/src/api/internal/nexus.rs b/common/src/api/internal/nexus.rs index 2f5c8be220..6705569ae4 100644 --- a/common/src/api/internal/nexus.rs +++ b/common/src/api/internal/nexus.rs @@ -218,7 +218,7 @@ pub enum ProducerKind { /// Information announced by a metric server, used so that clients can contact it and collect /// available metric data from it. -#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize, PartialEq)] pub struct ProducerEndpoint { /// A unique ID for this producer. pub id: Uuid, diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index d19a9323c2..0b33276de4 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -183,10 +183,20 @@ async fn collection_task( perform_collection(&log, &mut stats, &client, &producer, &outbox, Some(token)).await; }, Some(CollectionMessage::Update(new_info)) => { + // If the collection interval is shorter than the + // interval on which we receive these update messages, + // we'll never actually collect anything! Instead, only + // do the update if the information has changed. This + // should also be guarded against by the main agent, but + // we're being cautious here. + if producer == new_info { + continue; + } producer = new_info; debug!( log, - "collection task received request to update its producer information"; + "collection task received request to update \ + its producer information"; "interval" => ?producer.interval, "address" => producer.address, ); @@ -606,7 +616,7 @@ impl OximeterAgent { "component" => "collection-task", "producer_id" => id.to_string(), )); - let info_clone = info.clone(); + let info_clone = info; let target = self.collection_target; let task = tokio::spawn(async move { collection_task(log, target, info_clone, rx, q).await; @@ -614,22 +624,35 @@ impl OximeterAgent { value.insert((info, CollectionTask { inbox: tx, task })); } Entry::Occupied(mut value) => { - debug!( - self.log, - "received request to register existing metric \ - producer, updating collection information"; - "producer_id" => id.to_string(), - "interval" => ?info.interval, - "address" => info.address, - ); - value.get_mut().0 = info.clone(); - value - .get() - .1 - .inbox - .send(CollectionMessage::Update(info)) - .await - .unwrap(); + // Only update the endpoint information if it's actually + // different, to avoid indefinitely delaying the collection + // timer from expiring. + if value.get().0 == info { + trace!( + self.log, + "ignoring request to update existing metric \ + producer, since the endpoint information is \ + the same as the existing"; + "producer_id" => %id, + ); + } else { + debug!( + self.log, + "received request to register existing metric \ + producer, updating collection information"; + "producer_id" => id.to_string(), + "interval" => ?info.interval, + "address" => info.address, + ); + value.get_mut().0 = info; + value + .get() + .1 + .inbox + .send(CollectionMessage::Update(info)) + .await + .unwrap(); + } } } } @@ -675,7 +698,7 @@ impl OximeterAgent { .await .range((start, Bound::Unbounded)) .take(limit) - .map(|(_id, (info, _t))| info.clone()) + .map(|(_id, (info, _t))| *info) .collect() } @@ -766,7 +789,14 @@ async fn refresh_producer_list( agent: OximeterAgent, nexus_pool: Pool, ) { + // Setup our refresh timer. + // + // If we miss a tick, we'll skip until the next multiple of the interval + // from the start time. This is a good compromise between taking a bunch of + // quick updates (burst) and shifting our interval (delay). let mut interval = tokio::time::interval(agent.refresh_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { interval.tick().await; info!(agent.log, "refreshing list of producers from Nexus"); @@ -783,11 +813,28 @@ async fn refresh_producer_list( loop { match stream.try_next().await { Err(e) => { + // TODO-robustness: Some errors here may not be "fatal", in + // the sense that we can continue to process the list we + // receive from Nexus. It's not clear which ones though, + // since most of these are either impossible (pre-hook + // errors) or indicate we've made a serious programming + // error or updated to incompatible versions of Nexus / + // Oximeter. One that we might be able to handle is a + // communication error, say failing to fetch the last page + // when we've already fetched the first few. But for now, + // we'll simply keep the list we have and try again on the + // next refresh. + // + // For now, let's just avoid doing anything at all here, on + // the theory that if we hit this, we should just continue + // collecting from the last known-good set of producers. error!( agent.log, - "error fetching next assigned producer"; + "error fetching next assigned producer, \ + abandoning this refresh attempt"; "err" => ?e, ); + return; } Ok(Some(p)) => { let endpoint = match ProducerEndpoint::try_from(p) { diff --git a/oximeter/collector/src/standalone.rs b/oximeter/collector/src/standalone.rs index faa27ce04b..d70bfef9e0 100644 --- a/oximeter/collector/src/standalone.rs +++ b/oximeter/collector/src/standalone.rs @@ -112,7 +112,7 @@ impl StandaloneNexus { )); }; let assignment = - ProducerAssignment { producer: info.clone(), collector_id }; + ProducerAssignment { producer: *info, collector_id }; assignment } Some(existing_assignment) => { @@ -126,7 +126,7 @@ impl StandaloneNexus { // changed its IP address. The collector will learn of this when // it next fetches its list. let collector_id = existing_assignment.collector_id; - ProducerAssignment { producer: info.clone(), collector_id } + ProducerAssignment { producer: *info, collector_id } } }; inner.producers.insert(info.id, assignment); diff --git a/oximeter/producer/src/lib.rs b/oximeter/producer/src/lib.rs index 522ce983b4..42d2b03808 100644 --- a/oximeter/producer/src/lib.rs +++ b/oximeter/producer/src/lib.rs @@ -130,7 +130,7 @@ impl Server { ) -> Result { Self::new_impl( registry, - config.server_info.clone(), + config.server_info, config.registration_address.as_ref(), config.request_body_max_bytes, &config.log,