-
Notifications
You must be signed in to change notification settings - Fork 189
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Ryan Koo <[email protected]>
- Loading branch information
Showing
5 changed files
with
158 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# | ||
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
# | ||
|
||
import logging | ||
from aistore.sdk.request_client import RequestClient | ||
from aistore.sdk.const import ( | ||
HTTP_METHOD_DELETE, | ||
URL_PATH_AUTHN_TOKENS, | ||
) | ||
from aistore.sdk.authn.types import TokenMsg | ||
|
||
|
||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class TokenManager: # pylint: disable=duplicate-code | ||
""" | ||
Manages token-related operations. | ||
This class provides methods to interact with tokens in the AuthN server. | ||
. | ||
Args: | ||
client (RequestClient): The RequestClient used to make HTTP requests. | ||
""" | ||
|
||
def __init__(self, client: RequestClient): | ||
self._client = client | ||
|
||
@property | ||
def client(self) -> RequestClient: | ||
"""Returns the RequestClient instance used by this TokenManager.""" | ||
return self._client | ||
|
||
def revoke(self, token: str) -> None: | ||
""" | ||
Revokes the specified authentication token. | ||
Args: | ||
token (str): The token to be revoked. | ||
Raises: | ||
ValueError: If the token is not provided. | ||
AISError: If the revoke token request fails. | ||
""" | ||
if not token: | ||
raise ValueError("Token must be provided to revoke.") | ||
|
||
logger.info("Revoking token: %s", token) | ||
|
||
self.client.request( | ||
method=HTTP_METHOD_DELETE, | ||
path=f"{URL_PATH_AUTHN_TOKENS}", | ||
json=TokenMsg(token=token).dict(), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
python/tests/integration/sdk/authn/test_authn_token_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
# | ||
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
# | ||
|
||
# pylint: disable=duplicate-code | ||
|
||
import unittest | ||
|
||
import pytest | ||
|
||
from aistore.sdk.authn.authn_client import AuthNClient | ||
from aistore.sdk.client import Client | ||
from tests.integration import ( | ||
AIS_AUTHN_SU_NAME, | ||
AIS_AUTHN_SU_PASS, | ||
AUTHN_ENDPOINT, | ||
CLUSTER_ENDPOINT, | ||
) | ||
from tests.utils import random_string | ||
|
||
|
||
class TestAuthNTokenManager( | ||
unittest.TestCase | ||
): # pylint: disable=too-many-instance-attributes | ||
def setUp(self) -> None: | ||
# AuthN Client | ||
self.authn_client = AuthNClient(AUTHN_ENDPOINT) | ||
self.token = self.authn_client.login(AIS_AUTHN_SU_NAME, AIS_AUTHN_SU_PASS) | ||
|
||
# AIS Client | ||
self._create_ais_client() | ||
self.uuid = self.ais_client.cluster().get_uuid() | ||
|
||
# Register the AIS Cluster | ||
self.cluster_alias = "Test-Cluster" + random_string() | ||
self.cluster_manager = self.authn_client.cluster_manager() | ||
self.cluster_info = self.cluster_manager.register( | ||
self.cluster_alias, [CLUSTER_ENDPOINT] | ||
) | ||
|
||
self.token_manager = self.authn_client.token_manager() | ||
|
||
def tearDown(self) -> None: | ||
self.authn_client.login(AIS_AUTHN_SU_NAME, AIS_AUTHN_SU_PASS) | ||
self._create_ais_client() | ||
self.cluster_manager.delete(cluster_id=self.uuid) | ||
|
||
def _create_ais_client(self): | ||
self.ais_client = Client(CLUSTER_ENDPOINT, token=self.authn_client.client.token) | ||
|
||
@pytest.mark.authn | ||
def test_revoke_token(self): | ||
# Assert token is valid and working | ||
self.ais_client.cluster().list_buckets() | ||
|
||
# Revoke the token | ||
self.token_manager.revoke(self.token) | ||
|
||
# Attempt to use the token after revoking it | ||
with self.assertRaises(Exception): | ||
self.ais_client.cluster().list_buckets() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# | ||
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
# | ||
|
||
import unittest | ||
from unittest.mock import Mock | ||
|
||
from aistore.sdk.request_client import RequestClient | ||
from aistore.sdk.authn.types import TokenMsg | ||
from aistore.sdk.authn.token_manager import TokenManager | ||
from aistore.sdk.const import ( | ||
HTTP_METHOD_DELETE, | ||
URL_PATH_AUTHN_TOKENS, | ||
) | ||
|
||
|
||
class TestAuthNTokenManager(unittest.TestCase): | ||
def setUp(self) -> None: | ||
self.mock_client = Mock(RequestClient) | ||
self.token_manager = TokenManager(self.mock_client) | ||
|
||
def test_token_revoke(self): | ||
token = "test-token" | ||
self.token_manager.revoke(token=token) | ||
self.mock_client.request.assert_called_once_with( | ||
method=HTTP_METHOD_DELETE, | ||
path=f"{URL_PATH_AUTHN_TOKENS}", | ||
json=TokenMsg(token=token).dict(), | ||
) |