Skip to content

Commit

Permalink
sessions: store session in database
Browse files Browse the repository at this point in the history
This allows a session to be restored after restart so users don't have
to login.

Somehow ended up pulling in SQLx for this as it provides a nicer
database experience and also includes migrations and pooling, but that
will be a work in progress.
  • Loading branch information
jasonish committed May 11, 2024
1 parent b0b7762 commit 344f3cc
Show file tree
Hide file tree
Showing 18 changed files with 1,075 additions and 297 deletions.
626 changes: 604 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ bcrypt = "0.15.0"
bytes = "1.5.0"
clap = { version = "4.5.4", features = ["env", "derive", "color"] }

# Trying SQLx, it wrap up all 3 of the other SQLite crates into one.
sqlx = { git = "https://github.com/launchbadge/sqlx", default-features = true, features = ["runtime-tokio", "sqlite", "postgres", "tls-rustls"] }

# These 3 somewhat depend on each other.
deadpool-sqlite = { version = "= 0.8.0", features = ["rt_tokio_1"] }
rusqlite = { version = "= 0.31", default-features = false, features = ["bundled", "serde_json"] }
Expand Down Expand Up @@ -72,3 +75,4 @@ directories = "5.0.1"
gethostname = "0.4.3"
tempfile = "3.9.0"
inquire = "0.6.2"
chrono = { version = "0.4.38", default-features = false, features = ["std", "now"] }
11 changes: 11 additions & 0 deletions resources/configdb/migrations/V2__Sessions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
DROP TABLE IF EXISTS sessions;

CREATE TABLE sessions (
rowid INTEGER PRIMARY KEY,
token STRING UNIQUE NOT NULL,
uuid STRING NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (uuid) REFERENCES users(uuid)
);

CREATE INDEX sessions_token_index ON sessions (token);
8 changes: 7 additions & 1 deletion src/bin/evebox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ async fn evebox_main() -> Result<(), Box<dyn std::error::Error>> {
.action(ArgAction::SetTrue)
.help("Don't open browser"),
)
.arg(
Arg::new("fts")
.long("fts")
.action(ArgAction::SetTrue)
.help("Enable full text search"),
)
.arg(
Arg::new("no-wait")
.long("no-wait")
Expand Down Expand Up @@ -324,7 +330,7 @@ async fn evebox_main() -> Result<(), Box<dyn std::error::Error>> {
}
Some(("oneshot", args)) => evebox::commands::oneshot::main(args).await,
Some(("agent", args)) => evebox::commands::agent::main(args).await,
Some(("config", args)) => evebox::commands::config::main(args),
Some(("config", args)) => evebox::commands::config::main(args).await,
Some(("print", args)) => evebox::commands::print::main(args),
Some(("elastic", args)) => evebox::commands::elastic::main::main(args).await,
Some(("sqlite", args)) => evebox::commands::sqlite::main(args).await,
Expand Down
4 changes: 2 additions & 2 deletions src/commands/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub fn config_subcommand() -> clap::Command {
Args::command()
}

pub fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
pub async fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
match args.subcommand() {
Some(("users", args)) => users::main(args),
Some(("users", args)) => users::main(args).await,
_ => Err(anyhow!("no subcommand provided")),
}
}
40 changes: 20 additions & 20 deletions src/commands/config/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ struct AddArgs {
data_directory: Option<String>,
}

pub(crate) fn main(args: &clap::ArgMatches) -> Result<()> {
pub(crate) async fn main(args: &clap::ArgMatches) -> Result<()> {
let args = UsersCommands::from_arg_matches(args)?;
match args {
UsersCommands::Add(args) => add(args),
UsersCommands::List { data_directory } => list(data_directory),
UsersCommands::Add(args) => add(args).await,
UsersCommands::List { data_directory } => list(data_directory).await,
UsersCommands::Rm {
username,
data_directory,
} => remove(username, data_directory),
} => remove(username, data_directory).await,
UsersCommands::Passwd {
username,
data_directory,
} => password(username, data_directory),
} => password(username, data_directory).await,
}
}

fn open_config_repo<P: AsRef<Path>>(data_directory: Option<P>) -> Result<ConfigRepo> {
async fn open_config_repo<P: AsRef<Path>>(data_directory: Option<P>) -> Result<ConfigRepo> {
let data_directory = data_directory
.map(|p| PathBuf::from(p.as_ref()))
.or_else(crate::path::data_directory);
Expand All @@ -81,21 +81,21 @@ fn open_config_repo<P: AsRef<Path>>(data_directory: Option<P>) -> Result<ConfigR
};
info!("Using data directory {}", data_directory.display());
let filename = data_directory.join("config.sqlite");
let repo = ConfigRepo::new(Some(&filename))?;
let repo = ConfigRepo::new(Some(&filename)).await?;
Ok(repo)
}

fn list(dir: Option<String>) -> Result<()> {
let repo = open_config_repo(dir.as_deref())?;
let users = repo.get_users()?;
async fn list(dir: Option<String>) -> Result<()> {
let repo = open_config_repo(dir.as_deref()).await?;
let users = repo.get_users().await?;
for user in users {
println!("{}", serde_json::to_string(&user).unwrap());
}
Ok(())
}

fn add(args: AddArgs) -> Result<()> {
let repo = open_config_repo(args.data_directory.as_deref())?;
async fn add(args: AddArgs) -> Result<()> {
let repo = open_config_repo(args.data_directory.as_deref()).await?;

let username = if let Some(username) = args.username {
username.to_string()
Expand All @@ -117,30 +117,30 @@ fn add(args: AddArgs) -> Result<()> {
.prompt()?
};

repo.add_user(&username, &password)?;
repo.add_user(&username, &password).await?;
println!("User added: username=\"{username}\"");

Ok(())
}

fn remove(username: String, dir: Option<String>) -> Result<()> {
let repo = open_config_repo(dir.as_deref())?;
if repo.remove_user(&username)? == 0 {
async fn remove(username: String, dir: Option<String>) -> Result<()> {
let repo = open_config_repo(dir.as_deref()).await?;
if repo.remove_user(&username).await? == 0 {
return Err(anyhow!("user does not exist"));
}
println!("User removed: username=\"{username}\"");
Ok(())
}

fn password(username: String, data_directory: Option<String>) -> Result<()> {
let repo = open_config_repo(data_directory)?;
let user = repo.get_user_by_name(&username)?;
async fn password(username: String, data_directory: Option<String>) -> Result<()> {
let repo = open_config_repo(data_directory).await?;
let user = repo.get_user_by_name(&username).await?;
let password = inquire::Password::new("Password:")
.with_display_toggle_enabled()
.with_display_mode(inquire::PasswordDisplayMode::Masked)
.with_validator(inquire::required!())
.prompt()?;
if repo.update_password_by_id(&user.uuid, &password)? {
if repo.update_password_by_id(&user.uuid, &password).await? {
println!("Password has been updated.");
Ok(())
} else {
Expand Down
32 changes: 23 additions & 9 deletions src/commands/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub async fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
let db_filename: String = config_loader.get("database-filename")?.unwrap();
let host: String = config_loader.get("http.host")?.unwrap();
let input = args.get_one::<String>("INPUT").unwrap().to_string();
let fts: bool = config_loader.get_bool("fts")?;

info!("Using database filename {}", &db_filename);

Expand All @@ -29,13 +30,13 @@ pub async fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
)));
let mut db =
sqlite::ConnectionBuilder::filename(Some(&PathBuf::from(&db_filename))).open(true)?;
let fts = false;
sqlite::init_event_db(&mut db)?;
let db = Arc::new(Mutex::new(db));
let pool = sqlite::pool::open_pool(&db_filename).await?;
let pool = sqlite::connection::open_deadpool(Some(&db_filename))?;
let xpool = sqlite::connection::open_sqlx_pool(Some(&db_filename), false).await?;
let db = crate::sqlite::connection::open_sqlx_connection(Some(&db_filename), true).await?;
let db = Arc::new(tokio::sync::Mutex::new(db));

let import_task = {
let db = db.clone();
tokio::spawn(async move {
if let Err(err) = run_import(db, limit, &input, fts).await {
error!("Import failure: {}", err);
Expand All @@ -59,8 +60,19 @@ pub async fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
let mut port = 5636;
loop {
let connection = Arc::new(Mutex::new(db_connection_builder.open(false).unwrap()));
let sqlite_datastore =
sqlite::eventrepo::SqliteEventRepo::new(connection, pool.clone(), fts);
let xconnection = Arc::new(tokio::sync::Mutex::new(
db_connection_builder
.open_sqlx_connection(false)
.await
.unwrap(),
));
let sqlite_datastore = sqlite::eventrepo::SqliteEventRepo::new(
xconnection,
connection,
xpool.clone(),
pool.clone(),
fts,
);
let ds = crate::eventrepo::EventRepo::SQLite(sqlite_datastore);
let config = crate::server::ServerConfig {
port,
Expand Down Expand Up @@ -139,7 +151,7 @@ pub async fn main(args: &clap::ArgMatches) -> anyhow::Result<()> {
}

async fn run_import(
db: Arc<Mutex<rusqlite::Connection>>,
sqlx: Arc<tokio::sync::Mutex<sqlx::SqliteConnection>>,
limit: u64,
input: &str,
fts: bool,
Expand All @@ -148,11 +160,12 @@ async fn run_import(
Ok(geoipdb) => Some(geoipdb),
Err(_) => None,
};
let mut indexer = sqlite::importer::SqliteEventSink::new(db, fts);
let mut indexer = sqlite::importer::SqliteEventSink::new(sqlx, fts);
let mut reader = eve::reader::EveReader::new(input.into());
info!("Reading {} ({} bytes)", input, reader.file_size());
let mut last_percent = 0;
let mut count = 0;
let start = std::time::Instant::now();
loop {
match reader.next_record() {
Ok(None) | Err(_) => {
Expand Down Expand Up @@ -181,6 +194,7 @@ async fn run_import(
}
}
indexer.commit().await?;
info!("Read {} events", count);
let elapsed = start.elapsed();
info!("Read {} events in {}s", count, elapsed.as_secs_f64());
Ok(())
}
16 changes: 12 additions & 4 deletions src/commands/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use crate::{
elastic::AlertQueryOptions,
sqlite::{
eventrepo::SqliteEventRepo, info::Info, init_event_db, pool::open_pool, ConnectionBuilder,
SqliteExt,
connection::open_deadpool, eventrepo::SqliteEventRepo, info::Info, init_event_db,
ConnectionBuilder, SqliteExt,
},
};
use anyhow::Result;
Expand Down Expand Up @@ -275,9 +275,17 @@ fn query(filename: &str, sql: &str) -> Result<()> {
async fn optimize(args: &OptimizeArgs) -> Result<()> {
let conn = ConnectionBuilder::filename(Some(&args.filename)).open(false)?;
let conn = Arc::new(Mutex::new(conn));
let pool = open_pool(&args.filename).await?;

let xconn = ConnectionBuilder::filename(Some(&args.filename))
.open_sqlx_connection(false)
.await?;
let xconn = Arc::new(tokio::sync::Mutex::new(xconn));

let pool = open_deadpool(Some(&args.filename))?;
pool.resize(1);
let repo = SqliteEventRepo::new(conn, pool.clone(), false);
let xpool = crate::sqlite::connection::open_sqlx_pool(Some(&args.filename), false).await?;
// TODO: Set size to 1.
let repo = SqliteEventRepo::new(xconn, conn, xpool.clone(), pool.clone(), false);

info!("Running inbox style query");
let gte = time::OffsetDateTime::now_utc() - time::Duration::days(1);
Expand Down
3 changes: 3 additions & 0 deletions src/eventrepo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub enum DatastoreError {
#[error("sql: {0}")]
FromSql(#[from] rusqlite::types::FromSqlError),

#[error("sqlx: {0}")]
SqlxError(#[from] sqlx::Error),

// Fallback...
#[error("error: {0}")]
AnyhowError(#[from] anyhow::Error),
Expand Down
17 changes: 16 additions & 1 deletion src/server/api/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ pub(crate) async fn post(
session.username = Some(user.username);
let session = Arc::new(session);
context.session_store.put(session.clone()).unwrap();

// Create expiry data one week in the future.
let expiry = chrono::Utc::now() + chrono::Duration::weeks(1);
if let Err(err) = context
.config_repo
.save_session(
session.session_id.as_ref().unwrap(),
&user.uuid,
expiry.timestamp(),
)
.await
{
error!("Failed to save session: {:?}", err);
}

(
StatusCode::OK,
Json(serde_json::json!({
Expand All @@ -87,7 +102,7 @@ pub(crate) async fn post(
}
}

pub(crate) async fn logout_new(
pub(crate) async fn logout(
context: Extension<Arc<ServerContext>>,
SessionExtractor(session): SessionExtractor,
) -> impl IntoResponse {
Expand Down
Loading

0 comments on commit 344f3cc

Please sign in to comment.