-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into feature/update-project-description
- Loading branch information
Showing
14 changed files
with
618 additions
and
411 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,72 +1,143 @@ | ||
from secrets import compare_digest | ||
from typing import Annotated | ||
|
||
from fastapi import Depends, HTTPException | ||
from fastapi.security import APIKeyHeader | ||
from fastapi import Depends, Header, HTTPException | ||
from fastapi.security import APIKeyHeader, HTTPBasic, HTTPBasicCredentials | ||
from starlette import status | ||
|
||
from mex.backend.settings import BackendSettings | ||
from mex.backend.types import APIKey | ||
|
||
X_API_KEY = APIKeyHeader(name="X-API-Key", auto_error=False) | ||
X_API_CREDENTIALS = HTTPBasic(auto_error=False) | ||
|
||
|
||
def has_write_access(api_key: Annotated[str | None, Depends(X_API_KEY)]) -> None: | ||
"""Verify if api key has write access. | ||
def __check_header_for_authorization_method( | ||
api_key: Annotated[str | None, Depends(X_API_KEY)] = None, | ||
credentials: Annotated[ | ||
HTTPBasicCredentials | None, Depends(X_API_CREDENTIALS) | ||
] = None, | ||
user_agent: Annotated[str, Header(include_in_schema=False)] = "n/a", | ||
) -> None: | ||
"""Check authorization header for API key or credentials. | ||
Raises: | ||
HTTPException if no header is provided or APIKey does not have write access. | ||
HTTPException if both API key and credentials or none of them are in header. | ||
Args: | ||
api_key: the API key | ||
Settings: | ||
backend_user_database: checked for presence of api_key | ||
credentials: username and password | ||
user_agent: user-agent (in case of a web browser starts with "Mozilla/") | ||
""" | ||
if not api_key: | ||
if not api_key and not credentials: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Missing authentication header X-API-Key.", | ||
detail="Missing authentication header X-API-Key or credentials.", | ||
headers={"WWW-Authenticate": "Basic"} | ||
if user_agent.startswith("Mozilla/") | ||
else None, | ||
) | ||
if api_key and credentials: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, | ||
detail="Authenticate with X-API-Key or credentials, not both.", | ||
headers={"WWW-Authenticate": "Basic"} | ||
if user_agent.startswith("Mozilla/") | ||
else None, | ||
) | ||
|
||
|
||
def has_write_access( | ||
api_key: Annotated[str | None, Depends(X_API_KEY)] = None, | ||
credentials: Annotated[ | ||
HTTPBasicCredentials | None, Depends(X_API_CREDENTIALS) | ||
] = None, | ||
user_agent: Annotated[str, Header(include_in_schema=False)] = "n/a", | ||
) -> None: | ||
"""Verify if provided api key or credentials have write access. | ||
Raises: | ||
HTTPException if no header or provided APIKey/credentials have no write access. | ||
Args: | ||
api_key: the API key | ||
credentials: username and password | ||
user_agent: user-agent (in case of a web browser starts with "Mozilla/") | ||
Settings: | ||
check credentials in backend_user_database or backend_api_key_database | ||
""" | ||
__check_header_for_authorization_method(api_key, credentials, user_agent) | ||
|
||
settings = BackendSettings.get() | ||
user_database = settings.backend_user_database | ||
can_write = APIKey(api_key) in user_database.write | ||
can_write = False | ||
if api_key: | ||
api_key_database = settings.backend_api_key_database | ||
can_write = APIKey(api_key) in api_key_database.write | ||
elif credentials: | ||
api_write_user_db = settings.backend_user_database.write | ||
user, pw = credentials.username, credentials.password.encode("utf-8") | ||
if api_write_user := api_write_user_db.get(user): | ||
can_write = compare_digest( | ||
pw, api_write_user.get_secret_value().encode("utf-8") | ||
) | ||
if not can_write: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, | ||
detail="Unauthorized API Key.", | ||
detail=f"Unauthorized {'API Key' if api_key else 'credentials'}.", | ||
headers={"WWW-Authenticate": "Basic"} | ||
if user_agent.startswith("Mozilla/") | ||
else None, | ||
) | ||
|
||
|
||
def has_read_access(api_key: Annotated[str | None, Depends(X_API_KEY)]) -> None: | ||
"""Verify if api key has read access or read access implied by write access. | ||
def has_read_access( | ||
api_key: Annotated[str | None, Depends(X_API_KEY)] = None, | ||
credentials: Annotated[ | ||
HTTPBasicCredentials | None, | ||
Depends(X_API_CREDENTIALS), | ||
] = None, | ||
user_agent: Annotated[str, Header(include_in_schema=False)] = "n/a", | ||
) -> None: | ||
"""Verify if api key or credentials have read access or write access. | ||
Raises: | ||
HTTPException if no header is provided or APIKey does not have read access. | ||
HTTPException if no header or provided APIKey/credentials have no read access. | ||
Args: | ||
api_key: the API key | ||
credentials: username and password | ||
user_agent: user-agent (in case of a web browser starts with "Mozilla/") | ||
Settings: | ||
backend_user_database: checked for presence of api_key | ||
check credentials in backend_user_database or backend_api_key_database | ||
""" | ||
if not api_key: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Missing authentication header X-API-Key.", | ||
) | ||
__check_header_for_authorization_method(api_key, credentials, user_agent) | ||
|
||
settings = BackendSettings.get() | ||
user_database = settings.backend_user_database | ||
try: | ||
has_write_access(api_key) | ||
has_write_access(api_key, credentials) # read access implied by write access | ||
can_write = True | ||
except HTTPException: | ||
can_write = False | ||
can_read = can_write or (APIKey(api_key) in user_database.read) | ||
|
||
settings = BackendSettings.get() | ||
can_read = False | ||
if api_key: | ||
api_key_database = settings.backend_api_key_database | ||
can_read = APIKey(api_key) in api_key_database.read | ||
elif credentials: | ||
api_read_user_db = settings.backend_user_database.read | ||
user, pw = credentials.username, credentials.password.encode("utf-8") | ||
if api_read_user := api_read_user_db.get(user): | ||
can_read = compare_digest( | ||
pw, api_read_user.get_secret_value().encode("utf-8") | ||
) | ||
can_read = can_read or can_write | ||
if not can_read: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, | ||
detail="Unauthorized API Key.", | ||
detail=f"Unauthorized {'API Key' if api_key else 'credentials'}.", | ||
headers={"WWW-Authenticate": "Basic"} | ||
if user_agent.startswith("Mozilla/") | ||
else None, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.