Skip to content

Commit

Permalink
[server] Add cluster login #1378 (#1379)
Browse files Browse the repository at this point in the history
* Update cluster_test.rs

* Update cluster_test.rs

* add cluster_login to api

* Update http_client.rs

* Update logger.rs

* Update user.rs

* Update user.rs

* add cluster login

* Update redirect.rs

* Update cluster_test.rs
  • Loading branch information
michaelvlach authored Dec 12, 2024
1 parent bba1e83 commit dadcf02
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 62 deletions.
16 changes: 16 additions & 0 deletions agdb_api/rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,22 @@ impl<T: HttpClient> AgdbApi<T> {
self.client.get(&self.url("/cluster/status"), &None).await
}

pub async fn cluster_login(&mut self, user: &str, password: &str) -> AgdbApiResult<u16> {
let (status, token) = self
.client
.post::<UserLogin, String>(
&self.url("/cluster/login"),
&Some(UserLogin {
username: user.to_string(),
password: password.to_string(),
}),
&None,
)
.await?;
self.token = Some(token);
Ok(status)
}

pub async fn user_login(&mut self, user: &str, password: &str) -> AgdbApiResult<u16> {
let (status, token) = self
.client
Expand Down
3 changes: 1 addition & 2 deletions agdb_api/rust/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ impl HttpClient for ReqwestClient {
json: &Option<T>,
token: &Option<String>,
) -> AgdbApiResult<(u16, R)> {
let client = reqwest::Client::new();
let mut request = client.post(uri);
let mut request = self.client.post(uri);
if let Some(token) = token {
request = request.bearer_auth(token);
}
Expand Down
12 changes: 11 additions & 1 deletion agdb_server/src/action.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub(crate) mod cluster_login;
pub(crate) mod user_add;

use crate::action::cluster_login::ClusterLogin;
use crate::action::user_add::UserAdd;
use crate::db_pool::DbPool;
use crate::server_db::ServerDb;
Expand All @@ -9,7 +11,8 @@ use serde::Serialize;

#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum ClusterAction {
UserAdd(user_add::UserAdd),
UserAdd(UserAdd),
ClusterLogin(ClusterLogin),
}

#[derive(Clone, Serialize, Deserialize)]
Expand All @@ -25,6 +28,7 @@ impl Action for ClusterAction {
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool) -> ServerResult<ClusterResponse> {
match self {
ClusterAction::UserAdd(action) => action.exec(db, db_pool).await,
ClusterAction::ClusterLogin(action) => action.exec(db, db_pool).await,
}
}
}
Expand All @@ -34,3 +38,9 @@ impl From<UserAdd> for ClusterAction {
ClusterAction::UserAdd(value)
}
}

impl From<ClusterLogin> for ClusterAction {
fn from(value: ClusterLogin) -> Self {
ClusterAction::ClusterLogin(value)
}
}
23 changes: 23 additions & 0 deletions agdb_server/src/action/cluster_login.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::server_error::ServerResult;
use agdb::UserValue;
use serde::Deserialize;
use serde::Serialize;

#[derive(Clone, Serialize, Deserialize, UserValue)]
pub(crate) struct ClusterLogin {
pub(crate) user: String,
pub(crate) new_token: String,
}

impl Action for ClusterLogin {
async fn exec(self, db: &mut ServerDb, _db_pool: &mut DbPool) -> ServerResult<ClusterResponse> {
let user_id = db.user_id(&self.user).await?;
db.save_token(user_id, &self.new_token).await?;

Ok(ClusterResponse::None)
}
}
1 change: 1 addition & 0 deletions agdb_server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use utoipa::OpenApi;
routes::db::user::add,
routes::db::user::list,
routes::db::user::remove,
routes::cluster::login,
routes::cluster::status,
),
components(schemas(
Expand Down
3 changes: 2 additions & 1 deletion agdb_server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ pub(crate) fn app(
"/db/:user/:db/user/:other/remove",
routing::delete(routes::db::user::remove),
)
.route("/cluster/status", routing::get(routes::cluster::status))
.route("/cluster", routing::post(routes::cluster::cluster))
.route("/cluster/login", routing::post(routes::cluster::login))
.route("/cluster/status", routing::get(routes::cluster::status))
.route("/user/login", routing::post(routes::user::login))
.route("/user/logout", routing::post(routes::user::logout))
.route(
Expand Down
51 changes: 27 additions & 24 deletions agdb_server/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,33 +101,36 @@ fn mask_password(log_record: &mut LogRecord) {
|| log_record.uri.contains("/change_password")
|| (log_record.uri.contains("/admin/user/") && log_record.uri.contains("/add"))
{
const PASSWORD_PATTERN: &str = "\"password\"";
const PASSWORD_PATTERNS: [&str; 2] = ["\"password\"", "\"new_password\""];
const QUOTE_PATTERN: &str = "\"";
if let Some(starting_index) = log_record.request_body.find(PASSWORD_PATTERN) {
if let Some(start) = log_record.request_body[starting_index + PASSWORD_PATTERN.len()..]
.find(QUOTE_PATTERN)
{
let mut skip = false;
let start = starting_index + PASSWORD_PATTERN.len() + start;
let mut end = start + 1;

for c in log_record.request_body[start + 1..].chars() {
end += 1;

if skip {
skip = false;
} else if c == '\\' {
skip = true;
} else if c == '"' {
break;

for pattern in PASSWORD_PATTERNS {
if let Some(starting_index) = log_record.request_body.find(pattern) {
if let Some(start) =
log_record.request_body[starting_index + pattern.len()..].find(QUOTE_PATTERN)
{
let mut skip = false;
let start = starting_index + pattern.len() + start;
let mut end = start + 1;

for c in log_record.request_body[start + 1..].chars() {
end += 1;

if skip {
skip = false;
} else if c == '\\' {
skip = true;
} else if c == '"' {
break;
}
}
}

log_record.request_body = format!(
"{}\"***\"{}",
&log_record.request_body[..start],
&log_record.request_body[end..]
);
log_record.request_body = format!(
"{}\"***\"{}",
&log_record.request_body[..start],
&log_record.request_body[end..]
);
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion agdb_server/src/redirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use axum::response::IntoResponse;
use axum::response::Response;
use reqwest::StatusCode;

const REDIRECT_PATHS: [&str; 12] = [
const REDIRECT_PATHS: [&str; 13] = [
"/add",
"/backup",
"/change_password",
"/clear",
"/cluster/login",
"/convert",
"/copy",
"/delete",
Expand Down
37 changes: 37 additions & 0 deletions agdb_server/src/routes/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use crate::action::cluster_login::ClusterLogin;
use crate::action::ClusterAction;
use crate::cluster;
use crate::cluster::Cluster;
use crate::config::Config;
use crate::raft::Request;
use crate::raft::Response;
use crate::routes::user::do_login;
use crate::server_db::ServerDb;
use crate::server_error::ServerResponse;
use crate::server_error::ServerResult;
use crate::user_id::ClusterId;
use agdb_api::ClusterStatus;
use agdb_api::UserLogin;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
Expand All @@ -19,6 +25,37 @@ pub(crate) async fn cluster(
Ok((StatusCode::OK, Json(response)))
}

#[utoipa::path(post,
path = "/api/v1/cluster/login",
operation_id = "cluster_login",
tag = "agdb",
request_body = UserLogin,
responses(
(status = 200, description = "login successful", body = String),
(status = 401, description = "invalid credentials"),
)
)]
pub(crate) async fn login(
State(server_db): State<ServerDb>,
State(cluster): State<Cluster>,
Json(request): Json<UserLogin>,
) -> ServerResponse<(StatusCode, Json<String>)> {
let (token, user_id) = do_login(&server_db, &request.username, &request.password).await?;

if user_id.is_some() {
cluster::append(
cluster,
ClusterLogin {
user: request.username,
new_token: token.clone(),
},
)
.await?;
}

Ok((StatusCode::OK, Json(token)))
}

#[utoipa::path(get,
path = "/api/v1/cluster/status",
operation_id = "cluster_status",
Expand Down
45 changes: 30 additions & 15 deletions agdb_server/src/routes/user.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::config::Config;
use crate::password;
use crate::password::Password;
use crate::routes::ServerResult;
use crate::server_db::ServerDb;
use crate::server_error::ServerError;
use crate::server_error::ServerResponse;
use crate::user_id::UserId;
use crate::user_id::UserName;
use agdb::DbId;
use agdb_api::ChangePassword;
use agdb_api::UserLogin;
use agdb_api::UserStatus;
Expand All @@ -14,6 +16,32 @@ use axum::http::StatusCode;
use axum::Json;
use uuid::Uuid;

pub(crate) async fn do_login(
server_db: &ServerDb,
username: &str,
password: &str,
) -> ServerResult<(String, Option<DbId>)> {
let user = server_db
.user(username)
.await
.map_err(|_| ServerError::new(StatusCode::UNAUTHORIZED, "unuauthorized"))?;
let pswd = Password::new(&user.username, &user.password, &user.salt)?;

if !pswd.verify_password(password) {
return Err(ServerError::new(StatusCode::UNAUTHORIZED, "unuauthorized"));
}

let user_id = user.db_id.unwrap();
let mut token = server_db.user_token(user_id).await?;

if token.is_empty() {
let token_uuid = Uuid::new_v4();
token = token_uuid.to_string();
}

Ok((token, Some(user_id)))
}

#[utoipa::path(post,
path = "/api/v1/user/login",
operation_id = "user_login",
Expand All @@ -28,22 +56,9 @@ pub(crate) async fn login(
State(server_db): State<ServerDb>,
Json(request): Json<UserLogin>,
) -> ServerResponse<(StatusCode, Json<String>)> {
let user = server_db
.user(&request.username)
.await
.map_err(|_| ServerError::new(StatusCode::UNAUTHORIZED, "unuauthorized"))?;
let pswd = Password::new(&user.username, &user.password, &user.salt)?;
let (token, user_id) = do_login(&server_db, &request.username, &request.password).await?;

if !pswd.verify_password(&request.password) {
return Err(ServerError::new(StatusCode::UNAUTHORIZED, "unuauthorized"));
}

let user_id = user.db_id.unwrap();
let mut token = server_db.user_token(user_id).await?;

if token.is_empty() {
let token_uuid = Uuid::new_v4();
token = token_uuid.to_string();
if let Some(user_id) = user_id {
server_db.save_token(user_id, &token).await?;
}

Expand Down
14 changes: 14 additions & 0 deletions agdb_server/src/server_db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::action::cluster_login::ClusterLogin;
use crate::action::user_add::UserAdd;
use crate::action::ClusterAction;
use crate::config::Config;
Expand Down Expand Up @@ -414,6 +415,15 @@ impl ServerDb {
)?
.try_into()?,
)),
"ClusterLogin" => Ok(ClusterAction::ClusterLogin(
t.exec(
QueryBuilder::select()
.elements::<ClusterLogin>()
.ids(element.id)
.query(),
)?
.try_into()?,
)),
_ => Err(ServerError::new(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("unknown action: {action}"),
Expand Down Expand Up @@ -767,6 +777,10 @@ fn log_db_values(log: &Log<ClusterAction>) -> Vec<DbKeyValue> {
values.push(("action", "UserAdd").into());
values.extend(action.to_db_values());
}
ClusterAction::ClusterLogin(action) => {
values.push(("action", "ClusterLogin").into());
values.extend(action.to_db_values());
}
}

values
Expand Down
28 changes: 10 additions & 18 deletions agdb_server/tests/routes/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,26 +143,18 @@ async fn rebalance() -> anyhow::Result<()> {
}

#[tokio::test]
async fn user_add() -> anyhow::Result<()> {
let (leader, servers) = create_cluster(3).await?;
leader.client.write().await.user_login(ADMIN, ADMIN).await?;
leader
.client
.write()
.await
.admin_user_add("user1", "password123")
.await?;

for has_user in servers.iter().map(|s| {
let client = s.client.clone();
tokio::spawn(async move {
client.write().await.user_login(ADMIN, ADMIN).await?;
wait_for_user(client.clone(), "user1").await
})
}) {
has_user.await??;
async fn user() -> anyhow::Result<()> {
let (leader, servers) = create_cluster(2).await?;

{
let mut client = leader.client.write().await;
client.cluster_login(ADMIN, ADMIN).await?;
client.admin_user_add("user1", "password123").await?;
}

servers[0].client.write().await.token = leader.client.read().await.token.clone();
wait_for_user(servers[0].client.clone(), "user1").await?;

Ok(())
}

Expand Down

0 comments on commit dadcf02

Please sign in to comment.