Skip to content

Commit

Permalink
refactor: cleaner event emitter and upgrade to sysinfo 0.30
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s committed Dec 30, 2023
1 parent 71f0bf8 commit 5cf252e
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 202 deletions.
85 changes: 53 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions bin/metrsd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "metrsd"
version = "0.4.1"
version = "0.5.0"
edition = "2021"
authors = ["nanocl contributors <[email protected]>"]
description = "The Metrs daemon"
Expand All @@ -17,13 +17,13 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.4.11", features = ["derive"] }
env_logger = "0.10.1"
futures = "0.3.29"
log = "0.4.20"
ntex = { version = "0.7.16", features = ["tokio"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
sysinfo = "0.29.11"
tokio = "1.35.0"
metrs_stubs = { version = "0.4.1", features = ["serde", "sysinfo"] }
clap = { version = "4.4", features = ["derive"] }
env_logger = "0.10"
futures = "0.3"
log = "0.4"
ntex = { version = "0.7", features = ["tokio"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sysinfo = "0.30"
tokio = "1.35"
metrs_stubs = { version = "0.5", features = ["serde", "sysinfo", "bytes"] }
62 changes: 22 additions & 40 deletions bin/metrsd/src/event_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,23 @@
use std::pin::Pin;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::{
pin::Pin,
time::Duration,
sync::{Arc, Mutex},
task::{Context, Poll},
};

use ntex::rt;
use ntex::web;
use ntex::util::Bytes;
use ntex::time::interval;
use ntex::http::StatusCode;
use ntex::web::error::{Error, BlockingError};
use ntex::{
rt, web,
util::Bytes,
time::interval,
http::StatusCode,
web::error::{Error, BlockingError},
};
use futures::Stream;
use tokio::sync::mpsc::{Sender, Receiver, channel};
use metrs_stubs::*;

use crate::error::{HttpError, MetrsError};

#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
#[serde(tag = "Type", content = "Data")]
pub enum Event {
Memory(MemoryInfo),
Cpu(Vec<CpuInfo>),
Disk(Vec<DiskInfo>),
Network(Vec<NetworkInfo>),
}
use metrs_stubs::*;

impl TryFrom<Event> for Bytes {
type Error = MetrsError;

fn try_from(value: Event) -> Result<Self, Self::Error> {
serde_json::to_string(&value)
.map_err(|err| {
MetrsError::Error(format!("Unable to serialize memory info: {err}"))
})
.map(|res| Bytes::from(res + "\n"))
}
}
use crate::error::HttpError;

// Wrap Receiver in our own type, with correct error type
pub struct Client(Receiver<Bytes>);
Expand Down Expand Up @@ -89,7 +71,7 @@ impl EventEmitter {

/// Check if clients are still connected
fn check_connection(&mut self) -> Result<(), HttpError> {
log::debug!("Checking alive connection...");
log::trace!("Checking alive connection...");
let mut alive_clients = Vec::new();
let clients = self
.inner
Expand All @@ -106,7 +88,7 @@ impl EventEmitter {
alive_clients.push(client.clone());
}
}
log::debug!("Alive clients: {}", alive_clients.len());
log::trace!("Alive clients: {}", alive_clients.len());
self
.inner
.lock()
Expand Down Expand Up @@ -145,9 +127,13 @@ impl EventEmitter {
Ok(Client(rx))
}

pub async fn emit(&self, ev: Event) -> Result<(), HttpError> {
pub async fn emit(&self, ev: MetrsdEvent) -> Result<(), HttpError> {
let this = self.clone();
rt::spawn(async move {
let msg = Bytes::try_from(ev).map_err(|err| HttpError {
status: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Unable to serialize event: {err}"),

Check warning on line 135 in bin/metrsd/src/event_emitter.rs

View check run for this annotation

Codecov / codecov/patch

bin/metrsd/src/event_emitter.rs#L134-L135

Added lines #L134 - L135 were not covered by tests
})?;
let clients = this
.inner
.lock()
Expand All @@ -158,10 +144,6 @@ impl EventEmitter {
.clients
.clone();
for client in clients {
let msg = Bytes::try_from(ev.clone()).map_err(|err| HttpError {
status: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Unable to serialize event: {err}"),
})?;
let _ = client.send(msg.clone()).await;
}
Ok::<(), HttpError>(())
Expand Down
Loading

0 comments on commit 5cf252e

Please sign in to comment.