Skip to content

Commit

Permalink
(GH Issue #27521 Workaround) Module api refactoring gh pr issue (#313)
Browse files Browse the repository at this point in the history
* refactor: replace pulsar module with a trait

* refactor: module metadata

* feat: move module loops inside module manager

* feat(modules_api): add support for an extra function working the state

* fix: start module command

* Refactored PulsarModule trait

* Comment PulsarModule traits

* Rename ExtraState to Extension

* apply review suggestions

* Apply review

* fix: daemon refactoring

* chore: fix clippy

* chore: fmt

* fix(module_manager): make tokio select biased
  • Loading branch information
banditopazzo authored Sep 18, 2024
1 parent fc90778 commit cece664
Show file tree
Hide file tree
Showing 25 changed files with 1,283 additions and 1,102 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/engine-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl From<PulsarDaemonError> for EngineApiError {
fn from(error: PulsarDaemonError) -> Self {
match &error {
PulsarDaemonError::ModuleNotFound(_) => Self::BadRequest(error.to_string()),
PulsarDaemonError::StartError(_) => Self::BadRequest(error.to_string()),
PulsarDaemonError::StopError(_) => Self::BadRequest(error.to_string()),
PulsarDaemonError::ConfigurationUpdateError(_) => {
log::error!("Unexpected Error {}", error.to_string());
Expand Down
61 changes: 26 additions & 35 deletions crates/modules/desktop-notifier/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,44 @@
use std::{
os::unix::process::CommandExt,
process::{Command, Stdio},
sync::Arc,
};

use anyhow::{Context, Result};
use anyhow::Context;
use pulsar_core::{
event::Threat,
pdk::{
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
ShutdownSignal, Version,
},
pdk::{ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, SimplePulsarModule},
};

const MODULE_NAME: &str = "desktop-notifier";
pub struct DesktopNotifierModule;

pub fn module() -> PulsarModule {
PulsarModule::new(
MODULE_NAME,
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
false,
desktop_nitifier_task,
)
}
impl SimplePulsarModule for DesktopNotifierModule {
type Config = Config;
type State = ();

async fn desktop_nitifier_task(
ctx: ModuleContext,
mut shutdown: ShutdownSignal,
) -> Result<CleanExit, ModuleError> {
let mut receiver = ctx.get_receiver();
let mut rx_config = ctx.get_config();
let mut config = rx_config.read()?;
const MODULE_NAME: &'static str = "desktop-notifier";
const DEFAULT_ENABLED: bool = false;

loop {
tokio::select! {
r = shutdown.recv() => return r,
_ = rx_config.changed() => {
config = rx_config.read()?;
continue;
}
msg = receiver.recv() => {
handle_event(&config, msg?).await;
}
}
async fn init_state(
&self,
_config: &Self::Config,
_ctx: &ModuleContext,
) -> Result<Self::State, ModuleError> {
Ok(())
}

async fn on_event(
event: &Event,
config: &Self::Config,
_state: &mut Self::State,
_ctx: &ModuleContext,
) -> Result<(), ModuleError> {
handle_event(config, event).await;
Ok(())
}
}

/// Check if the given event is a threat which should be notified to the user
async fn handle_event(config: &Config, event: Arc<Event>) {
async fn handle_event(config: &Config, event: &Event) {
if let Some(Threat {
source,
description,
Expand Down Expand Up @@ -99,7 +90,7 @@ async fn notify_send(config: &Config, args: Vec<String>) {
}

#[derive(Clone)]
struct Config {
pub struct Config {
user_id: u32,
display: String,
notify_send_executable: String,
Expand Down
76 changes: 36 additions & 40 deletions crates/modules/file-system-monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,51 +83,48 @@ pub mod pulsar {
use pulsar_core::{
event::FileFlags,
pdk::{
CleanExit, ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleError,
ModuleSender, Payload, PulsarModule, ShutdownSignal, Version,
ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleError, Payload,
SimplePulsarModule,
},
};
use tokio::{fs::File, io::AsyncReadExt};

pub fn module() -> PulsarModule {
PulsarModule::new(
MODULE_NAME,
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
true,
fs_monitor_task,
)
}
pub struct FileSystemMonitorModule;

async fn fs_monitor_task(
ctx: ModuleContext,
mut shutdown: ShutdownSignal,
) -> Result<CleanExit, ModuleError> {
let _program = program(ctx.get_bpf_context(), ctx.get_sender()).await?;
let mut receiver = ctx.get_receiver();
let mut rx_config = ctx.get_config();
let mut config: Config = rx_config.read()?;
let sender = ctx.get_sender();
loop {
// enable receiver only if the elf checker is enabled
let receiver_recv = async {
if config.elf_check_enabled {
receiver.recv().await
} else {
std::future::pending().await
}
};
tokio::select! {
Ok(msg) = receiver_recv => {
check_elf(&sender, &config, msg.as_ref()).await;
}
_ = rx_config.changed() => {
config = rx_config.read()?;
}
r = shutdown.recv() => return r,
impl SimplePulsarModule for FileSystemMonitorModule {
type Config = Config;
type State = FileSystemMonitorState;

const MODULE_NAME: &'static str = MODULE_NAME;
const DEFAULT_ENABLED: bool = true;

async fn init_state(
&self,
_config: &Self::Config,
ctx: &ModuleContext,
) -> Result<Self::State, ModuleError> {
Ok(Self::State {
_ebpf_program: program(ctx.get_bpf_context(), ctx.clone()).await?,
})
}

async fn on_event(
event: &Event,
config: &Self::Config,
_state: &mut Self::State,
ctx: &ModuleContext,
) -> Result<(), ModuleError> {
if config.elf_check_enabled {
check_elf(ctx, &config.elf_check_whitelist, event).await;
}
Ok(())
}
}

pub struct FileSystemMonitorState {
_ebpf_program: Program,
}

impl IntoPayload for FsEvent {
type Error = IndexError;

Expand Down Expand Up @@ -197,15 +194,14 @@ pub mod pulsar {
}

/// Check if an opened file is an ELF
async fn check_elf(sender: &ModuleSender, config: &Config, event: &Event) {
async fn check_elf(ctx: &ModuleContext, elf_check_whitelist: &[String], event: &Event) {
if let Payload::FileOpened { filename, flags } = event.payload() {
let now = Instant::now();
let should_check = !config
.elf_check_whitelist
let should_check = !elf_check_whitelist
.iter()
.any(|path| filename.starts_with(path));
if should_check && is_elf(filename).await {
sender.send_derived(
ctx.send_derived(
event,
Payload::ElfOpened {
filename: filename.to_string(),
Expand Down
126 changes: 68 additions & 58 deletions crates/modules/logger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use pulsar_core::pdk::{
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
ShutdownSignal, Version,
};
use std::{
borrow::Cow,
cell::OnceCell,
Expand All @@ -14,62 +10,76 @@ use std::{
},
str::FromStr,
};

use pulsar_core::pdk::{
ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, SimplePulsarModule,
};
use thiserror::Error;

const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
const MODULE_NAME: &str = "logger";
const PRIORITY: u8 = 25; // facility * 8 + severity. facility: daemon (3); severity: alert (1)

pub fn module() -> PulsarModule {
PulsarModule::new(
MODULE_NAME,
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
true,
logger_task,
)
}
pub struct LoggerModule;

impl SimplePulsarModule for LoggerModule {
type Config = Config;
type State = LoggerState;

const MODULE_NAME: &'static str = "threat-logger";
const DEFAULT_ENABLED: bool = true;

async fn init_state(
&self,
config: &Self::Config,
ctx: &ModuleContext,
) -> Result<Self::State, ModuleError> {
let logger = match Logger::from_config(config) {
Ok(logr) => logr,
Err(logr) => {
ctx.raise_warning("Failed to connect to syslog".into())
.await;
logr
}
};

async fn logger_task(
ctx: ModuleContext,
mut shutdown: ShutdownSignal,
) -> Result<CleanExit, ModuleError> {
let mut receiver = ctx.get_receiver();
let mut rx_config = ctx.get_config();
let sender = ctx.get_sender();

let mut logger = match Logger::from_config(rx_config.read()?) {
Ok(logr) => logr,
Err(logr) => {
sender
.raise_warning("Failed to connect to syslog".into())
.await;
logr
}
};

loop {
tokio::select! {
r = shutdown.recv() => return r,
_ = rx_config.changed() => {
logger = match Logger::from_config(rx_config.read()?) {
Ok(logr) => logr,
Err(logr) => {
sender.raise_warning("Failed to connect to syslog".into()).await;
logr
}
}
Ok(LoggerState { logger })
}

async fn on_config_change(
new_config: &Self::Config,
state: &mut Self::State,
ctx: &ModuleContext,
) -> Result<(), ModuleError> {
state.logger = match Logger::from_config(new_config) {
Ok(logr) => logr,
Err(logr) => {
ctx.raise_warning("Failed to connect to syslog".into())
.await;
logr
}
msg = receiver.recv() => {
let msg = msg?;
if let Err(e) = logger.process(&msg) {
sender.raise_warning(format!("Writing to logs failed: {e}")).await;
logger = Logger { syslog: None, ..logger };
}
},
};
Ok(())
}

async fn on_event(
event: &Event,
_config: &Self::Config,
state: &mut Self::State,
ctx: &ModuleContext,
) -> Result<(), ModuleError> {
if let Err(e) = state.logger.process(event) {
ctx.raise_warning(format!("Writing to logs failed: {e}, syslog disabled"))
.await;
state.logger.syslog = None;
}
Ok(())
}
}

pub struct LoggerState {
logger: Logger,
}

#[derive(Clone, Debug)]
enum OutputFormat {
Plaintext,
Expand All @@ -92,7 +102,7 @@ impl FromStr for OutputFormat {
}

#[derive(Clone)]
struct Config {
pub struct Config {
console: bool,
// file: bool, //TODO:
syslog: bool,
Expand Down Expand Up @@ -128,12 +138,12 @@ enum LoggerError {
}

impl Logger {
fn from_config(rx_config: Config) -> Result<Self, Self> {
fn from_config(config: &Config) -> Result<Self, Self> {
let Config {
console,
syslog,
output_format,
} = rx_config;
} = config;

let connected_to_journal = io::stderr()
.as_fd()
Expand All @@ -146,7 +156,7 @@ impl Logger {
})
.unwrap_or(false);

let opt_sock = (syslog && !connected_to_journal)
let opt_sock = (*syslog && !connected_to_journal)
.then(|| {
let sock = UnixDatagram::unbound().ok()?;
UNIX_SOCK_PATHS
Expand All @@ -156,17 +166,17 @@ impl Logger {
})
.flatten();

if syslog && opt_sock.is_none() {
if *syslog && opt_sock.is_none() {
Err(Self {
console,
console: *console,
syslog: opt_sock,
output_format,
output_format: output_format.clone(),
})
} else {
Ok(Self {
console,
console: *console,
syslog: opt_sock,
output_format,
output_format: output_format.clone(),
})
}
}
Expand Down
Loading

0 comments on commit cece664

Please sign in to comment.