From 1ed9754d932c34e3fcffafd4b63474802f22ae43 Mon Sep 17 00:00:00 2001 From: leynier Date: Fri, 15 Oct 2021 01:12:10 -0400 Subject: [PATCH] feat: add types to API class and align implementation with gotrue-js --- gotrue/api.py | 584 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 432 insertions(+), 152 deletions(-) diff --git a/gotrue/api.py b/gotrue/api.py index 6eb1aac3..716c075e 100644 --- a/gotrue/api.py +++ b/gotrue/api.py @@ -1,194 +1,326 @@ -import json -from typing import Any, Dict +from json import dumps +from typing import Any, Dict, Optional, Union -import requests +from requests import delete, get, post, put from gotrue.lib.constants import COOKIE_OPTIONS - - -def to_dict(request_response) -> Dict[str, Any]: - """Wrap up request_response to user-friendly dict.""" - return {**request_response.json(), "status_code": request_response.status_code} +from gotrue.lib.helpers import ( + encode_uri_component, + parse_response, + parse_session_or_user, +) +from gotrue.lib.types import LinkType, Provider, Session, User, UserAttributes class GoTrueApi: def __init__( - self, url: str, headers: Dict[str, Any], cookie_options: Dict[str, Any] + self, + url: str, + headers: Dict[str, Any], + cookie_options: Dict[str, Any], ): """Initialise API class.""" self.url = url self.headers = headers self.cookie_options = {**COOKIE_OPTIONS, **cookie_options} - def sign_up_with_email(self, email: str, password: str) -> Dict[str, Any]: - """Creates a new user using their email address + def sign_up_with_email( + self, + email: str, + password: str, + redirect_to: Optional[str] = None, + data: Optional[Dict[str, Any]] = None, + ) -> Union[Session, User]: + """Creates a new user using their email address. Parameters ---------- email : str - The user's email address. + The email address of the user. password : str - The user's password. + The password of the user. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. + data : Optional[Dict[str, Any]] + Optional user metadata. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. + response : Union[Session, User] + A logged-in session if the server has "autoconfirm" ON + A user if the server has "autoconfirm" OFF + + Raises + ------ + error : ApiError + If an error occurs """ - credentials = {"email": email, "password": password} - request = requests.post( - f"{self.url}/signup", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) - - def sign_up_with_phone(self, phone: str, password: str) -> Dict[str, Any]: - """Creates a new user using their phone number + headers = self.headers + query_string = "" + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + query_string = f"?redirect_to={redirect_to_encoded}" + data = {"email": email, "password": password, "data": data} + url = f"{self.url}/signup{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, parse_session_or_user) + + def sign_in_with_email( + self, + email: str, + password: str, + redirect_to: Optional[str] = None, + ) -> Session: + """Logs in an existing user using their email address. Parameters ---------- - phone : str - The user's phone number. + email : str + The email address of the user. password : str - The user's password. + The password of the user. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. - """ - credentials = {"phone": phone, "password": password} - request = requests.post( - f"{self.url}/signup", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) + response : Session + A logged-in session - def send_mobile_otp(self, phone: str) -> Dict[str, Any]: - """Sends a mobile OTP via SMS. Will register the account if it doesn't already exist + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self.headers + query_string = "?grant_type=password" + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + query_string += f"&redirect_to={redirect_to_encoded}" + data = {"email": email, "password": password} + url = f"{self.url}/token{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, Session.from_dict) + + def sign_up_with_phone( + self, + phone: str, + password: str, + data: Optional[Dict[str, Any]] = None, + ) -> Union[Session, User]: + """Signs up a new user using their phone number and a password. Parameters ---------- phone : str - The user's phone number WITH international prefix. + The phone number of the user. + password : str + The password of the user. + data : Optional[Dict[str, Any]] + Optional user metadata. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. + response : Union[Session, User] + A logged-in session if the server has "autoconfirm" ON + A user if the server has "autoconfirm" OFF + + Raises + ------ + error : ApiError + If an error occurs """ - credentials = {"phone": phone} - request = requests.post( - f"{self.url}/otp", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) - - def verify_mobile_otp(self, phone: str, token: str, options: Dict[str, Any]) -> Dict[str, Any]: - """Send User supplied Mobile OTP to be verified + headers = self.headers + data = {"phone": phone, "password": password, "data": data} + url = f"{self.url}/signup" + response = post(url, dumps(data), headers) + return parse_response(response, parse_session_or_user) + + def sign_in_with_phone( + self, + phone: str, + password: str, + ) -> Session: + """Logs in an existing user using their phone number and password. Parameters ---------- phone : str - The user's phone number WITH international prefix - token : str - Token that user was sent to their mobile phone - options : dict of any - A URL or mobile address to send the user to after they are confirmed. + The phone number of the user. + password : str + The password of the user. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. - """ - payload = { - "phone": phone, - "token": token, - "type": "sms", - "redirect_to": options.get("redirect_to") - } - request = requests.post( - f"{self.url}/verify", json.dumps(payload), headers=self.headers - ) - return to_dict(request) + response : Session + A logged-in session - def sign_in_with_email(self, email: str, password: str) -> Dict[str, Any]: - """Logs in an existing user using their email address. + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self.headers + query_string = "?grant_type=password" + data = {"phone": phone, "password": password} + url = f"{self.url}/token{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, Session.from_dict) + + def send_magic_link_email( + self, + email: str, + redirect_to: Optional[str] = None, + ) -> None: + """Sends a magic login link to an email address. Parameters ---------- email : str - The user's email address. - password : str - The user's password. + The email address of the user. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. - Returns - ------- - request : dict of any - The user or error message returned by the supabase backend. + Raises + ------ + error : ApiError + If an error occurs """ - credentials = {"email": email, "password": password} - request = requests.post( - f"{self.url}/token?grant_type=password", - json.dumps(credentials), - headers=self.headers, - ) - return to_dict(request) - - def send_magic_link_email(self, email: str) -> Dict[str, Any]: - """Sends a magic login link to an email address. + headers = self.headers + query_string = "" + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + query_string = f"?redirect_to={redirect_to_encoded}" + data = {"email": email} + url = f"{self.url}/magiclink{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, lambda _: None) + + def send_mobile_otp(self, phone: str) -> None: + """Sends a mobile OTP via SMS. Will register the account if it doesn't already exist Parameters ---------- - email : str - The user's email address. + phone : str + The user's phone number WITH international prefix + + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self.headers + data = {"phone": phone} + url = f"{self.url}/otp" + response = post(url, dumps(data), headers) + return parse_response(response, lambda _: None) + + def verify_mobile_otp( + self, + phone: str, + token: str, + redirect_to: Optional[str] = None, + ) -> Union[Session, User]: + """Send User supplied Mobile OTP to be verified + + Parameters + ---------- + phone : str + The user's phone number WITH international prefix + token : str + Token that user was sent to their mobile phone + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. + response : Union[Session, User] + A logged-in session if the server has "autoconfirm" ON + A user if the server has "autoconfirm" OFF + + Raises + ------ + error : ApiError + If an error occurs """ - credentials = {"email": email} - request = requests.post( - f"{self.url}/magiclink", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) - - def invite_user_by_email(self, email: str) -> Dict[str, Any]: + headers = self.headers + data = { + "phone": phone, + "token": token, + "type": "sms", + } + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + data["redirect_to"] = redirect_to_encoded + url = f"{self.url}/verify" + response = post(url, dumps(data), headers) + return parse_response(response, parse_session_or_user) + + def invite_user_by_email( + self, + email: str, + redirect_to: Optional[str] = None, + data: Optional[Dict[str, Any]] = None, + ) -> User: """Sends an invite link to an email address. Parameters ---------- email : str - The user's email address. + The email address of the user. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. + data : Optional[Dict[str, Any]] + Optional user metadata. Returns ------- - request : dict of any - The invite or error message returned by the supabase backend. - """ - credentials = {"email": email} - request = requests.post( - f"{self.url}/invite", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) + response : User + A user - def reset_password_for_email(self, email: str) -> Dict[str, Any]: + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self.headers + query_string = "" + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + query_string = f"?redirect_to={redirect_to_encoded}" + data = {"email": email, "data": data} + url = f"{self.url}/invite{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, User.from_dict) + + def reset_password_for_email( + self, + email: str, + redirect_to: Optional[str] = None, + ) -> None: """Sends a reset request to an email address. Parameters ---------- email : str - The user's email address. + The email address of the user. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. - Returns - ------- - request : dict of any - The password reset status or error message returned by the supabase - backend. + Raises + ------ + error : ApiError + If an error occurs """ - credentials = {"email": email} - request = requests.post( - f"{self.url}/recover", json.dumps(credentials), headers=self.headers - ) - return to_dict(request) + headers = self.headers + query_string = "" + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + query_string = f"?redirect_to={redirect_to_encoded}" + data = {"email": email} + url = f"{self.url}/recover{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, lambda _: None) def _create_request_headers(self, jwt: str) -> Dict[str, str]: """Create temporary object. @@ -199,7 +331,7 @@ def _create_request_headers(self, jwt: str) -> Dict[str, str]: Parameters ---------- jwt : str - A valid, logged-in JWT. + A valid, logged-in JWT. Returns ------- @@ -211,66 +343,214 @@ def _create_request_headers(self, jwt: str) -> Dict[str, str]: headers["Authorization"] = f"Bearer {jwt}" return headers - def sign_out(self, jwt: str): + def sign_out(self, jwt: str) -> None: """Removes a logged-in session. Parameters ---------- jwt : str - A valid, logged-in JWT. + A valid, logged-in JWT. + """ + headers = self._create_request_headers(jwt) + url = f"{self.url}/logout" + post(url, headers=headers) + + def get_url_for_provider( + self, + provider: Provider, + redirect_to: Optional[str] = None, + scopes: Optional[str] = None, + ) -> str: + """Generates the relevant login URL for a third-party provider. + + Parameters + ---------- + provider : Provider + One of the providers supported by GoTrue. + redirect_to : Optional[str] + A URL or mobile address to send the user to after they are confirmed. + scopes : Optional[str] + A space-separated list of scopes granted to the OAuth application. + + Returns + ------- + url : str + The URL to redirect the user to. + + Raises + ------ + error : ApiError + If an error occurs """ - requests.post(f"{self.url}/logout", headers=self._create_request_headers(jwt)) + url_params = [f"provider={encode_uri_component(provider)}"] + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + url_params.append(f"redirect_to={redirect_to_encoded}") + if scopes: + url_params.append(f"scopes={encode_uri_component(scopes)}") + return f"{self.url}/authorize?{'&'.join(url_params)}" - def get_url_for_provider(self, provider: str) -> str: - """Generates the relevant login URL for a third-party provider.""" - return f"{self.url}/authorize?provider={provider}" + def get_user(self, jwt: str) -> User: + """Gets the user details. - def get_user(self, jwt: str) -> Dict[str, Any]: - """Gets the user details + Parameters + ---------- + jwt : str + A valid, logged-in JWT. + + Returns + ------- + response : User + A user + + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self._create_request_headers(jwt) + url = f"{self.url}/user" + response = get(url, headers=headers) + return parse_response(response, User.from_dict) + + def update_user( + self, + jwt: str, + attributes: UserAttributes, + ) -> User: + """ + Updates the user data. Parameters ---------- jwt : str - A valid, logged-in JWT. + A valid, logged-in JWT. + attributes : UserAttributes + The data you want to update. Returns ------- - request : dict of any - The user or error message returned by the supabase backend. + response : User + A user + + Raises + ------ + error : ApiError + If an error occurs """ - request = requests.get( - f"{self.url}/user", headers=self._create_request_headers(jwt) - ) - return to_dict(request) - - def update_user(self, jwt: str, **attributes) -> Dict[str, Any]: - """Updates the user data through the attributes kwargs.""" - request = requests.put( - f"{self.url}/user", - json.dumps(attributes), - headers=self._create_request_headers(jwt), - ) - return to_dict(request) - - def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]: + headers = self._create_request_headers(jwt) + data = attributes.to_dict() + url = f"{self.url}/user" + response = put(url, dumps(data), headers) + return parse_response(response, User.from_dict) + + def delete_user(self, uid: str, jwt: str) -> User: + """Delete a user. Requires a `service_role` key. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + + Parameters + ---------- + uid : str + The user uid you want to remove. + jwt : str + A valid, logged-in JWT. + + Returns + ------- + response : User + A user + + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self._create_request_headers(jwt) + url = f"{self.url}/admin/users/${uid}" + response = delete(url, headers=headers) + return parse_response(response, User.from_dict) + + def refresh_access_token(self, refresh_token: str) -> Session: """Generates a new JWT. Parameters ---------- refresh_token : str A valid refresh token that was returned on login. + + Returns + ------- + response : Session + A session + + Raises + ------ + error : ApiError + If an error occurs + """ + headers = self.headers + query_string = "?grant_type=refresh_token" + data = {"refresh_token": refresh_token} + url = f"{self.url}/token{query_string}" + response = post(url, dumps(data), headers) + return parse_response(response, Session.from_dict) + + def generate_link( + self, + type: LinkType, + email: str, + password: Optional[str] = None, + redirect_to: Optional[str] = None, + data: Optional[Dict[str, Any]] = None, + ) -> Union[Session, User]: + """ + Generates links to be sent via email or other. + + Parameters + ---------- + type : LinkType + The link type ("signup" or "magiclink" or "recovery" or "invite"). + email : str + The user's email. + password : Optional[str] + User password. For signup only. + redirect_to : Optional[str] + The link type ("signup" or "magiclink" or "recovery" or "invite"). + data : Optional[Dict[str, Any]] + Optional user metadata. For signup only. + + Returns + ------- + response : Union[Session, User] + A logged-in session if the server has "autoconfirm" ON + A user if the server has "autoconfirm" OFF + + Raises + ------ + error : ApiError + If an error occurs """ - request = requests.post( - f"{self.url}/token?grant_type=refresh_token", - json.dumps({"refresh_token": refresh_token}), - headers=self.headers, - ) - return to_dict(request) - - def set_auth_cookie(req, res): + headers = self.headers + data = { + "type": str(type), + "email": email, + "data": data, + } + if password: + data["password"] = password + if redirect_to: + redirect_to_encoded = encode_uri_component(redirect_to) + data["redirect_to"] = redirect_to_encoded + url = f"{self.url}/admin/generate_link" + response = post(url, dumps(data), headers) + return parse_response(response, parse_session_or_user) + + def set_auth_cookie(self, req, res): """Stub for parity with JS api.""" raise NotImplementedError("set_auth_cookie not implemented.") - def get_user_by_cookie(req): + def get_user_by_cookie(self, req): """Stub for parity with JS api.""" raise NotImplementedError("get_user_by_cookie not implemented.")