-
-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3709 from KBVE/dev
Preparing Alpha Branch
- Loading branch information
Showing
5 changed files
with
110 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
from .api import Routes, CORS | ||
from .apps import TursoDatabase, DiscordServerManager | ||
from .apps import TursoDatabase, DiscordServerManager, Kilobase | ||
from .api import SetupSchema, Hero, DiscordServer, Health, SchemaEngine, Utils |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
from .turso import TursoDatabase | ||
from .discord import DiscordServerManager | ||
from .discord import DiscordServerManager | ||
from .kilobase import Kilobase |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import os | ||
import jwt | ||
import time | ||
from supabase import create_client, Client | ||
from jwt import ExpiredSignatureError, InvalidTokenError | ||
|
||
class Kilobase: | ||
def __init__(self): | ||
"""Initialize the Supabase client and JWT secret.""" | ||
self.supabase_url = os.getenv("SUPABASE_URL") | ||
self.supabase_key = os.getenv("SUPABASE_KEY") | ||
self.jwt_secret = os.getenv("JWT_SECRET") or "default_secret" | ||
|
||
if not self.supabase_url or not self.supabase_key: | ||
raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in the environment variables.") | ||
|
||
self.client: Client = create_client(self.supabase_url, self.supabase_key) | ||
|
||
def issue_jwt(self, user_id: str, expires_in: int = 3600) -> str: | ||
""" | ||
Issue a JWT for a given user. | ||
Args: | ||
user_id (str): The user ID to include in the token. | ||
expires_in (int): Token expiration time in seconds. Default is 1 hour. | ||
Returns: | ||
str: The generated JWT token. | ||
""" | ||
payload = { | ||
"sub": user_id, | ||
"exp": int(time.time()) + expires_in, | ||
"iat": int(time.time()) | ||
} | ||
token = jwt.encode(payload, self.jwt_secret, algorithm="HS256") | ||
return token | ||
|
||
def verify_jwt(self, token: str) -> dict: | ||
""" | ||
Verify a JWT token. | ||
Args: | ||
token (str): The JWT token to verify. | ||
Returns: | ||
dict: Decoded payload if the token is valid. | ||
Raises: | ||
ValueError: If the token is expired or invalid. | ||
""" | ||
try: | ||
decoded = jwt.decode(token, self.jwt_secret, algorithms=["HS256"]) | ||
return decoded | ||
except ExpiredSignatureError: | ||
raise ValueError("Token has expired.") | ||
except InvalidTokenError: | ||
raise ValueError("Invalid token.") | ||
|
||
def get_user_by_id(self, user_id: str): | ||
""" | ||
Fetch a user's data from the Supabase `users` table. | ||
Args: | ||
user_id (str): The user ID to query. | ||
Returns: | ||
dict: User data or None if not found. | ||
""" | ||
response = self.client.table("users").select("*").eq("id", user_id).single().execute() | ||
return response.data if response.data else None | ||
|
||
def extract_user_id(self, token: str) -> str: | ||
""" | ||
Extract the user ID (UUID) from a Supabase JWT token. | ||
Args: | ||
token (str): The JWT token to decode. | ||
Returns: | ||
str: The user's unique ID (UUID) if valid. | ||
Raises: | ||
ValueError: If the token is expired or invalid. | ||
""" | ||
try: | ||
# Decode the Supabase token using the Supabase key | ||
payload = jwt.decode(token, self.supabase_key, algorithms=["HS256"]) | ||
|
||
# Supabase stores the user ID under 'sub' (subject) claim | ||
user_id = payload.get("sub") | ||
if not user_id: | ||
raise ValueError("User ID not found in the token.") | ||
return user_id | ||
|
||
except ExpiredSignatureError: | ||
raise ValueError("Token has expired.") | ||
except InvalidTokenError: | ||
raise ValueError("Invalid token.") | ||
except Exception as e: | ||
raise ValueError(f"Token verification error: {e}") |
Empty file.