Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle MQTT errors in log #73

Merged
merged 2 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ rumqttc.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tokio.workspace = true
toml.workspace = true
tracing.workspace = true
url.workspace = true

[dev-dependencies]
ctor.workspace = true
satori-testing-utils.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ pub const SEGMENT_FILENAME_FORMAT: &str = "%Y-%m-%dT%H_%M_%S%z.ts";
mod version;

mod utils;
pub use self::utils::load_config_file;
pub use self::utils::{load_config_file, ThrottledErrorLogger};
15 changes: 12 additions & 3 deletions common/src/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::utils::ThrottledErrorLogger;
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, Outgoing, Publish, QoS};
use serde::Deserialize;
use std::time::Duration;
Expand All @@ -19,6 +20,7 @@ pub struct MqttConfig {
pub struct MqttClient {
client: AsyncClient,
event_loop: EventLoop,
poll_error_logger: ThrottledErrorLogger<String>,

topic: String,
}
Expand All @@ -34,6 +36,7 @@ impl From<MqttConfig> for MqttClient {
Self {
client,
event_loop,
poll_error_logger: ThrottledErrorLogger::new(Duration::from_secs(5)),
topic: config.topic,
}
}
Expand Down Expand Up @@ -64,7 +67,9 @@ impl MqttClient {
}
Ok(_) => None,
Err(e) => {
warn!("rumqttc error: {:?}", e);
if let Some(e) = self.poll_error_logger.log(format!("{:?}", e)) {
warn!("rumqttc error: {}", e);
}
None
}
}
Expand All @@ -81,7 +86,9 @@ impl MqttClient {
}
Ok(_) => {}
Err(e) => {
warn!("rumqttc error: {:?}", e);
if let Some(e) = self.poll_error_logger.log(format!("{:?}", e)) {
warn!("rumqttc error: {}", e);
}
}
}
}
Expand Down Expand Up @@ -114,7 +121,9 @@ impl MqttClient {
}
Ok(_) => {}
Err(e) => {
warn!("rumqttc error: {:?}", e);
if let Some(e) = self.poll_error_logger.log(format!("{:?}", e)) {
warn!("rumqttc error: {}", e);
}
}
}
}
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions common/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod config_file;
mod throttled_error;

pub use self::{config_file::load_config_file, throttled_error::ThrottledErrorLogger};
115 changes: 115 additions & 0 deletions common/src/utils/throttled_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use tokio::time::{Duration, Instant};
use tracing::warn;

struct ThrottledError<E> {
error: E,
first_seen: Instant,
times_seen: usize,
}

pub struct ThrottledErrorLogger<E> {
timeout: Duration,
last_error: Option<ThrottledError<E>>,
}

impl<E: PartialEq + std::fmt::Display> ThrottledErrorLogger<E> {
pub fn new(timeout: Duration) -> Self {
Self {
timeout,
last_error: None,
}
}

pub fn log(&mut self, error: E) -> Option<&E> {
let now = Instant::now();
let last_error = self.last_error.as_mut();

match last_error {
Some(last_error)
if last_error.error == error && last_error.first_seen.elapsed() > self.timeout =>
{
warn!(
"Last error ({}) was (seen {} times)",
last_error.error, last_error.times_seen
);
last_error.first_seen = now;
last_error.times_seen = 1;
Some(&self.last_error.as_ref().unwrap().error)
}
Some(last_error)
if last_error.error == error && last_error.first_seen.elapsed() <= self.timeout =>
{
last_error.times_seen += 1;
None
}
Some(last_error) => {
warn!(
"Last error ({}) was (seen {} times)",
last_error.error, last_error.times_seen
);
self.last_error = Some(ThrottledError {
error,
first_seen: now,
times_seen: 1,
});
Some(&self.last_error.as_ref().unwrap().error)
}
None => {
self.last_error = Some(ThrottledError {
error,
first_seen: now,
times_seen: 1,
});
Some(&self.last_error.as_ref().unwrap().error)
}
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn new() {
let te = ThrottledErrorLogger::<String>::new(Duration::from_millis(100));
assert!(te.last_error.is_none());
}

#[tokio::test]
async fn log_once() {
let mut te = ThrottledErrorLogger::<String>::new(Duration::from_millis(100));
assert_eq!(te.log("test".to_string()).unwrap(), "test");
}

#[tokio::test]
async fn log_duplicates_fast() {
let mut te = ThrottledErrorLogger::<String>::new(Duration::from_millis(100));
assert_eq!(te.log("test".to_string()).unwrap(), "test");
assert!(te.log("test".to_string()).is_none());
assert!(te.log("test".to_string()).is_none());
tokio::time::sleep(Duration::from_millis(95)).await;
assert!(te.log("test".to_string()).is_none());
}

#[tokio::test]
async fn log_duplicates_slow() {
let mut te = ThrottledErrorLogger::<String>::new(Duration::from_millis(100));
assert_eq!(te.log("test".to_string()).unwrap(), "test");
assert!(te.log("test".to_string()).is_none());
assert!(te.log("test".to_string()).is_none());
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(te.log("test".to_string()).unwrap(), "test");
assert!(te.log("test".to_string()).is_none());
}

#[tokio::test]
async fn log_unique_fast() {
let mut te = ThrottledErrorLogger::<String>::new(Duration::from_millis(100));
assert_eq!(te.log("test a".to_string()).unwrap(), "test a");
assert_eq!(te.log("test b".to_string()).unwrap(), "test b");
assert_eq!(te.log("test a".to_string()).unwrap(), "test a");
tokio::time::sleep(Duration::from_millis(95)).await;
assert_eq!(te.log("test b".to_string()).unwrap(), "test b");
}
}