Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Add cluster logout #1383 #1384

Merged
merged 3 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions agdb_api/rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ impl<T: HttpClient> AgdbApi<T> {
&self.base_url
}

pub async fn admin_cluster_logout(&self, user: &str) -> AgdbApiResult<u16> {
Ok(self
.client
.post::<(), ()>(
&self.url(&format!("/admin/cluster/{user}/logout")),
&None,
&self.token,
)
.await?
.0)
}

pub async fn admin_db_add(&self, owner: &str, db: &str, db_type: DbType) -> AgdbApiResult<u16> {
Ok(self
.client
Expand Down Expand Up @@ -542,6 +554,16 @@ impl<T: HttpClient> AgdbApi<T> {
Ok(status)
}

pub async fn cluster_logout(&mut self) -> AgdbApiResult<u16> {
let status = self
.client
.post::<(), ()>(&self.url("/cluster/logout"), &None, &self.token)
.await?
.0;
self.token = None;
Ok(status)
}

pub async fn user_login(&mut self, user: &str, password: &str) -> AgdbApiResult<u16> {
let (status, token) = self
.client
Expand Down
2 changes: 2 additions & 0 deletions agdb_server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ use utoipa::OpenApi;
routes::db::user::list,
routes::db::user::remove,
routes::cluster::login,
routes::cluster::logout,
routes::cluster::status,
routes::cluster::admin_logout,
),
components(schemas(
routes::db::DbTypeParam,
Expand Down
5 changes: 5 additions & 0 deletions agdb_server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ pub(crate) fn app(
)
.route("/cluster", routing::post(routes::cluster::cluster))
.route("/cluster/login", routing::post(routes::cluster::login))
.route("/cluster/logout", routing::post(routes::cluster::logout))
.route(
"/admin/cluster/:user/logout",
routing::post(routes::cluster::admin_logout),
)
.route("/cluster/status", routing::get(routes::cluster::status))
.route("/user/login", routing::post(routes::user::login))
.route("/user/logout", routing::post(routes::user::logout))
Expand Down
64 changes: 64 additions & 0 deletions agdb_server/src/routes/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use crate::routes::user::do_login;
use crate::server_db::ServerDb;
use crate::server_error::ServerResponse;
use crate::server_error::ServerResult;
use crate::user_id::AdminId;
use crate::user_id::ClusterId;
use crate::user_id::UserId;
use agdb_api::ClusterStatus;
use agdb_api::UserLogin;
use axum::extract::Path;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
Expand All @@ -24,6 +27,38 @@ pub(crate) async fn cluster(
Ok((StatusCode::OK, Json(response)))
}

#[utoipa::path(post,
path = "/api/v1/admin/cluster/{username}/logout",
operation_id = "admin_cluster_logout",
tag = "agdb",
security(("Token" = [])),
params(
("username" = String, Path, description = "user name"),
),
responses(
(status = 201, description = "user logged out"),
(status = 401, description = "admin only"),
(status = 404, description = "user not found"),
)
)]
pub(crate) async fn admin_logout(
_admin: AdminId,
State(server_db): State<ServerDb>,
State(cluster): State<Cluster>,
Path(username): Path<String>,
) -> ServerResponse {
let _user_id = server_db.user_id(&username).await?;

cluster
.append(ClusterLogin {
user: username,
new_token: String::new(),
})
.await?;

Ok(StatusCode::CREATED)
}

#[utoipa::path(post,
path = "/api/v1/cluster/login",
operation_id = "cluster_login",
Expand Down Expand Up @@ -53,6 +88,35 @@ pub(crate) async fn login(
Ok((StatusCode::OK, Json(token)))
}

#[utoipa::path(post,
path = "/api/v1/cluster/logout",
operation_id = "cluster_logout",
tag = "agdb",
security(("Token" = [])),
responses(
(status = 201, description = "user logged out"),
(status = 401, description = "invalid credentials")
)
)]
pub(crate) async fn logout(
user: UserId,
State(server_db): State<ServerDb>,
State(cluster): State<Cluster>,
) -> ServerResponse {
let token = server_db.user_token(user.0).await?;

if !token.is_empty() {
cluster
.append(ClusterLogin {
user: server_db.user_name(user.0).await?,
new_token: String::new(),
})
.await?;
}

Ok(StatusCode::CREATED)
}

#[utoipa::path(get,
path = "/api/v1/cluster/status",
operation_id = "cluster_status",
Expand Down
6 changes: 3 additions & 3 deletions agdb_server/src/routes/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ pub(crate) async fn do_login(
return Err(ServerError::new(StatusCode::UNAUTHORIZED, "unuauthorized"));
}

let user_id = user.db_id.unwrap();
let mut token = server_db.user_token(user_id).await?;
let user_id = user.db_id;
let mut token = server_db.user_token(user_id.unwrap_or_default()).await?;

if token.is_empty() {
let token_uuid = Uuid::new_v4();
token = token_uuid.to_string();
}

Ok((token, Some(user_id)))
Ok((token, user_id))
}

#[utoipa::path(post,
Expand Down
52 changes: 44 additions & 8 deletions agdb_server/tests/routes/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ async fn wait_for_user_gone(client: ClusterClient, username: &str) -> anyhow::Re
))
}

async fn wait_for_logout(client: ClusterClient) -> anyhow::Result<()> {
let now = Instant::now();

while now.elapsed().as_millis() < TEST_TIMEOUT {
if let Err(e) = client.read().await.user_status().await {
if e.status == 401 {
return Ok(());
}
}
std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL));
}

Err(anyhow::anyhow!(
"User not logged out within {} seconds",
TEST_TIMEOUT / 1000
))
}

async fn create_cluster(nodes: usize) -> anyhow::Result<(ClusterServer, Vec<ClusterServer>)> {
let mut configs = Vec::with_capacity(nodes);
let mut cluster = Vec::with_capacity(nodes);
Expand Down Expand Up @@ -170,20 +188,38 @@ async fn rebalance() -> anyhow::Result<()> {
#[tokio::test]
async fn user() -> anyhow::Result<()> {
let (leader, servers) = create_cluster(2).await?;
let client = servers[0].client.clone();

{
let mut client = servers[0].client.write().await;
client.cluster_login(ADMIN, ADMIN).await?;
client.admin_user_add("user1", "password123").await?;
}
client.write().await.cluster_login(ADMIN, ADMIN).await?;
client
.read()
.await
.admin_user_add("user1", "password123")
.await?;

wait_for_user(client.clone(), "user1").await?;

wait_for_user(servers[0].client.clone(), "user1").await?;
client
.write()
.await
.user_login("user1", "password123")
.await?;

let mut leader = leader.client.write().await;
leader.user_login(ADMIN, ADMIN).await?;
leader.cluster_login(ADMIN, ADMIN).await?;
leader.admin_cluster_logout("user1").await?;

wait_for_logout(client.clone()).await?;

leader.admin_user_remove("user1").await?;

wait_for_user_gone(servers[0].client.clone(), "user1").await?;
client.write().await.user_login(ADMIN, ADMIN).await?;
wait_for_user_gone(client.clone(), "user1").await?;

client.read().await.user_status().await?;
leader.cluster_logout().await?;

wait_for_logout(client.clone()).await?;

Ok(())
}
Expand Down
Loading