Skip to content

Commit

Permalink
python/authn: Add Users API
Browse files Browse the repository at this point in the history
If applied, this commit will add the Users API to the authentication module, enabling the management of role-assigned users.

- Adds APIs to create, update, delete, get, and list users.
- Adds appropriate unit and integration tests.

Signed-off-by: Ryan Koo <[email protected]>
  • Loading branch information
rkoo19 committed Aug 19, 2024
1 parent e36e7a5 commit 604e60a
Show file tree
Hide file tree
Showing 6 changed files with 636 additions and 32 deletions.
10 changes: 10 additions & 0 deletions python/aistore/sdk/authn/authn_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from aistore.sdk.authn.types import TokenMsg, LoginMsg
from aistore.sdk.authn.cluster_manager import ClusterManager
from aistore.sdk.authn.role_manager import RoleManager
from aistore.sdk.authn.user_manager import UserManager

# logging
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -127,3 +128,12 @@ def role_manager(self) -> RoleManager:
RoleManager: An instance to manage role operations.
"""
return RoleManager(client=self._request_client)

def user_manager(self) -> UserManager:
"""
Factory method to create a UserManager instance.
Returns:
UserManager: An instance to manage user operations.
"""
return UserManager(client=self._request_client)
54 changes: 54 additions & 0 deletions python/aistore/sdk/authn/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,57 @@ def roles(self):

def __str__(self) -> str:
return "\n".join(str(role) for role in self.__root__)


class UserInfo(BaseModel):
"""
Represents user information in the AuthN service.
Attributes:
id (str): The username or ID of the user.
password (str, optional): The user's password. Serialized as 'pass' in the request.
roles (RolesList): The list of roles assigned to the user.
"""

id: str
password: Optional[str] = None
roles: RolesList

def dict(self, **kwargs):
"""
Override the dict method to serialize the 'password' field as 'pass'.
Returns:
Dict[str, Union[str, RolesList]]: The dict representation of the user information.
"""
user_dict = super().dict(**kwargs)
if "password" in user_dict and user_dict["password"] is not None:
user_dict["pass"] = user_dict.pop("password")
return user_dict


class UsersList(BaseModel):
"""
Represents a list of users.
Attributes:
__root__ (Dict[str, UserInfo]): Dictionary of user names/IDs to UserInfo objects.
"""

__root__: Dict[str, UserInfo]

def __iter__(self):
return iter(self.__root__.values())

def __getitem__(self, item):
return self.__root__[item]

def __len__(self):
return len(self.__root__)

@property
def users(self):
return self.__root__

def __str__(self) -> str:
return "\n".join(str(user) for user in self.__root__)
184 changes: 184 additions & 0 deletions python/aistore/sdk/authn/user_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# pylint: disable=too-many-arguments, duplicate-code

import logging

from typing import List, Optional

from aistore.sdk.authn.role_manager import RoleManager
from aistore.sdk.authn.types import UserInfo, RolesList, UsersList
from aistore.sdk.request_client import RequestClient
from aistore.sdk.const import (
HTTP_METHOD_DELETE,
HTTP_METHOD_GET,
HTTP_METHOD_POST,
HTTP_METHOD_PUT,
URL_PATH_AUTHN_USERS,
)


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class UserManager:
"""
UserManager provides methods to manage users in the AuthN service.
Args:
client (RequestClient): The request client to interact with AuthN service.
"""

def __init__(self, client: RequestClient):
self._client = client
self._role_manager = RoleManager(client)

@property
def client(self) -> RequestClient:
"""Returns the RequestClient instance used by this UserManager."""
return self._client

def get(self, username: str) -> UserInfo:
"""
Retrieve user information from the AuthN Server.
Args:
username (str): The username to retrieve.
Returns:
UserInfo: The user's information.
Raises:
AISError: If the user retrieval request fails.
"""
logger.info("Getting user with ID: %s", username)
response = self._client.request_deserialize(
HTTP_METHOD_GET,
path=f"{URL_PATH_AUTHN_USERS}/{username}",
res_model=UserInfo,
)

return response

def delete(self, username: str) -> None:
"""
Delete an existing user from the AuthN Server.
Args:
username (str): The username of the user to delete.
Raises:
AISError: If the user deletion request fails.
"""
logger.info("Deleting user with ID: %s", username)
self._client.request(
HTTP_METHOD_DELETE,
path=f"{URL_PATH_AUTHN_USERS}/{username}",
)

def create(
self,
username: str,
roles: List[str],
password: str,
) -> UserInfo:
"""
Create a new user in the AuthN Server.
Args:
username (str): The name or ID of the user to create.
password (str): The password for the user.
roles (List[str]): The list of names of roles to assign to the user.
Returns:
UserInfo: The created user's information.
Raises:
AISError: If the user creation request fails.
"""
logger.info("Creating user with ID: %s", username)
roles = self._get_roles_from_names(roles)
user_info = UserInfo(id=username, password=password, roles=roles)
self._client.request(
HTTP_METHOD_POST,
path=URL_PATH_AUTHN_USERS,
json=user_info.dict(),
)

return self.get(username)

def list(self):
"""
List all users in the AuthN Server.
Returns:
str: The list of users in the AuthN Server.
Raises:
AISError: If the user list request fails.
"""
logger.info("Listing all users")
response = self._client.request_deserialize(
HTTP_METHOD_GET,
path=URL_PATH_AUTHN_USERS,
res_model=UsersList,
)

return response

def update(
self,
username: str,
password: Optional[str] = None,
roles: Optional[List[str]] = None,
) -> UserInfo:
"""
Update an existing user's information in the AuthN Server.
Args:
username (str): The ID of the user to update.
password (str, optional): The new password for the user.
roles (List[str], optional): The list of names of roles to assign to the user.
Returns:
UserInfo: The updated user's information.
Raises:
AISError: If the user update request fails.
"""
if not (password or roles):
raise ValueError("You must change either the password or roles for a user.")

roles = self._get_roles_from_names(roles) if roles else []

logger.info("Updating user with ID: %s", username)
user_info = UserInfo(id=username, password=password, roles=roles)
self._client.request(
HTTP_METHOD_PUT,
path=f"{URL_PATH_AUTHN_USERS}/{username}",
json=user_info.dict(),
)

return self.get(username)

def _get_roles_from_names(self, role_names: List[str]) -> RolesList:
"""
Helper function to convert a list of role names into a RolesList.
Args:
role_names (List[str]): List of role names to convert.
Returns:
RolesList: The corresponding RolesList object.
Raises:
ValueError: If any role name is not found.
"""
roles = []
for name in role_names:
role = self._role_manager.get(name)
roles.append(role)
return roles
Loading

0 comments on commit 604e60a

Please sign in to comment.